You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/04 15:04:27 UTC

[1/4] incubator-airflow git commit: Revert "[AIRFLOW-719] Prevent DAGs from ending prematurely"

Repository: incubator-airflow
Updated Branches:
  refs/heads/master f2dae7d15 -> 4a6bef69d


Revert "[AIRFLOW-719] Prevent DAGs from ending prematurely"

This reverts commit 1fdcf2480555f06cce3fc9bba97fbf3d64f074d3.

This reinstates the previous logic (< 1.8.0) that ALL_SUCCESS requires
all tasks to be successful instead of also counting SKIPPED
tasks as part of the successful tasks.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/92965e82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/92965e82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/92965e82

Branch: refs/heads/master
Commit: 92965e8275c6f2ec2282ad46c09950bab10c1cb2
Parents: 4c09050
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Mon Mar 27 20:12:29 2017 -0700
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Mar 28 17:42:48 2017 -0700

----------------------------------------------------------------------
 airflow/ti_deps/deps/trigger_rule_dep.py      |  6 +-
 tests/dags/test_dagrun_short_circuit_false.py | 38 ----------
 tests/models.py                               | 83 +++++++++++-----------
 3 files changed, 46 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92965e82/airflow/ti_deps/deps/trigger_rule_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index 3a77b00..cf06c0b 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -135,7 +135,7 @@ class TriggerRuleDep(BaseTIDep):
             if tr == TR.ALL_SUCCESS:
                 if upstream_failed or failed:
                     ti.set_state(State.UPSTREAM_FAILED, session)
-                elif skipped == upstream:
+                elif skipped:
                     ti.set_state(State.SKIPPED, session)
             elif tr == TR.ALL_FAILED:
                 if successes or skipped:
@@ -148,7 +148,7 @@ class TriggerRuleDep(BaseTIDep):
                     ti.set_state(State.SKIPPED, session)
 
         if tr == TR.ONE_SUCCESS:
-            if successes <= 0 and skipped <= 0:
+            if successes <= 0:
                 yield self._failing_status(
                     reason="Task's trigger rule '{0}' requires one upstream "
                     "task success, but none were found. "
@@ -162,7 +162,7 @@ class TriggerRuleDep(BaseTIDep):
                     "upstream_tasks_state={1}, upstream_task_ids={2}"
                     .format(tr, upstream_tasks_state, task.upstream_task_ids))
         elif tr == TR.ALL_SUCCESS:
-            num_failures = upstream - (successes + skipped)
+            num_failures = upstream - successes
             if num_failures > 0:
                 yield self._failing_status(
                     reason="Task's trigger rule '{0}' requires all upstream "

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92965e82/tests/dags/test_dagrun_short_circuit_false.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_dagrun_short_circuit_false.py b/tests/dags/test_dagrun_short_circuit_false.py
deleted file mode 100644
index 805ab67..0000000
--- a/tests/dags/test_dagrun_short_circuit_false.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from datetime import datetime
-
-from airflow.models import DAG
-from airflow.operators.python_operator import ShortCircuitOperator
-from airflow.operators.dummy_operator import DummyOperator
-
-
-# DAG that has its short circuit op fail and skip multiple downstream tasks
-dag = DAG(
-    dag_id='test_dagrun_short_circuit_false',
-    start_date=datetime(2017, 1, 1)
-)
-dag_task1 = ShortCircuitOperator(
-    task_id='test_short_circuit_false',
-    dag=dag,
-    python_callable=lambda: False)
-dag_task2 = DummyOperator(
-    task_id='test_state_skipped1',
-    dag=dag)
-dag_task3 = DummyOperator(
-    task_id='test_state_skipped2',
-    dag=dag)
-dag_task1.set_downstream(dag_task2)
-dag_task2.set_downstream(dag_task3)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92965e82/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index dcba354..3e77894 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -31,11 +31,12 @@ from airflow.models import DagModel
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
+from airflow.operators.python_operator import ShortCircuitOperator
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.utils.state import State
 from mock import patch
 from nose_parameterized import parameterized
-from tests.core import TEST_DAG_FOLDER
+
 
 DEFAULT_DATE = datetime.datetime(2016, 1, 1)
 TEST_DAGS_FOLDER = os.path.join(
@@ -197,17 +198,13 @@ class DagTest(unittest.TestCase):
 
 class DagRunTest(unittest.TestCase):
 
-    def setUp(self):
-        self.dagbag = models.DagBag(dag_folder=TEST_DAG_FOLDER)
-
-    def create_dag_run(self, dag_id, state=State.RUNNING, task_states=None):
+    def create_dag_run(self, dag, state=State.RUNNING, task_states=None):
         now = datetime.datetime.now()
-        dag = self.dagbag.get_dag(dag_id)
         dag_run = dag.create_dagrun(
             run_id='manual__' + now.isoformat(),
             execution_date=now,
             start_date=now,
-            state=State.RUNNING,
+            state=state,
             external_trigger=False,
         )
 
@@ -223,37 +220,38 @@ class DagRunTest(unittest.TestCase):
     def test_id_for_date(self):
         run_id = models.DagRun.id_for_date(
             datetime.datetime(2015, 1, 2, 3, 4, 5, 6, None))
-        self.assertEqual(
-            'scheduled__2015-01-02T03:04:05', run_id,
-            'Generated run_id did not match expectations: {0}'.format(run_id))
-
-    def test_dagrun_running_when_upstream_skipped(self):
-        """
-        Tests that a DAG run is not failed when an upstream task is skipped
-        """
-        initial_task_states = {
-            'test_short_circuit_false': State.SUCCESS,
-            'test_state_skipped1': State.SKIPPED,
-            'test_state_skipped2': State.NONE,
-        }
-        # dags/test_dagrun_short_circuit_false.py
-        dag_run = self.create_dag_run('test_dagrun_short_circuit_false',
-                                      state=State.RUNNING,
-                                      task_states=initial_task_states)
-        updated_dag_state = dag_run.update_state()
-        self.assertEqual(State.RUNNING, updated_dag_state)
+        self.assertEqual('scheduled__2015-01-02T03:04:05', run_id,
+                         msg='Generated run_id did not match expectations: {0}'
+                         .format(run_id))
 
     def test_dagrun_success_when_all_skipped(self):
         """
         Tests that a DAG run succeeds when all tasks are skipped
         """
+        dag = DAG(
+            dag_id='test_dagrun_success_when_all_skipped',
+            start_date=datetime.datetime(2017, 1, 1)
+        )
+        dag_task1 = ShortCircuitOperator(
+            task_id='test_short_circuit_false',
+            dag=dag,
+            python_callable=lambda: False)
+        dag_task2 = DummyOperator(
+            task_id='test_state_skipped1',
+            dag=dag)
+        dag_task3 = DummyOperator(
+            task_id='test_state_skipped2',
+            dag=dag)
+        dag_task1.set_downstream(dag_task2)
+        dag_task2.set_downstream(dag_task3)
+
         initial_task_states = {
             'test_short_circuit_false': State.SUCCESS,
             'test_state_skipped1': State.SKIPPED,
             'test_state_skipped2': State.SKIPPED,
         }
-        # dags/test_dagrun_short_circuit_false.py
-        dag_run = self.create_dag_run('test_dagrun_short_circuit_false',
+
+        dag_run = self.create_dag_run(dag=dag,
                                       state=State.RUNNING,
                                       task_states=initial_task_states)
         updated_dag_state = dag_run.update_state()
@@ -314,10 +312,17 @@ class DagRunTest(unittest.TestCase):
         """
         Make sure that a proper value is returned when a dagrun has no task instances
         """
+        dag = DAG(
+            dag_id='test_get_task_instance_on_empty_dagrun',
+            start_date=datetime.datetime(2017, 1, 1)
+        )
+        dag_task1 = ShortCircuitOperator(
+            task_id='test_short_circuit_false',
+            dag=dag,
+            python_callable=lambda: False)
+
         session = settings.Session()
 
-        # Any dag will work for this
-        dag = self.dagbag.get_dag('test_dagrun_short_circuit_false')
         now = datetime.datetime.now()
 
         # Don't use create_dagrun since it will create the task instances too which we
@@ -713,7 +718,7 @@ class TaskInstanceTest(unittest.TestCase):
         self.assertEqual(dt, ti.end_date+max_delay)
 
     def test_depends_on_past(self):
-        dagbag = models.DagBag(dag_folder=TEST_DAG_FOLDER)
+        dagbag = models.DagBag()
         dag = dagbag.get_dag('test_depends_on_past')
         dag.clear()
         task = dag.tasks[0]
@@ -742,11 +747,10 @@ class TaskInstanceTest(unittest.TestCase):
         #
         # Tests for all_success
         #
-        ['all_success', 5, 0, 0, 0, 5, True, None, True],
-        ['all_success', 2, 0, 0, 0, 2, True, None, False],
-        ['all_success', 2, 0, 1, 0, 3, True, ST.UPSTREAM_FAILED, False],
-        ['all_success', 2, 1, 0, 0, 3, True, None, False],
-        ['all_success', 0, 5, 0, 0, 5, True, ST.SKIPPED, True],
+        ['all_success', 5, 0, 0, 0, 0, True, None, True],
+        ['all_success', 2, 0, 0, 0, 0, True, None, False],
+        ['all_success', 2, 0, 1, 0, 0, True, ST.UPSTREAM_FAILED, False],
+        ['all_success', 2, 1, 0, 0, 0, True, ST.SKIPPED, False],
         #
         # Tests for one_success
         #
@@ -754,7 +758,6 @@ class TaskInstanceTest(unittest.TestCase):
         ['one_success', 2, 0, 0, 0, 2, True, None, True],
         ['one_success', 2, 0, 1, 0, 3, True, None, True],
         ['one_success', 2, 1, 0, 0, 3, True, None, True],
-        ['one_success', 0, 2, 0, 0, 2, True, None, True],
         #
         # Tests for all_failed
         #
@@ -766,9 +769,9 @@ class TaskInstanceTest(unittest.TestCase):
         #
         # Tests for one_failed
         #
-        ['one_failed', 5, 0, 0, 0, 5, True, ST.SKIPPED, False],
-        ['one_failed', 2, 0, 0, 0, 2, True, None, False],
-        ['one_failed', 2, 0, 1, 0, 2, True, None, True],
+        ['one_failed', 5, 0, 0, 0, 0, True, None, False],
+        ['one_failed', 2, 0, 0, 0, 0, True, None, False],
+        ['one_failed', 2, 0, 1, 0, 0, True, None, True],
         ['one_failed', 2, 1, 0, 0, 3, True, None, False],
         ['one_failed', 2, 3, 0, 0, 5, True, ST.SKIPPED, False],
         #


[2/4] incubator-airflow git commit: [AIRFLOW-719] Fix race condition in ShortCircuit, Branch and LatestOnly

Posted by bo...@apache.org.
[AIRFLOW-719] Fix race condition in ShortCircuit, Branch and LatestOnly

The ShortCircuitOperator, BranchPythonOperator and LatestOnlyOperator
were arbitrarily changing the states of TaskInstances without locking
them in the database. As the scheduler checks the state of dag runs
asynchronously, the dag run state could be set to failed while the
operators are updating the downstream tasks.

A better fix would be to use the dag run itself in the context of the
Operator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb705fd5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb705fd5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb705fd5

Branch: refs/heads/master
Commit: eb705fd55c30cea778282140d927f51b4a649c73
Parents: 92965e8
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Mar 28 16:29:39 2017 -0700
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Apr 3 10:38:12 2017 +0200

----------------------------------------------------------------------
 airflow/operators/latest_only_operator.py |  30 ++++-
 airflow/operators/python_operator.py      |  82 +++++++++---
 scripts/ci/requirements.txt               |   1 +
 tests/operators/__init__.py               |   2 +
 tests/operators/latest_only_operator.py   |   2 +-
 tests/operators/python_operator.py        | 167 ++++++++++++++++++++++++-
 6 files changed, 258 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb705fd5/airflow/operators/latest_only_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py
index 8b4e614..9d5defb 100644
--- a/airflow/operators/latest_only_operator.py
+++ b/airflow/operators/latest_only_operator.py
@@ -34,7 +34,7 @@ class LatestOnlyOperator(BaseOperator):
     def execute(self, context):
         # If the DAG Run is externally triggered, then return without
         # skipping downstream tasks
-        if context['dag_run'].external_trigger:
+        if context['dag_run'] and context['dag_run'].external_trigger:
             logging.info("""Externally triggered DAG_Run:
                          allowing execution to proceed.""")
             return
@@ -46,17 +46,39 @@ class LatestOnlyOperator(BaseOperator):
         logging.info(
             'Checking latest only with left_window: %s right_window: %s '
             'now: %s', left_window, right_window, now)
+
         if not left_window < now <= right_window:
             logging.info('Not latest execution, skipping downstream.')
             session = settings.Session()
-            for task in context['task'].downstream_list:
-                ti = TaskInstance(
-                    task, execution_date=context['ti'].execution_date)
+
+            TI = TaskInstance
+            tis = session.query(TI).filter(
+                TI.execution_date == context['ti'].execution_date,
+                TI.task_id.in_(context['task'].downstream_task_ids)
+            ).with_for_update().all()
+
+            for ti in tis:
                 logging.info('Skipping task: %s', ti.task_id)
                 ti.state = State.SKIPPED
                 ti.start_date = now
                 ti.end_date = now
                 session.merge(ti)
+
+            # this is defensive against dag runs that are not complete
+            for task in context['task'].downstream_list:
+                if task.task_id in tis:
+                    continue
+
+                logging.warning("Task {} was not part of a dag run. "
+                                "This should not happen."
+                                .format(task))
+                now = datetime.datetime.now()
+                ti = TaskInstance(task, execution_date=context['ti'].execution_date)
+                ti.state = State.SKIPPED
+                ti.start_date = now
+                ti.end_date = now
+                session.merge(ti)
+
             session.commit()
             session.close()
             logging.info('Done.')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb705fd5/airflow/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index a17e6fa..cf240f2 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -109,14 +109,36 @@ class BranchPythonOperator(PythonOperator):
         logging.info("Following branch " + branch)
         logging.info("Marking other directly downstream tasks as skipped")
         session = settings.Session()
+
+        TI = TaskInstance
+        tis = session.query(TI).filter(
+            TI.execution_date == context['ti'].execution_date,
+            TI.task_id.in_(context['task'].downstream_task_ids),
+            TI.task_id != branch,
+        ).with_for_update().all()
+
+        for ti in tis:
+            logging.info('Skipping task: %s', ti.task_id)
+            ti.state = State.SKIPPED
+            ti.start_date = datetime.now()
+            ti.end_date = datetime.now()
+
+        # this is defensive against dag runs that are not complete
         for task in context['task'].downstream_list:
-            if task.task_id != branch:
-                ti = TaskInstance(
-                    task, execution_date=context['ti'].execution_date)
-                ti.state = State.SKIPPED
-                ti.start_date = datetime.now()
-                ti.end_date = datetime.now()
-                session.merge(ti)
+            if task.task_id in tis:
+                continue
+
+            if task.task_id == branch:
+                continue
+
+            logging.warning("Task {} was not part of a dag run. This should not happen."
+                            .format(task))
+            ti = TaskInstance(task, execution_date=context['ti'].execution_date)
+            ti.state = State.SKIPPED
+            ti.start_date = datetime.now()
+            ti.end_date = datetime.now()
+            session.merge(ti)
+
         session.commit()
         session.close()
         logging.info("Done.")
@@ -137,19 +159,39 @@ class ShortCircuitOperator(PythonOperator):
     def execute(self, context):
         condition = super(ShortCircuitOperator, self).execute(context)
         logging.info("Condition result is {}".format(condition))
+
         if condition:
             logging.info('Proceeding with downstream tasks...')
             return
-        else:
-            logging.info('Skipping downstream tasks...')
-            session = settings.Session()
-            for task in context['task'].downstream_list:
-                ti = TaskInstance(
-                    task, execution_date=context['ti'].execution_date)
-                ti.state = State.SKIPPED
-                ti.start_date = datetime.now()
-                ti.end_date = datetime.now()
-                session.merge(ti)
-            session.commit()
-            session.close()
-            logging.info("Done.")
+
+        logging.info('Skipping downstream tasks...')
+        session = settings.Session()
+
+        TI = TaskInstance
+        tis = session.query(TI).filter(
+            TI.execution_date == context['ti'].execution_date,
+            TI.task_id.in_(context['task'].downstream_task_ids),
+        ).with_for_update().all()
+
+        for ti in tis:
+            logging.info('Skipping task: %s', ti.task_id)
+            ti.state = State.SKIPPED
+            ti.start_date = datetime.now()
+            ti.end_date = datetime.now()
+
+        # this is defensive against dag runs that are not complete
+        for task in context['task'].downstream_list:
+            if task.task_id in tis:
+                continue
+
+            logging.warning("Task {} was not part of a dag run. This should not happen."
+                            .format(task))
+            ti = TaskInstance(task, execution_date=context['ti'].execution_date)
+            ti.state = State.SKIPPED
+            ti.start_date = datetime.now()
+            ti.end_date = datetime.now()
+            session.merge(ti)
+
+        session.commit()
+        session.close()
+        logging.info("Done.")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb705fd5/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index 7fdd18e..d206f16 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -22,6 +22,7 @@ flask-cache
 flask-login==0.2.11
 Flask-WTF
 flower
+freezegun
 future
 gunicorn
 hdfs

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb705fd5/tests/operators/__init__.py
----------------------------------------------------------------------
diff --git a/tests/operators/__init__.py b/tests/operators/__init__.py
index 7a517a1..e6f6830 100644
--- a/tests/operators/__init__.py
+++ b/tests/operators/__init__.py
@@ -19,3 +19,5 @@ from .sensors import *
 from .hive_operator import *
 from .s3_to_hive_operator import *
 from .python_operator import *
+from .latest_only_operator import *
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb705fd5/tests/operators/latest_only_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/latest_only_operator.py b/tests/operators/latest_only_operator.py
index 3ac5fac..9137491 100644
--- a/tests/operators/latest_only_operator.py
+++ b/tests/operators/latest_only_operator.py
@@ -80,7 +80,7 @@ class LatestOnlyOperatorTest(unittest.TestCase):
         self.assertEqual({
             datetime.datetime(2016, 1, 1): 'success',
             datetime.datetime(2016, 1, 1, 12): 'success',
-            datetime.datetime(2016, 1, 2): 'success', }, 
+            datetime.datetime(2016, 1, 2): 'success', },
             exec_date_to_latest_state)
 
         downstream_instances = get_task_instances('downstream')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb705fd5/tests/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/python_operator.py b/tests/operators/python_operator.py
index 621172f..3aa8b6c 100644
--- a/tests/operators/python_operator.py
+++ b/tests/operators/python_operator.py
@@ -18,7 +18,12 @@ import datetime
 import unittest
 
 from airflow import configuration, DAG
-from airflow.operators.python_operator import PythonOperator
+from airflow.models import TaskInstance as TI
+from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
+from airflow.operators.python_operator import ShortCircuitOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.settings import Session
+from airflow.utils.state import State
 
 from airflow.exceptions import AirflowException
 
@@ -77,3 +82,163 @@ class PythonOperatorTest(unittest.TestCase):
                 python_callable=not_callable,
                 task_id='python_operator',
                 dag=self.dag)
+
+
+class BranchOperatorTest(unittest.TestCase):
+    def setUp(self):
+        self.dag = DAG('branch_operator_test',
+                       default_args={
+                           'owner': 'airflow',
+                           'start_date': DEFAULT_DATE},
+                       schedule_interval=INTERVAL)
+        self.branch_op = BranchPythonOperator(task_id='make_choice',
+                                              dag=self.dag,
+                                              python_callable=lambda: 'branch_1')
+
+        self.branch_1 = DummyOperator(task_id='branch_1', dag=self.dag)
+        self.branch_1.set_upstream(self.branch_op)
+        self.branch_2 = DummyOperator(task_id='branch_2', dag=self.dag)
+        self.branch_2.set_upstream(self.branch_op)
+        self.dag.clear()
+
+    def test_without_dag_run(self):
+        """This checks the defensive against non existent tasks in a dag run"""
+        self.branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        session = Session()
+        tis = session.query(TI).filter(
+            TI.dag_id == self.dag.dag_id,
+            TI.execution_date == DEFAULT_DATE
+        )
+        session.close()
+
+        for ti in tis:
+            if ti.task_id == 'make_choice':
+                self.assertEquals(ti.state, State.SUCCESS)
+            elif ti.task_id == 'branch_1':
+                # should not exist
+                raise
+            elif ti.task_id == 'branch_2':
+                self.assertEquals(ti.state, State.SKIPPED)
+            else:
+                raise
+
+    def test_with_dag_run(self):
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            start_date=datetime.datetime.now(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+
+        self.branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        tis = dr.get_task_instances()
+        for ti in tis:
+            if ti.task_id == 'make_choice':
+                self.assertEquals(ti.state, State.SUCCESS)
+            elif ti.task_id == 'branch_1':
+                self.assertEquals(ti.state, State.NONE)
+            elif ti.task_id == 'branch_2':
+                self.assertEquals(ti.state, State.SKIPPED)
+            else:
+                raise
+
+
+class ShortCircuitOperatorTest(unittest.TestCase):
+    def setUp(self):
+        self.dag = DAG('shortcircuit_operator_test',
+                       default_args={
+                           'owner': 'airflow',
+                           'start_date': DEFAULT_DATE},
+                       schedule_interval=INTERVAL)
+        self.short_op = ShortCircuitOperator(task_id='make_choice',
+                                             dag=self.dag,
+                                             python_callable=lambda: self.value)
+
+        self.branch_1 = DummyOperator(task_id='branch_1', dag=self.dag)
+        self.branch_1.set_upstream(self.short_op)
+        self.upstream = DummyOperator(task_id='upstream', dag=self.dag)
+        self.upstream.set_downstream(self.short_op)
+        self.dag.clear()
+
+        self.value = True
+
+    def test_without_dag_run(self):
+        """This checks the defensive against non existent tasks in a dag run"""
+        self.value = False
+        self.short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        session = Session()
+        tis = session.query(TI).filter(
+            TI.dag_id == self.dag.dag_id,
+            TI.execution_date == DEFAULT_DATE
+        )
+
+        for ti in tis:
+            if ti.task_id == 'make_choice':
+                self.assertEquals(ti.state, State.SUCCESS)
+            elif ti.task_id == 'upstream':
+                # should not exist
+                raise
+            elif ti.task_id == 'branch_1':
+                self.assertEquals(ti.state, State.SKIPPED)
+            else:
+                raise
+
+        self.value = True
+        self.dag.clear()
+
+        self.short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+        for ti in tis:
+            if ti.task_id == 'make_choice':
+                self.assertEquals(ti.state, State.SUCCESS)
+            elif ti.task_id == 'upstream':
+                # should not exist
+                raise
+            elif ti.task_id == 'branch_1':
+                self.assertEquals(ti.state, State.NONE)
+            else:
+                raise
+
+        session.close()
+
+    def test_with_dag_run(self):
+        self.value = False
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            start_date=datetime.datetime.now(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+
+        self.upstream.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+        self.short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        tis = dr.get_task_instances()
+        for ti in tis:
+            if ti.task_id == 'make_choice':
+                self.assertEquals(ti.state, State.SUCCESS)
+            elif ti.task_id == 'upstream':
+                self.assertEquals(ti.state, State.SUCCESS)
+            elif ti.task_id == 'branch_1':
+                self.assertEquals(ti.state, State.SKIPPED)
+            else:
+                raise
+
+        self.value = True
+        self.dag.clear()
+
+        self.upstream.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+        self.short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        tis = dr.get_task_instances()
+        for ti in tis:
+            if ti.task_id == 'make_choice':
+                self.assertEquals(ti.state, State.SUCCESS)
+            elif ti.task_id == 'upstream':
+                self.assertEquals(ti.state, State.SUCCESS)
+            elif ti.task_id == 'branch_1':
+                self.assertEquals(ti.state, State.NONE)
+            else:
+                raise


[4/4] incubator-airflow git commit: Merge pull request #2195 from bolkedebruin/AIRFLOW-719

Posted by bo...@apache.org.
Merge pull request #2195 from bolkedebruin/AIRFLOW-719


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4a6bef69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4a6bef69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4a6bef69

Branch: refs/heads/master
Commit: 4a6bef69d1817a5fc3ddd6ffe14c2578eaa49cf0
Parents: f2dae7d 15fd4d9
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Apr 4 17:04:12 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Apr 4 17:04:12 2017 +0200

----------------------------------------------------------------------
 airflow/operators/latest_only_operator.py     |  30 +++-
 airflow/operators/python_operator.py          |  82 +++++++---
 airflow/ti_deps/deps/trigger_rule_dep.py      |   6 +-
 scripts/ci/requirements.txt                   |   1 +
 tests/dags/test_dagrun_short_circuit_false.py |  38 -----
 tests/models.py                               |  77 +++++-----
 tests/operators/__init__.py                   |   2 +
 tests/operators/latest_only_operator.py       |   2 +-
 tests/operators/python_operator.py            | 167 ++++++++++++++++++++-
 9 files changed, 301 insertions(+), 104 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-airflow git commit: Merge branch 'AIRFLOW-719' into AIRFLOW-719-3

Posted by bo...@apache.org.
Merge branch 'AIRFLOW-719' into AIRFLOW-719-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/15fd4d98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/15fd4d98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/15fd4d98

Branch: refs/heads/master
Commit: 15fd4d98d141766f81552d270c8b5c43b15f4f44
Parents: f2dae7d eb705fd
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Apr 4 11:55:20 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Apr 4 11:55:20 2017 +0200

----------------------------------------------------------------------
 airflow/operators/latest_only_operator.py     |  30 +++-
 airflow/operators/python_operator.py          |  82 +++++++---
 airflow/ti_deps/deps/trigger_rule_dep.py      |   6 +-
 scripts/ci/requirements.txt                   |   1 +
 tests/dags/test_dagrun_short_circuit_false.py |  38 -----
 tests/models.py                               |  77 +++++-----
 tests/operators/__init__.py                   |   2 +
 tests/operators/latest_only_operator.py       |   2 +-
 tests/operators/python_operator.py            | 167 ++++++++++++++++++++-
 9 files changed, 301 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15fd4d98/tests/models.py
----------------------------------------------------------------------
diff --cc tests/models.py
index 43fccca,3e77894..a013f8a
--- a/tests/models.py
+++ b/tests/models.py
@@@ -223,59 -220,10 +220,43 @@@ class DagRunTest(unittest.TestCase)
      def test_id_for_date(self):
          run_id = models.DagRun.id_for_date(
              datetime.datetime(2015, 1, 2, 3, 4, 5, 6, None))
 -        self.assertEqual('scheduled__2015-01-02T03:04:05', run_id,
 -                         msg='Generated run_id did not match expectations: {0}'
 -                         .format(run_id))
 +        self.assertEqual(
 +            'scheduled__2015-01-02T03:04:05', run_id,
 +            'Generated run_id did not match expectations: {0}'.format(run_id))
 +
 +    def test_dagrun_find(self):
 +        session = settings.Session()
 +        now = datetime.datetime.now()
 +
 +        dag_id1 = "test_dagrun_find_externally_triggered"
 +        dag_run = models.DagRun(
 +            dag_id=dag_id1,
 +            run_id='manual__' + now.isoformat(),
 +            execution_date=now,
 +            start_date=now,
 +            state=State.RUNNING,
 +            external_trigger=True,
 +        )
 +        session.add(dag_run)
 +
 +        dag_id2 = "test_dagrun_find_not_externally_triggered"
 +        dag_run = models.DagRun(
 +            dag_id=dag_id2,
 +            run_id='manual__' + now.isoformat(),
 +            execution_date=now,
 +            start_date=now,
 +            state=State.RUNNING,
 +            external_trigger=False,
 +        )
 +        session.add(dag_run)
 +
 +        session.commit()
 +
 +        self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id1, external_trigger=True)))
 +        self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id1, external_trigger=False)))
 +        self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id2, external_trigger=True)))
 +        self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id2, external_trigger=False)))
  
-     def test_dagrun_running_when_upstream_skipped(self):
-         """
-         Tests that a DAG run is not failed when an upstream task is skipped
-         """
-         initial_task_states = {
-             'test_short_circuit_false': State.SUCCESS,
-             'test_state_skipped1': State.SKIPPED,
-             'test_state_skipped2': State.NONE,
-         }
-         # dags/test_dagrun_short_circuit_false.py
-         dag_run = self.create_dag_run('test_dagrun_short_circuit_false',
-                                       state=State.RUNNING,
-                                       task_states=initial_task_states)
-         updated_dag_state = dag_run.update_state()
-         self.assertEqual(State.RUNNING, updated_dag_state)
- 
      def test_dagrun_success_when_all_skipped(self):
          """
          Tests that a DAG run succeeds when all tasks are skipped