You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/13 02:49:13 UTC

[09/28] incubator-airflow git commit: [AIRFLOW-897] Prevent dagruns from failing with unfinished tasks

[AIRFLOW-897] Prevent dagruns from failing with unfinished tasks

Closes #2099 from
aoen/ddavydov/fix_premature_dagrun_failures


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c29af466
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c29af466
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c29af466

Branch: refs/heads/v1-8-test
Commit: c29af4668a67b5d7f969140549558714fb7b32c9
Parents: ff0fa00
Author: Dan Davydov <da...@airbnb.com>
Authored: Fri Feb 24 14:29:11 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:17:40 2017 -0700

----------------------------------------------------------------------
 airflow/models.py             |  6 +++---
 tests/dags/test_issue_1225.py | 13 +++++++++++++
 tests/jobs.py                 | 24 ++++++++++++++++++++++++
 3 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c29af466/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 1829ff3..3fef407 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3993,12 +3993,12 @@ class DagRun(Base):
 
         # future: remove the check on adhoc tasks (=active_tasks)
         if len(tis) == len(dag.active_tasks):
-            # if any roots failed, the run failed
             root_ids = [t.task_id for t in dag.roots]
             roots = [t for t in tis if t.task_id in root_ids]
 
-            if any(r.state in (State.FAILED, State.UPSTREAM_FAILED)
-                   for r in roots):
+            # if no unfinished tasks remain and at least one root failed, the run failed
+            if (not unfinished_tasks and
+                    any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
                 logging.info('Marking run {} failed'.format(self))
                 self.state = State.FAILED
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c29af466/tests/dags/test_issue_1225.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py
index 021561f..d01fd79 100644
--- a/tests/dags/test_issue_1225.py
+++ b/tests/dags/test_issue_1225.py
@@ -129,3 +129,16 @@ dag7_subdag1 = SubDagOperator(
     subdag=subdag7)
 subdag7_task1.set_downstream(subdag7_task2)
 subdag7_task2.set_downstream(subdag7_task3)
+
+# DAG used to test that a DagRun with an unfinished task and a failed root task stays running
+dag8 = DAG(dag_id='test_dagrun_states_root_fail_unfinished', default_args=default_args)
+dag8_task1 = DummyOperator(
+    task_id='test_dagrun_unfinished',  # The test resets this task instance's state
+                                       # to NONE after running the DAG
+    dag=dag8,
+)
+dag8_task2 = PythonOperator(
+    task_id='test_dagrun_fail',
+    dag=dag8,
+    python_callable=fail,
+)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c29af466/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index e520b44..1f7950e 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -358,6 +358,30 @@ class SchedulerJobTest(unittest.TestCase):
             },
             dagrun_state=State.FAILED)
 
+    def test_dagrun_root_fail_unfinished(self):
+        """
+        DagRuns with one unfinished and one failed root task -> RUNNING
+        """
+        # Run the whole DAG: one root task fails and the other succeeds
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        dag_id = 'test_dagrun_states_root_fail_unfinished'
+        dag = self.dagbag.get_dag(dag_id)
+        dag.clear()
+        dr = scheduler.create_dag_run(dag)
+        try:
+            dag.run(start_date=dr.execution_date, end_date=dr.execution_date)
+        except AirflowException:  # Expect an exception since there is a failed task
+            pass
+
+        # Reset the successful task so it looks as if it never ran; the dagrun
+        # should then stay RUNNING because it still has an unfinished task.
+        session = settings.Session()
+        ti = dr.get_task_instance('test_dagrun_unfinished', session=session)
+        ti.state = State.NONE
+        session.commit()
+        dr_state = dr.update_state()
+        self.assertEqual(dr_state, State.RUNNING)
+
     def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self):
         """
         DagRun is marked a success if ignore_first_depends_on_past=True