You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/06/29 13:21:40 UTC
[airflow] 34/37: Add context to execution_date_fn in ExternalTaskSensor (#8702)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 595ae11246f9a585d326746bbfb3c62107a68367
Author: Ace Haidrey <ah...@pandora.com>
AuthorDate: Fri May 22 07:02:34 2020 -0700
Add context to execution_date_fn in ExternalTaskSensor (#8702)
Co-authored-by: Ace Haidrey <ah...@pinterest.com>
(cherry picked from commit b055151520843785635a8a00b259c51fa4ec76ac)
---
airflow/sensors/external_task_sensor.py | 22 +++++++++++++++++++++-
tests/sensors/test_external_task_sensor.py | 22 ++++++++++++++++++++++
2 files changed, 43 insertions(+), 1 deletion(-)
diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py
index 1b93528..2dc0875 100644
--- a/airflow/sensors/external_task_sensor.py
+++ b/airflow/sensors/external_task_sensor.py
@@ -104,7 +104,7 @@ class ExternalTaskSensor(BaseSensorOperator):
if self.execution_delta:
dttm = context['execution_date'] - self.execution_delta
elif self.execution_date_fn:
- dttm = self.execution_date_fn(context['execution_date'])
+ dttm = self._handle_execution_date_fn(context=context)
else:
dttm = context['execution_date']
@@ -159,6 +159,26 @@ class ExternalTaskSensor(BaseSensorOperator):
session.commit()
return count == len(dttm_filter)
+ def _handle_execution_date_fn(self, context):
+ """
+ This function is to handle backwards compatibility with how this operator was
+ previously where it only passes the execution date, but also allow for the newer
+ implementation to pass all context through as well, to allow for more sophisticated
+ returns of dates to return.
+ Namely, this function check the number of arguments in the execution_date_fn
+ signature and if its 1, treat the legacy way, if it's 2, pass the context as
+ the 2nd argument, and if its more, throw an exception.
+ """
+ num_fxn_params = self.execution_date_fn.__code__.co_argcount
+ if num_fxn_params == 1:
+ return self.execution_date_fn(context['execution_date'])
+ elif num_fxn_params == 2:
+ return self.execution_date_fn(context['execution_date'], context)
+ else:
+ raise AirflowException(
+ 'execution_date_fn passed {} args but only allowed up to 2'.format(num_fxn_params)
+ )
+
class ExternalTaskMarker(DummyOperator):
"""
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 25ef8b7..0e5e960 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -251,6 +251,28 @@ exit 0
ignore_ti_state=True
)
+ def test_external_task_sensor_fn_multiple_args(self):
+ """Check this task sensor passes multiple args with full context. If no failure, means clean run."""
+ self.test_time_sensor()
+
+ def my_func(dt, context):
+ assert context['execution_date'] == dt
+ return dt + timedelta(0)
+
+ op1 = ExternalTaskSensor(
+ task_id='test_external_task_sensor_multiple_arg_fn',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id=TEST_TASK_ID,
+ execution_date_fn=my_func,
+ allowed_states=['success'],
+ dag=self.dag
+ )
+ op1.run(
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ ignore_ti_state=True
+ )
+
def test_external_task_sensor_error_delta_and_fn(self):
self.test_time_sensor()
# Test that providing execution_delta and a function raises an error