You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Gerard Toonstra (JIRA)" <ji...@apache.org> on 2017/04/27 14:40:04 UTC

[jira] [Commented] (AIRFLOW-1143) Tasks rejected by workers get stuck in QUEUED

    [ https://issues.apache.org/jira/browse/AIRFLOW-1143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15986743#comment-15986743 ] 

Gerard Toonstra commented on AIRFLOW-1143:
------------------------------------------

The scheduler code and interaction between workers, task instances getting executed by them is a really interesting and complex piece of code. 

The heartbeats by the way do not check if tasks are still really executing, it just ensures that processes get started and results are collected. From the perspective of the executor, there are two consequences: success or fail, which is determined by the return value of the called process.

This means that a return or an exception thrown in the models.TaskInstance.run() method eventually determines whether something is considered failing or succeeding. Within that method, the worker has a connection to the database and the method should from that perspective set the task instance state correctly, otherwise the task instance gets stuck. Celery doesn't have capabilities I believe to verify some task is still running (anywhere) or not. 

What happens here is that the run() method started ok, but there are two checks for additional dependency checks, which could fail. One of them changes the state to NONE, which I believe would get the task instance to be rerun for an active dagrun in the _process_task_instances method of the SchedulerJob.py. The first QUEUE_DEPS check however does not set this state, so the state of this task would remain in QUEUED. 

In the reaping process when the scheduler restarts, it checks the task instance key against the executor.tasks_queued list (which is then empty), so then it picks up the task instance and sets the state to NONE there. This is only called at the start of the scheduler, because otherwise queued and scheduled tasks would get removed.

Sounds like setting the state to NONE in the TaskInstance.run() method after the queue_dep_context check fails could solve this in the quickest way.

The TaskInstance.run() is responsible for setting the final state on the TI in the database, but there is a check in the SchedulerJob if the recorded state matches the output state from the executor deployment (in _manage_executor_state). You'd then see this in the log : " Executor reports task instance {} finished ({}) "
                           "although the task says its {}." 

For the completeness of this JIRA, it would also be helpful if you indicate seeing this or not: 

FIXME: Rescheduling due to concurrency limits

Time for others to shine light on what I just said...

> Tasks rejected by workers get stuck in QUEUED
> ---------------------------------------------
>
>                 Key: AIRFLOW-1143
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1143
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>            Reporter: Dan Davydov
>            Assignee: Gerard Toonstra
>
> If the scheduler schedules a task that is sent to a worker that then rejects the task (e.g. because one of the dependencies of the tasks became bad, like the pool became full), the task will be stuck in the QUEUED state. We hit this trying to switch from invoking the scheduler "airflow scheduler -n 5" to just "airflow scheduler".
> Restarting the scheduler fixes this because it cleans up orphans, but we shouldn't have to restart the scheduler to fix these problems (the missing job heartbeats should make the scheduler requeue the task).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)