Posted to commits@airflow.apache.org by "Laurent Bonafons (JIRA)" <ji...@apache.org> on 2017/04/07 12:39:41 UTC

[jira] [Created] (AIRFLOW-1086) Fail to execute task with upstream dependency in subdag

Laurent Bonafons created AIRFLOW-1086:
-----------------------------------------

             Summary: Fail to execute task with upstream dependency in subdag
                 Key: AIRFLOW-1086
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1086
             Project: Apache Airflow
          Issue Type: Bug
          Components: celery, subdag
    Affects Versions: Airflow 1.8
            Reporter: Laurent Bonafons
         Attachments: test_bubdag_task_instances.png, test_subdag_graph.png, test_subdag.py

Hello,
We are migrating from Airflow v1.7.1.3 to v1.8.0 and can no longer run subdags. We use the CeleryExecutor with RabbitMQ as the backend.

I tested progressively simpler cases and ended up with the great "test_subdag" example from Joe Schmid (see attachment).
It still doesn't work. In a subdag, only the first tasks (the ones without upstream dependencies) run.
When a task succeeds in a subdag, its downstream tasks are never executed, even though the subdag log reports "Dependencies all met" for them.

This looks similar to AIRFLOW-955 ("job failed to execute tasks") reported by Jeff Liu,
but here there is no second level of nesting; it is just a subdag containing tasks.

Here is an example of the subdag log in v1.7.1.3:
{noformat}
[2017-04-06 12:11:33,648] {models.py:154} INFO - Filling up the DagBag from /usr/local/airflow/dags/tricky_test_3.py
[2017-04-06 12:11:35,052] {models.py:154} INFO - Filling up the DagBag from /usr/local/airflow/dags/tricky_test_3.py
[2017-04-06 12:11:35,125] {models.py:1196} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2017-04-06 12:11:35,136] {models.py:1219} INFO - Executing <Task(SubDagOperator): SubDagOp> on 2017-04-03 00:00:00
[2017-04-06 12:11:35,165] {base_executor.py:36} INFO - Adding to queue: airflow run Test_SubDAG.SubDagOp SubDAG_Task1 2017-04-03T00:00:00 --local -sd DAGS_FOLDER/tricky_test_3.py 
[2017-04-06 12:11:40,014] {sequential_executor.py:26} INFO - Executing command: airflow run Test_SubDAG.SubDagOp SubDAG_Task1 2017-04-03T00:00:00 --local -sd DAGS_FOLDER/tricky_test_3.py 
[2017-04-06 12:11:46,176] {jobs.py:934} INFO - Task instance ('Test_SubDAG.SubDagOp', 'SubDAG_Task1', datetime.datetime(2017, 4, 3, 0, 0)) succeeded
[2017-04-06 12:11:46,176] {jobs.py:997} INFO - [backfill progress] | waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | skipped: 0 | deadlocked: 0
[2017-04-06 12:11:46,185] {base_executor.py:36} INFO - Adding to queue: airflow run Test_SubDAG.SubDagOp SubDAG_Task2 2017-04-03T00:00:00 --local -sd DAGS_FOLDER/tricky_test_3.py 
[2017-04-06 12:11:46,195] {sequential_executor.py:26} INFO - Executing command: airflow run Test_SubDAG.SubDagOp SubDAG_Task2 2017-04-03T00:00:00 --local -sd DAGS_FOLDER/tricky_test_3.py 
[2017-04-06 12:11:52,177] {jobs.py:934} INFO - Task instance ('Test_SubDAG.SubDagOp', 'SubDAG_Task2', datetime.datetime(2017, 4, 3, 0, 0)) succeeded
[2017-04-06 12:11:52,177] {jobs.py:997} INFO - [backfill progress] | waiting: 0 | succeeded: 2 | kicked_off: 2 | failed: 0 | skipped: 0 | deadlocked: 0
[2017-04-06 12:11:52,178] {jobs.py:1026} INFO - Backfill done. Exiting.
{noformat}

And here is the same subdag log in v1.8.0:
{noformat}
[2017-04-05 16:17:51,854] {models.py:167} INFO - Filling up the DagBag from /usr/local/airflow/dags/tricky_test_3.py
[2017-04-05 16:17:51,996] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run Test_SubDAG SubDagOp 2017-04-04T00:00:00 --job_id 9987 --raw -sd DAGS_FOLDER/tricky_test_3.py']
[2017-04-05 16:17:52,803] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:52,803] {__init__.py:57} INFO - Using executor CeleryExecutor
[2017-04-05 16:17:52,917] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:52,917] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2017-04-05 16:17:52,957] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:52,956] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2017-04-05 16:17:53,262] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:53,262] {models.py:167} INFO - Filling up the DagBag from /usr/local/airflow/dags/tricky_test_3.py
[2017-04-05 16:17:53,401] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:53,400] {models.py:1126} INFO - Dependencies all met for <TaskInstance: Test_SubDAG.SubDagOp 2017-04-04 00:00:00 [queued]>
[2017-04-05 16:17:53,409] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:53,409] {models.py:1126} INFO - Dependencies all met for <TaskInstance: Test_SubDAG.SubDagOp 2017-04-04 00:00:00 [queued]>
[2017-04-05 16:17:53,409] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:53,409] {models.py:1318} INFO - 
[2017-04-05 16:17:53,409] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2017-04-05 16:17:53,409] {base_task_runner.py:95} INFO - Subtask: Starting attempt 1 of 1
[2017-04-05 16:17:53,409] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2017-04-05 16:17:53,410] {base_task_runner.py:95} INFO - Subtask: 
[2017-04-05 16:17:53,443] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:53,442] {models.py:1342} INFO - Executing <Task(SubDagOperator): SubDagOp> on 2017-04-04 00:00:00
[2017-04-05 16:17:53,553] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:53,552] {models.py:1126} INFO - Dependencies all met for <TaskInstance: Test_SubDAG.SubDagOp.SubDAG_Task1 2017-04-04 00:00:00 [scheduled]>
[2017-04-05 16:17:53,559] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:53,558] {base_executor.py:50} INFO - Adding to queue: airflow run Test_SubDAG.SubDagOp SubDAG_Task1 2017-04-04T00:00:00 --local -sd DAGS_FOLDER/tricky_test_3.py
[2017-04-05 16:17:53,644] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:53,644] {models.py:1120} INFO - Dependencies not met for <TaskInstance: Test_SubDAG.SubDagOp.SubDAG_Task2 2017-04-04 00:00:00 [scheduled]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': Decimal('0'), 'failed': Decimal('0'), 'upstream_failed': Decimal('0'), 'skipped': Decimal('0'), 'done': 0L}, upstream_task_ids=['SubDAG_Task1']
[2017-04-05 16:17:58,477] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:58,477] {sequential_executor.py:40} INFO - Executing command: airflow run Test_SubDAG.SubDagOp SubDAG_Task1 2017-04-04T00:00:00 --local -sd DAGS_FOLDER/tricky_test_3.py
[2017-04-05 16:17:59,284] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:59,284] {__init__.py:57} INFO - Using executor CeleryExecutor
[2017-04-05 16:17:59,399] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:59,398] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2017-04-05 16:17:59,438] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:17:59,438] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2017-04-05 16:18:04,975] {base_task_runner.py:95} INFO - Subtask: Logging into: /usr/local/airflow/logs/Test_SubDAG.SubDagOp/SubDAG_Task1/2017-04-04T00:00:00
[2017-04-05 16:18:04,997] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:04,997] {models.py:4025} INFO - Updating state for <DagRun Test_SubDAG.SubDagOp @ 2017-04-04 00:00:00: backfill_2017-04-04T00:00:00, externally triggered: False> considering 2 task(s)
[2017-04-05 16:18:05,028] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:05,028] {jobs.py:1982} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2017-04-05 16:18:05,043] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:05,043] {models.py:1126} INFO - Dependencies all met for <TaskInstance: Test_SubDAG.SubDagOp.SubDAG_Task2 2017-04-04 00:00:00 [None]>
[2017-04-05 16:18:05,060] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:05,060] {models.py:4025} INFO - Updating state for <DagRun Test_SubDAG.SubDagOp @ 2017-04-04 00:00:00: backfill_2017-04-04T00:00:00, externally triggered: False> considering 2 task(s)
[2017-04-05 16:18:05,078] {base_task_runner.py:95} INFO - Subtask: /usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate.  Consider alternative strategies for improved performance.
[2017-04-05 16:18:05,078] {base_task_runner.py:95} INFO - Subtask:   'strategies for improved performance.' % expr)
[2017-04-05 16:18:05,081] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:05,080] {jobs.py:1982} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2017-04-05 16:18:05,095] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:05,095] {models.py:1126} INFO - Dependencies all met for <TaskInstance: Test_SubDAG.SubDagOp.SubDAG_Task2 2017-04-04 00:00:00 [None]>
[2017-04-05 16:18:10,071] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:10,070] {models.py:4025} INFO - Updating state for <DagRun Test_SubDAG.SubDagOp @ 2017-04-04 00:00:00: backfill_2017-04-04T00:00:00, externally triggered: False> considering 2 task(s)
[2017-04-05 16:18:10,092] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:10,091] {jobs.py:1982} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2017-04-05 16:18:10,106] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:10,105] {models.py:1126} INFO - Dependencies all met for <TaskInstance: Test_SubDAG.SubDagOp.SubDAG_Task2 2017-04-04 00:00:00 [None]>
[2017-04-05 16:18:15,080] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:15,080] {models.py:4025} INFO - Updating state for <DagRun Test_SubDAG.SubDagOp @ 2017-04-04 00:00:00: backfill_2017-04-04T00:00:00, externally triggered: False> considering 2 task(s)
[2017-04-05 16:18:15,101] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:15,100] {jobs.py:1982} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2017-04-05 16:18:15,116] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:15,116] {models.py:1126} INFO - Dependencies all met for <TaskInstance: Test_SubDAG.SubDagOp.SubDAG_Task2 2017-04-04 00:00:00 [None]>
[2017-04-05 16:18:20,091] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:20,090] {models.py:4025} INFO - Updating state for <DagRun Test_SubDAG.SubDagOp @ 2017-04-04 00:00:00: backfill_2017-04-04T00:00:00, externally triggered: False> considering 2 task(s)
[2017-04-05 16:18:20,112] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:20,111] {jobs.py:1982} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2017-04-05 16:18:20,127] {base_task_runner.py:95} INFO - Subtask: [2017-04-05 16:18:20,126] {models.py:1126} INFO - Dependencies all met for <TaskInstance: Test_SubDAG.SubDagOp.SubDAG_Task2 2017-04-04 00:00:00 [None]>
{noformat}
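
As a rough illustration only (this is not part of the attached files), a small Python sketch like the one below could pull the counters out of the "[backfill progress]" lines and flag the point where they stop changing while a task is still waiting. The function names and the threshold of three repeated lines are hypothetical choices; the only thing taken from the logs is the "name: value" layout of the progress lines.

```python
import re

# Matches "name: value" counters such as "tasks waiting: 1" or "kicked_off: 0".
# Segments without a colon (e.g. "finished run 0 of 1") are skipped.
COUNTER_RE = re.compile(r"\|\s*([a-z_ ]+?):\s*(\d+)")


def parse_progress(line):
    """Return the counters of a '[backfill progress]' line as a dict, or None."""
    if "[backfill progress]" not in line:
        return None
    # Only look at the text after the marker, so the "{jobs.py:1982}" prefix is ignored.
    _, _, tail = line.partition("[backfill progress]")
    return {name.strip(): int(value) for name, value in COUNTER_RE.findall(tail)}


def looks_stalled(lines, repeats=3):
    """True if the last `repeats` progress lines are identical and tasks still wait."""
    snapshots = [p for p in map(parse_progress, lines) if p is not None]
    if len(snapshots) < repeats:
        return False
    recent = snapshots[-repeats:]
    # v1.8.0 logs "tasks waiting", v1.7.1.3 logs "waiting".
    last = recent[-1]
    waiting = last.get("tasks waiting", last.get("waiting", 0))
    return waiting > 0 and all(s == recent[0] for s in recent)
```

Fed the v1.8.0 log above, the last progress lines all read "tasks waiting: 1 | succeeded: 1 | kicked_off: 0", so such a check would report a stall. The v1.7.1.3 log has only two progress lines and ends with "waiting: 0", so it would not.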

Also attached:
- test_subdag_graph.png shows what the graph view displays
- test_bubdag_task_instances.png shows what is listed when browsing "Task Instances"
