Posted to commits@airflow.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/08/31 23:53:00 UTC

[jira] [Commented] (AIRFLOW-2145) Deadlock after clearing a running task

    [ https://issues.apache.org/jira/browse/AIRFLOW-2145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599411#comment-16599411 ] 

ASF GitHub Bot commented on AIRFLOW-2145:
-----------------------------------------

kaxil closed pull request #3657: [AIRFLOW-2145] fix deadlock on clearing running task instance
URL: https://github.com/apache/incubator-airflow/pull/3657
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index 9da98510eb..a351df07b9 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -101,7 +101,6 @@ def finished(cls):
         """
         return [
             cls.SUCCESS,
-            cls.SHUTDOWN,
             cls.FAILED,
             cls.SKIPPED,
         ]
@@ -117,5 +116,6 @@ def unfinished(cls):
             cls.SCHEDULED,
             cls.QUEUED,
             cls.RUNNING,
+            cls.SHUTDOWN,
             cls.UP_FOR_RETRY
         ]
diff --git a/tests/models.py b/tests/models.py
index 1c88ea47f7..529ae56454 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -801,7 +801,26 @@ def test_dagrun_deadlock(self):
         dr.update_state()
         self.assertEqual(dr.state, State.FAILED)
 
-    def test_dagrun_no_deadlock(self):
+    def test_dagrun_no_deadlock_with_shutdown(self):
+        session = settings.Session()
+        dag = DAG('test_dagrun_no_deadlock_with_shutdown',
+                  start_date=DEFAULT_DATE)
+        with dag:
+            op1 = DummyOperator(task_id='upstream_task')
+            op2 = DummyOperator(task_id='downstream_task')
+            op2.set_upstream(op1)
+
+        dr = dag.create_dagrun(run_id='test_dagrun_no_deadlock_with_shutdown',
+                               state=State.RUNNING,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE)
+        upstream_ti = dr.get_task_instance(task_id='upstream_task')
+        upstream_ti.set_state(State.SHUTDOWN, session=session)
+
+        dr.update_state()
+        self.assertEqual(dr.state, State.RUNNING)
+
+    def test_dagrun_no_deadlock_with_depends_on_past(self):
         session = settings.Session()
         dag = DAG('test_dagrun_no_deadlock',
                   start_date=DEFAULT_DATE)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Deadlock after clearing a running task
> --------------------------------------
>
>                 Key: AIRFLOW-2145
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2145
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.9.0
>            Reporter: George Roldugin
>            Priority: Minor
>             Fix For: 1.10.1
>
>         Attachments: image-2018-02-23-18-59-11-828.png, image-2018-02-23-19-00-37-741.png, image-2018-02-23-19-00-55-630.png, image-2018-02-23-19-01-45-012.png, image-2018-02-23-19-01-57-498.png, image-2018-02-23-19-02-18-837.png
>
>
> TL;DR: The essence of the issue is that whenever a currently running task is cleared, the dagrun enters a deadlocked state and fails.
>  
> We see this in production with Celery executors and {{TimeDeltaSensor}}, and I've been able to reproduce it locally with both {{TimeDeltaSensor}} and {{WebHDFSSensor}}.
> Here's the minimal example:
> {code:java}
> from datetime import datetime, timedelta
> import airflow
> from airflow.operators.sensors import TimeDeltaSensor
> from airflow.operators.dummy_operator import DummyOperator
> with airflow.DAG(
>     'foo',
>     schedule_interval='@daily',
>     start_date=datetime(2018, 1, 1)) as dag:
>     wait_for_upstream_sla = TimeDeltaSensor(
>         task_id="wait_for_upstream_sla",
>         delta=timedelta(days=365*10)
>     )
>     do_work = DummyOperator(task_id='do_work')
>     dag >> wait_for_upstream_sla >> do_work
> {code}
>  
> Sequence of actions, relevant DEBUG-level logs, and some UI screenshots:
> {code:java}
> airflow clear foo -e 2018-02-22 --no_confirm && airflow backfill foo -s 2018-02-22 -e 2018-02-22
> {code}
> {code:java}
> [2018-02-23 17:17:45,983] {__init__.py:45} INFO - Using executor SequentialExecutor
> [2018-02-23 17:17:46,069] {models.py:189} INFO - Filling up the DagBag from /Users/grol/Drive/dev/reporting/dags
> ...
> [2018-02-23 17:17:47,563] {jobs.py:2180} DEBUG - Task instance to run <TaskInstance: foo.wait_for_upstream_sla 2018-02-22 00:00:00 [scheduled]> state scheduled
> ...
> {code}
> !image-2018-02-23-18-59-11-828.png|width=418,height=87!
> Now we clear all of the DAG's tasks externally:
> {code:java}
> airflow clear foo -e 2018-02-22 --no_confirm
> {code}
> This causes the following:
> {code:java}
> [2018-02-23 17:17:55,258] {base_task_runner.py:98} INFO - Subtask: [2018-02-23 17:17:55,258] {sensors.py:629} INFO - Checking if the time (2018-02-23 16:19:00) has come
> [2018-02-23 17:17:58,844] {jobs.py:184} DEBUG - [heart] Boom.
> [2018-02-23 17:18:03,848] {jobs.py:184} DEBUG - [heart] Boom.
> [2018-02-23 17:18:08,856] {jobs.py:2585} WARNING - State of this instance has been externally set to shutdown. Taking the poison pill.
> [2018-02-23 17:18:08,874] {helpers.py:266} DEBUG - There are no descendant processes to kill
> [2018-02-23 17:18:08,875] {jobs.py:184} DEBUG - [heart] Boom.
> [2018-02-23 17:18:08,900] {helpers.py:266} DEBUG - There are no descendant processes to kill
> [2018-02-23 17:18:08,922] {helpers.py:266} DEBUG - There are no descendant processes to kill
> [2018-02-23 17:18:09,005] {sequential_executor.py:47} ERROR - Failed to execute task Command 'airflow run foo wait_for_upstream_sla 2018-02-22T00:00:00 --local -sd DAGS_FOLDER/foo.py' returned non-zero exit status 1.
> [2018-02-23 17:18:09,012] {jobs.py:2004} DEBUG - Executor state: failed task <TaskInstance: foo.wait_for_upstream_sla 2018-02-22 00:00:00 [shutdown]>
> [2018-02-23 17:18:09,018] {models.py:4584} INFO - Updating state for <DagRun foo @ 2018-02-22 00:00:00: backfill_2018-02-22T00:00:00, externally triggered: False> considering 2 task(s)
> [2018-02-23 17:18:09,021] {models.py:1215} DEBUG - <TaskInstance: foo.do_work 2018-02-22 00:00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
> [2018-02-23 17:18:09,021] {models.py:1215} DEBUG - <TaskInstance: foo.do_work 2018-02-22 00:00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The context specified that being in a retry period was permitted.
> [2018-02-23 17:18:09,027] {models.py:1215} DEBUG - <TaskInstance: foo.do_work 2018-02-22 00:00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'skipped': 0, 'successes': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0, 'total': 1}, upstream_task_ids=['wait_for_upstream_sla']
> [2018-02-23 17:18:09,029] {models.py:4643} INFO - Deadlock; marking run <DagRun foo @ 2018-02-22 00:00:00: backfill_2018-02-22T00:00:00, externally triggered: False> failed
> [2018-02-23 17:18:09,045] {jobs.py:2125} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
> [2018-02-23 17:18:09,045] {jobs.py:2129} DEBUG - Finished dag run loop iteration. Remaining tasks [<TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]>]
> [2018-02-23 17:18:09,045] {jobs.py:2160} DEBUG - *** Clearing out not_ready list ***
> [2018-02-23 17:18:09,048] {jobs.py:2180} DEBUG - Task instance to run <TaskInstance: foo.do_work 2018-02-22 00:00:00 [None]> state None
> [2018-02-23 17:18:09,049] {jobs.py:2186} WARNING - FIXME: task instance {} state was set to None externally. This should not happen
> [2018-02-23 17:18:09,053] {models.py:1215} DEBUG - <TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]> dependency 'Task Instance State' PASSED: True, Task state scheduled was valid.
> [2018-02-23 17:18:09,053] {models.py:1215} DEBUG - <TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
> [2018-02-23 17:18:09,056] {models.py:1215} DEBUG - <TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not set.
> [2018-02-23 17:18:09,056] {models.py:1215} DEBUG - <TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
> [2018-02-23 17:18:09,061] {models.py:1215} DEBUG - <TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'skipped': 0, 'successes': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0, 'total': 1}, upstream_task_ids=['wait_for_upstream_sla']
> [2018-02-23 17:18:09,061] {models.py:1190} INFO - Dependencies not met for <TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'skipped': 0, 'successes': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0, 'total': 1}, upstream_task_ids=['wait_for_upstream_sla']
> [2018-02-23 17:18:09,061] {jobs.py:2274} DEBUG - Adding <TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]> to not_ready
> [2018-02-23 17:18:09,067] {jobs.py:184} DEBUG - [heart] Boom.
> {code}
> !image-2018-02-23-19-00-37-741.png|width=375,height=78!
> !image-2018-02-23-19-01-57-498.png|width=374,height=77!
> Interestingly, once the success condition of the {{TimeDeltaSensor}} is met, in production we see the following final state in the UI: the DAG run failed while the {{TimeDeltaSensor}} task succeeded, though there's no evidence of success in the Celery executor logs.
>   !image-2018-02-23-19-02-18-837.png|width=563,height=87!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)