You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/13 04:45:30 UTC

[32/45] incubator-airflow git commit: [AIRFLOW-719] Prevent DAGs from ending prematurely

[AIRFLOW-719] Prevent DAGs from ending prematurely

DAGs using the ALL_SUCCESS and ONE_SUCCESS trigger rules were ending
prematurely when upstream tasks were skipped. With this change, the
ALL_SUCCESS and ONE_SUCCESS trigger rules encompass both SUCCESS and
SKIPPED upstream tasks.

Closes #2125 from dhuang/AIRFLOW-719


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4077c6de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4077c6de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4077c6de

Branch: refs/heads/v1-8-stable
Commit: 4077c6de297566a4c598065867a9a27324ae6eb1
Parents: 157054e
Author: Daniel Huang <dx...@gmail.com>
Authored: Sat Mar 4 17:33:23 2017 +0100
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:27:30 2017 -0700

----------------------------------------------------------------------
 airflow/ti_deps/deps/trigger_rule_dep.py      |  6 +-
 tests/dags/test_dagrun_short_circuit_false.py | 38 +++++++++++
 tests/models.py                               | 79 +++++++++++++++++++---
 3 files changed, 111 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4077c6de/airflow/ti_deps/deps/trigger_rule_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index 281ed51..da13bba 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -135,7 +135,7 @@ class TriggerRuleDep(BaseTIDep):
             if tr == TR.ALL_SUCCESS:
                 if upstream_failed or failed:
                     ti.set_state(State.UPSTREAM_FAILED, session)
-                elif skipped:
+                elif skipped == upstream:
                     ti.set_state(State.SKIPPED, session)
             elif tr == TR.ALL_FAILED:
                 if successes or skipped:
@@ -148,7 +148,7 @@ class TriggerRuleDep(BaseTIDep):
                     ti.set_state(State.SKIPPED, session)
 
         if tr == TR.ONE_SUCCESS:
-            if successes <= 0:
+            if successes <= 0 and skipped <= 0:
                 yield self._failing_status(
                     reason="Task's trigger rule '{0}' requires one upstream "
                     "task success, but none were found. "
@@ -162,7 +162,7 @@ class TriggerRuleDep(BaseTIDep):
                     "upstream_tasks_state={1}, upstream_task_ids={2}"
                     .format(tr, upstream_tasks_state, task.upstream_task_ids))
         elif tr == TR.ALL_SUCCESS:
-            num_failures = upstream - successes
+            num_failures = upstream - (successes + skipped)
             if num_failures > 0:
                 yield self._failing_status(
                     reason="Task's trigger rule '{0}' requires all upstream "

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4077c6de/tests/dags/test_dagrun_short_circuit_false.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_dagrun_short_circuit_false.py b/tests/dags/test_dagrun_short_circuit_false.py
new file mode 100644
index 0000000..805ab67
--- /dev/null
+++ b/tests/dags/test_dagrun_short_circuit_false.py
@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+
+from airflow.models import DAG
+from airflow.operators.python_operator import ShortCircuitOperator
+from airflow.operators.dummy_operator import DummyOperator
+
+
+# DAG whose short circuit op returns False, skipping multiple downstream tasks
+dag = DAG(
+    dag_id='test_dagrun_short_circuit_false',
+    start_date=datetime(2017, 1, 1)
+)
+dag_task1 = ShortCircuitOperator(
+    task_id='test_short_circuit_false',
+    dag=dag,
+    python_callable=lambda: False)
+dag_task2 = DummyOperator(
+    task_id='test_state_skipped1',
+    dag=dag)
+dag_task3 = DummyOperator(
+    task_id='test_state_skipped2',
+    dag=dag)
+dag_task1.set_downstream(dag_task2)
+dag_task2.set_downstream(dag_task3)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4077c6de/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 7ca01e7..d904ff3 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -34,6 +34,7 @@ from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.utils.state import State
 from mock import patch
 from nose_parameterized import parameterized
+from tests.core import TEST_DAG_FOLDER
 
 DEFAULT_DATE = datetime.datetime(2016, 1, 1)
 TEST_DAGS_FOLDER = os.path.join(
@@ -117,13 +118,71 @@ class DagTest(unittest.TestCase):
         self.assertEqual(dag.dag_id, 'creating_dag_in_cm')
         self.assertEqual(dag.tasks[0].task_id, 'op6')
 
+
 class DagRunTest(unittest.TestCase):
+
+    def setUp(self):
+        self.dagbag = models.DagBag(dag_folder=TEST_DAG_FOLDER)
+
+    def create_dag_run(self, dag_id, state=State.RUNNING, task_states=None):
+        now = datetime.datetime.now()
+        dag = self.dagbag.get_dag(dag_id)
+        dag_run = dag.create_dagrun(
+            run_id='manual__' + now.isoformat(),
+            execution_date=now,
+            start_date=now,
+            state=State.RUNNING,
+            external_trigger=False,
+        )
+
+        if task_states is not None:
+            session = settings.Session()
+            for task_id, state in task_states.items():
+                ti = dag_run.get_task_instance(task_id)
+                ti.set_state(state, session)
+            session.close()
+
+        return dag_run
+
     def test_id_for_date(self):
         run_id = models.DagRun.id_for_date(
             datetime.datetime(2015, 1, 2, 3, 4, 5, 6, None))
-        assert run_id == 'scheduled__2015-01-02T03:04:05', (
+        self.assertEqual(
+            'scheduled__2015-01-02T03:04:05', run_id,
             'Generated run_id did not match expectations: {0}'.format(run_id))
 
+    def test_dagrun_running_when_upstream_skipped(self):
+        """
+        Tests that a DAG run is not failed when an upstream task is skipped
+        """
+        initial_task_states = {
+            'test_short_circuit_false': State.SUCCESS,
+            'test_state_skipped1': State.SKIPPED,
+            'test_state_skipped2': State.NONE,
+        }
+        # dags/test_dagrun_short_circuit_false.py
+        dag_run = self.create_dag_run('test_dagrun_short_circuit_false',
+                                      state=State.RUNNING,
+                                      task_states=initial_task_states)
+        updated_dag_state = dag_run.update_state()
+        self.assertEqual(State.RUNNING, updated_dag_state)
+
+    def test_dagrun_success_when_all_skipped(self):
+        """
+        Tests that a DAG run succeeds when all tasks are skipped
+        """
+        initial_task_states = {
+            'test_short_circuit_false': State.SUCCESS,
+            'test_state_skipped1': State.SKIPPED,
+            'test_state_skipped2': State.SKIPPED,
+        }
+        # dags/test_dagrun_short_circuit_false.py
+        dag_run = self.create_dag_run('test_dagrun_short_circuit_false',
+                                      state=State.RUNNING,
+                                      task_states=initial_task_states)
+        updated_dag_state = dag_run.update_state()
+        self.assertEqual(State.SUCCESS, updated_dag_state)
+
 
 class DagBagTest(unittest.TestCase):
 
@@ -501,7 +560,7 @@ class TaskInstanceTest(unittest.TestCase):
         self.assertEqual(dt, ti.end_date+max_delay)
 
     def test_depends_on_past(self):
-        dagbag = models.DagBag()
+        dagbag = models.DagBag(dag_folder=TEST_DAG_FOLDER)
         dag = dagbag.get_dag('test_depends_on_past')
         dag.clear()
         task = dag.tasks[0]
@@ -530,10 +589,11 @@ class TaskInstanceTest(unittest.TestCase):
         #
         # Tests for all_success
         #
-        ['all_success', 5, 0, 0, 0, 0, True, None, True],
-        ['all_success', 2, 0, 0, 0, 0, True, None, False],
-        ['all_success', 2, 0, 1, 0, 0, True, ST.UPSTREAM_FAILED, False],
-        ['all_success', 2, 1, 0, 0, 0, True, ST.SKIPPED, False],
+        ['all_success', 5, 0, 0, 0, 5, True, None, True],
+        ['all_success', 2, 0, 0, 0, 2, True, None, False],
+        ['all_success', 2, 0, 1, 0, 3, True, ST.UPSTREAM_FAILED, False],
+        ['all_success', 2, 1, 0, 0, 3, True, None, False],
+        ['all_success', 0, 5, 0, 0, 5, True, ST.SKIPPED, True],
         #
         # Tests for one_success
         #
@@ -541,6 +601,7 @@ class TaskInstanceTest(unittest.TestCase):
         ['one_success', 2, 0, 0, 0, 2, True, None, True],
         ['one_success', 2, 0, 1, 0, 3, True, None, True],
         ['one_success', 2, 1, 0, 0, 3, True, None, True],
+        ['one_success', 0, 2, 0, 0, 2, True, None, True],
         #
         # Tests for all_failed
         #
@@ -552,9 +613,9 @@ class TaskInstanceTest(unittest.TestCase):
         #
         # Tests for one_failed
         #
-        ['one_failed', 5, 0, 0, 0, 0, True, None, False],
-        ['one_failed', 2, 0, 0, 0, 0, True, None, False],
-        ['one_failed', 2, 0, 1, 0, 0, True, None, True],
+        ['one_failed', 5, 0, 0, 0, 5, True, ST.SKIPPED, False],
+        ['one_failed', 2, 0, 0, 0, 2, True, None, False],
+        ['one_failed', 2, 0, 1, 0, 2, True, None, True],
         ['one_failed', 2, 1, 0, 0, 3, True, None, False],
         ['one_failed', 2, 3, 0, 0, 5, True, ST.SKIPPED, False],
         #