You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jl...@apache.org on 2016/06/30 23:42:48 UTC

[2/4] incubator-airflow git commit: [AIRFLOW-100] Add execution_date_fn to ExternalTaskSensor

[AIRFLOW-100] Add execution_date_fn to ExternalTaskSensor

Currently, ExternalTaskSensor only supports querying execution_dates
that are either the same as the ExternalTaskSensor's execution_date
or a fixed interval from that date (using `execution_delta`). This
adds the ability to provide a function (`execution_date_fn`) that
accepts the current execution_date and can return any desired date
for querying. This is much more flexible: for example, the function
could return the last date of the previous month.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/efdbbb5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/efdbbb5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/efdbbb5d

Branch: refs/heads/master
Commit: efdbbb5d3beba49f9b633f0a25ce768f896c0a6a
Parents: 0965648
Author: jlowin <jl...@users.noreply.github.com>
Authored: Thu Jun 30 16:55:16 2016 -0400
Committer: jlowin <jl...@users.noreply.github.com>
Committed: Thu Jun 30 18:34:16 2016 -0400

----------------------------------------------------------------------
 airflow/operators/sensors.py | 17 ++++++++++++++++-
 airflow/utils/tests.py       |  5 ++++-
 tests/core.py                | 39 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efdbbb5d/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index e9b8885..9f7f380 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -179,8 +179,14 @@ class ExternalTaskSensor(BaseSensorOperator):
     :type allowed_states: list
     :param execution_delta: time difference with the previous execution to
         look at, the default is the same execution_date as the current task.
-        For yesterday, use [positive!] datetime.timedelta(days=1)
+        For yesterday, use [positive!] datetime.timedelta(days=1). Either
+        execution_delta or execution_date_fn may be passed to
+        ExternalTaskSensor, but not both (ValueError otherwise).
     :type execution_delta: datetime.timedelta
+    :param execution_date_fn: callable that receives the current execution
+        date and returns the execution date to query (useful when the target
+        date is not a fixed offset). Mutually exclusive with execution_delta.
+    :type execution_date_fn: callable
     """
 
     @apply_defaults
@@ -190,16 +196,25 @@ class ExternalTaskSensor(BaseSensorOperator):
             external_task_id,
             allowed_states=None,
             execution_delta=None,
+            execution_date_fn=None,
             *args, **kwargs):
         super(ExternalTaskSensor, self).__init__(*args, **kwargs)
         self.allowed_states = allowed_states or [State.SUCCESS]
+        if execution_delta is not None and execution_date_fn is not None:
+            raise ValueError(
+                'Only one of `execution_delta` or `execution_date_fn` may '
+                'be provided to ExternalTaskSensor; not both.')
+
         self.execution_delta = execution_delta
+        self.execution_date_fn = execution_date_fn
         self.external_dag_id = external_dag_id
         self.external_task_id = external_task_id
 
     def poke(self, context):
         if self.execution_delta:
             dttm = context['execution_date'] - self.execution_delta
+        elif self.execution_date_fn:
+            dttm = self.execution_date_fn(context['execution_date'])
         else:
             dttm = context['execution_date']


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efdbbb5d/airflow/utils/tests.py
----------------------------------------------------------------------
diff --git a/airflow/utils/tests.py b/airflow/utils/tests.py
index 50490d3..83db6e8 100644
--- a/airflow/utils/tests.py
+++ b/airflow/utils/tests.py
@@ -16,7 +16,10 @@ import unittest

 def skipUnlessImported(module, obj):
     import importlib
-    m = importlib.import_module(module)
+    try:
+        m = importlib.import_module(module)
+    except ImportError:  # missing module -> the test is skipped
+        m = None
     return unittest.skipUnless(
         obj in dir(m),
         "Skipping test because {} could not be imported from {}".format(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efdbbb5d/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 24d2938..4f3197d 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -437,6 +437,45 @@ class CoreTest(unittest.TestCase):
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
+    def test_external_task_sensor_fn(self):
+        self.test_time_sensor()
+        # check that execution_date_fn is used to compute the date to query
+        t = sensors.ExternalTaskSensor(
+            task_id='test_external_task_sensor_check_fn',
+            external_dag_id=TEST_DAG_ID,
+            external_task_id='time_sensor_check',
+            execution_date_fn=lambda dt: dt + timedelta(0),
+            allowed_states=['success'],
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        # shifting the date by a day must make the sensor time out
+        t2 = sensors.ExternalTaskSensor(
+            task_id='test_external_task_sensor_check_fn_timeout',
+            external_dag_id=TEST_DAG_ID,
+            external_task_id='time_sensor_check',
+            execution_date_fn=lambda dt: dt + timedelta(days=1),
+            allowed_states=['success'],
+            timeout=1,
+            poke_interval=1,
+            dag=self.dag)
+        with self.assertRaises(exceptions.AirflowSensorTimeout):
+            t2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+    def test_external_task_sensor_error_delta_and_fn(self):
+        """
+        Test that passing both execution_delta and execution_date_fn raises
+        """
+        with self.assertRaises(ValueError):
+            sensors.ExternalTaskSensor(
+                task_id='test_external_task_sensor_error_delta_and_fn',
+                external_dag_id=TEST_DAG_ID,
+                external_task_id='time_sensor_check',
+                execution_delta=timedelta(0),
+                execution_date_fn=lambda dt: dt,
+                allowed_states=['success'],
+                dag=self.dag)
+
     def test_timeout(self):
         t = PythonOperator(
             task_id='test_timeout',