You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jl...@apache.org on 2016/06/30 23:42:48 UTC
[2/4] incubator-airflow git commit: [AIRFLOW-100] Add execution_date_fn to ExternalTaskSensor
[AIRFLOW-100] Add execution_date_fn to ExternalTaskSensor
Currently, ExternalTaskSensor only supports querying execution_dates
that are either the same as the ExternalTaskSensor's execution_date
or a fixed interval from that date (using `execution_delta`). This
adds the ability to provide a function (`execution_date_fn`) that accepts
the current execution_date and can return any desired date for
querying. This is much more flexible. For example, it could
supply the last date of the previous month.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/efdbbb5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/efdbbb5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/efdbbb5d
Branch: refs/heads/master
Commit: efdbbb5d3beba49f9b633f0a25ce768f896c0a6a
Parents: 0965648
Author: jlowin <jl...@users.noreply.github.com>
Authored: Thu Jun 30 16:55:16 2016 -0400
Committer: jlowin <jl...@users.noreply.github.com>
Committed: Thu Jun 30 18:34:16 2016 -0400
----------------------------------------------------------------------
airflow/operators/sensors.py | 17 ++++++++++++++++-
airflow/utils/tests.py | 5 ++++-
tests/core.py | 39 +++++++++++++++++++++++++++++++++++++++
3 files changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efdbbb5d/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index e9b8885..9f7f380 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -179,8 +179,14 @@ class ExternalTaskSensor(BaseSensorOperator):
:type allowed_states: list
:param execution_delta: time difference with the previous execution to
look at, the default is the same execution_date as the current task.
- For yesterday, use [positive!] datetime.timedelta(days=1)
+ For yesterday, use [positive!] datetime.timedelta(days=1). Either
+ execution_delta or execution_date_fn can be passed to
+ ExternalTaskSensor, but not both.
:type execution_delta: datetime.timedelta
+ :param execution_date_fn: function that receives the current execution date
+ and returns the desired execution date to query. Either execution_delta
+ or execution_date_fn may be passed; passing both raises ValueError.
+ :type execution_date_fn: callable
"""
@apply_defaults
@@ -190,16 +196,25 @@ class ExternalTaskSensor(BaseSensorOperator):
external_task_id,
allowed_states=None,
execution_delta=None,
+ execution_date_fn=None,
*args, **kwargs):
+ # Reject bad args before the base __init__ (which is handed `dag`) runs.
+ if execution_delta is not None and execution_date_fn is not None:
+ raise ValueError(
+ 'Only one of `execution_delta` or `execution_date_fn` may '
+ 'be provided to ExternalTaskSensor; not both.')
super(ExternalTaskSensor, self).__init__(*args, **kwargs)
self.allowed_states = allowed_states or [State.SUCCESS]
self.execution_delta = execution_delta
+ self.execution_date_fn = execution_date_fn
self.external_dag_id = external_dag_id
self.external_task_id = external_task_id
def poke(self, context):
if self.execution_delta:
dttm = context['execution_date'] - self.execution_delta
+ elif self.execution_date_fn:
+ dttm = self.execution_date_fn(context['execution_date'])
else:
dttm = context['execution_date']
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efdbbb5d/airflow/utils/tests.py
----------------------------------------------------------------------
diff --git a/airflow/utils/tests.py b/airflow/utils/tests.py
index 50490d3..83db6e8 100644
--- a/airflow/utils/tests.py
+++ b/airflow/utils/tests.py
@@ -16,7 +16,10 @@ import unittest
def skipUnlessImported(module, obj):
import importlib
- m = importlib.import_module(module)
+ try:
+ m = importlib.import_module(module)
+ except ImportError:
+ m = None
return unittest.skipUnless(
obj in dir(m),
"Skipping test because {} could not be imported from {}".format(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efdbbb5d/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 24d2938..4f3197d 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -437,6 +437,45 @@ class CoreTest(unittest.TestCase):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+ def test_external_task_sensor_fn(self):
+ self.test_time_sensor()
+ # check that execution_date_fn is used to pick the date to query
+ t = sensors.ExternalTaskSensor(
+ task_id='test_external_task_sensor_check_fn_1',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id='time_sensor_check',
+ execution_date_fn=lambda dt: dt + timedelta(0),
+ allowed_states=['success'],
+ dag=self.dag)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+ # double check that the fn is really called: a shifted date must time out
+ t2 = sensors.ExternalTaskSensor(
+ task_id='test_external_task_sensor_check_fn_2',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id='time_sensor_check',
+ execution_date_fn=lambda dt: dt + timedelta(days=1),
+ allowed_states=['success'],
+ timeout=1,
+ poke_interval=1,
+ dag=self.dag)
+ with self.assertRaises(exceptions.AirflowSensorTimeout):
+ t2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+ def test_external_task_sensor_error_delta_and_fn(self):
+ """
+ Test that providing execution_delta and a function raises an error
+ """
+ with self.assertRaises(ValueError):
+ sensors.ExternalTaskSensor(
+ task_id='test_external_task_sensor_check_delta_and_fn',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id='time_sensor_check',
+ execution_delta=timedelta(0),
+ execution_date_fn=lambda dt: dt,
+ allowed_states=['success'],
+ dag=self.dag)
+
def test_timeout(self):
t = PythonOperator(
task_id='test_timeout',