You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/05/07 13:24:02 UTC
[airflow] branch master updated: [AIRFLOW-4568]The
ExternalTaskSensor should be configurable to raise an Airflow Exception in
case the poked external task reaches a disallowed state,
such as f.i. failed (#8509)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 6e4f5fa [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed (#8509)
6e4f5fa is described below
commit 6e4f5fa66ebe2d8252829c67e79f895fa5029b5a
Author: Lokesh Lal <lo...@gmail.com>
AuthorDate: Thu May 7 18:53:28 2020 +0530
[AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed (#8509)
Added failed_states to ExternalTaskSensor so that failure scenarios are detected immediately instead of waiting until the timeout expires
---
.../example_external_task_marker_dag.py | 14 +++++
airflow/sensors/external_task_sensor.py | 68 +++++++++++++++++-----
docs/howto/operator/external_task_sensor.rst | 5 ++
tests/sensors/test_external_task_sensor.py | 58 +++++++++++++++++-
4 files changed, 130 insertions(+), 15 deletions(-)
diff --git a/airflow/example_dags/example_external_task_marker_dag.py b/airflow/example_dags/example_external_task_marker_dag.py
index ac1085a..8fe6bb8 100644
--- a/airflow/example_dags/example_external_task_marker_dag.py
+++ b/airflow/example_dags/example_external_task_marker_dag.py
@@ -24,6 +24,17 @@ In this example, child_task1 in example_external_task_marker_child depends on pa
example_external_task_marker_parent. When parent_task is cleared with "Recursive" selected,
the presence of ExternalTaskMarker tells Airflow to clear child_task1 and its
downstream tasks.
+
+ExternalTaskSensor will keep poking for the status of remote ExternalTaskMarker task at a regular
+interval until one of the following happens:
+1. ExternalTaskMarker reaches the states mentioned in the allowed_states list
+ In this case, ExternalTaskSensor will exit with a success status code
+2. ExternalTaskMarker reaches the states mentioned in the failed_states list
+ In this case, ExternalTaskSensor will raise an AirflowException and the user needs to handle this
+ with multiple downstream tasks
+3. ExternalTaskSensor times out
+ In this case, ExternalTaskSensor will raise AirflowSkipException or AirflowSensorTimeout
+ exception
"""
import datetime
@@ -56,6 +67,9 @@ with DAG(
child_task1 = ExternalTaskSensor(task_id="child_task1",
external_dag_id=parent_dag.dag_id,
external_task_id=parent_task.task_id,
+ timeout=600,
+ allowed_states=['success'],
+ failed_states=['failed', 'skipped'],
mode="reschedule")
# [END howto_operator_external_task_sensor]
child_task2 = DummyOperator(task_id="child_task2")
diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py
index b088015..3d4d673 100644
--- a/airflow/sensors/external_task_sensor.py
+++ b/airflow/sensors/external_task_sensor.py
@@ -44,6 +44,8 @@ class ExternalTaskSensor(BaseSensorOperator):
:type external_task_id: str or None
:param allowed_states: list of allowed states, default is ``['success']``
:type allowed_states: list
+ :param failed_states: list of failed or disallowed states, default is ``None``
+ :type failed_states: list
:param execution_delta: time difference with the previous execution to
look at, the default is the same execution_date as the current task or DAG.
For yesterday, use [positive!] datetime.timedelta(days=1). Either
@@ -68,6 +70,7 @@ class ExternalTaskSensor(BaseSensorOperator):
external_dag_id,
external_task_id=None,
allowed_states=None,
+ failed_states=None,
execution_delta=None,
execution_date_fn=None,
check_existence=False,
@@ -75,16 +78,26 @@ class ExternalTaskSensor(BaseSensorOperator):
**kwargs):
super().__init__(*args, **kwargs)
self.allowed_states = allowed_states or [State.SUCCESS]
+ self.failed_states = failed_states or []
+
+ total_states = self.allowed_states + self.failed_states
+ total_states = set(total_states)
+
+ if set(self.failed_states).intersection(set(self.allowed_states)):
+ raise AirflowException("Duplicate values provided as allowed "
+ "`{}` and failed states `{}`"
+ .format(self.allowed_states, self.failed_states))
+
if external_task_id:
- if not set(self.allowed_states) <= set(State.task_states):
+ if not total_states <= set(State.task_states):
raise ValueError(
- 'Valid values for `allowed_states` '
+ 'Valid values for `allowed_states` and `failed_states` '
'when `external_task_id` is not `None`: {}'.format(State.task_states)
)
else:
- if not set(self.allowed_states) <= set(State.dag_states):
+ if not total_states <= set(State.dag_states):
raise ValueError(
- 'Valid values for `allowed_states` '
+ 'Valid values for `allowed_states` and `failed_states` '
'when `external_task_id` is `None`: {}'.format(State.dag_states)
)
@@ -120,9 +133,6 @@ class ExternalTaskSensor(BaseSensorOperator):
)
DM = DagModel
- TI = TaskInstance
- DR = DagRun
-
# we only do the check for 1st time, no need for subsequent poke
if self.check_existence and not self.has_checked_existence:
dag_to_wait = session.query(DM).filter(
@@ -141,28 +151,58 @@ class ExternalTaskSensor(BaseSensorOperator):
refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
if not refreshed_dag_info.has_task(self.external_task_id):
raise AirflowException('The external task'
- '{} in DAG {} does not exist.'.format(self.external_task_id,
- self.external_dag_id))
+ '{} in DAG {} does not exist.'
+ .format(self.external_task_id,
+ self.external_dag_id))
self.has_checked_existence = True
+ count_allowed = self.get_count(dttm_filter, session, self.allowed_states)
+
+ count_failed = -1
+ if len(self.failed_states) > 0:
+ count_failed = self.get_count(dttm_filter, session, self.failed_states)
+
+ session.commit()
+ if count_failed == len(dttm_filter):
+ if self.external_task_id:
+ raise AirflowException('The external task {} in DAG {} failed.'
+ .format(self.external_task_id, self.external_dag_id))
+ else:
+ raise AirflowException('The external DAG {} failed.'
+ .format(self.external_dag_id))
+
+ return count_allowed == len(dttm_filter)
+
+ def get_count(self, dttm_filter, session, states):
+ """
+ get the count of records against dttm filter and states
+ :param dttm_filter: date time filter for execution date
+ :type dttm_filter: list
+ :param session: airflow session object
+ :type session: SASession
+ :param states: task or dag states
+ :type states: list
+ :return: count of record against the filters
+ """
+ TI = TaskInstance
+ DR = DagRun
+
if self.external_task_id:
# .count() is inefficient
count = session.query(func.count()).filter(
TI.dag_id == self.external_dag_id,
TI.task_id == self.external_task_id,
- TI.state.in_(self.allowed_states),
+ TI.state.in_(states), # pylint: disable=no-member
TI.execution_date.in_(dttm_filter),
).scalar()
else:
# .count() is inefficient
count = session.query(func.count()).filter(
DR.dag_id == self.external_dag_id,
- DR.state.in_(self.allowed_states), # pylint: disable=no-member
+ DR.state.in_(states), # pylint: disable=no-member
DR.execution_date.in_(dttm_filter),
).scalar()
-
- session.commit()
- return count == len(dttm_filter)
+ return count
class ExternalTaskMarker(DummyOperator):
diff --git a/docs/howto/operator/external_task_sensor.rst b/docs/howto/operator/external_task_sensor.rst
index be1ce81..ef68a3d 100644
--- a/docs/howto/operator/external_task_sensor.rst
+++ b/docs/howto/operator/external_task_sensor.rst
@@ -41,11 +41,16 @@ ExternalTaskSensor
Use the :class:`~airflow.sensors.external_task_sensor.ExternalTaskSensor` to make tasks on a DAG
wait for another task on a different DAG for a specific ``execution_date``.
+ExternalTaskSensor also provides options to define which states of the task on a remote DAG count as
+success or failure, via the ``allowed_states`` and ``failed_states`` parameters.
+
.. exampleinclude:: ../../../airflow/example_dags/example_external_task_marker_dag.py
:language: python
:start-after: [START howto_operator_external_task_sensor]
:end-before: [END howto_operator_external_task_sensor]
+
+
ExternalTaskMarker
^^^^^^^^^^^^^^^^^^
If it is desirable that whenever ``parent_task`` on ``parent_dag`` is cleared, ``child_task1``
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 1054a91..0386a77 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -72,8 +72,64 @@ class TestExternalTaskSensor(unittest.TestCase):
ignore_ti_state=True
)
- def test_external_dag_sensor(self):
+ def test_catch_overlap_allowed_failed_state(self):
+ with self.assertRaises(AirflowException):
+ ExternalTaskSensor(
+ task_id='test_external_task_sensor_check',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id=TEST_TASK_ID,
+ allowed_states=[State.SUCCESS],
+ failed_states=[State.SUCCESS],
+ dag=self.dag
+ )
+
+ def test_external_task_sensor_wrong_failed_states(self):
+ with self.assertRaises(ValueError):
+ ExternalTaskSensor(
+ task_id='test_external_task_sensor_check',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id=TEST_TASK_ID,
+ failed_states=["invalid_state"],
+ dag=self.dag
+ )
+ def test_external_task_sensor_failed_states(self):
+ self.test_time_sensor()
+ op = ExternalTaskSensor(
+ task_id='test_external_task_sensor_check',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id=TEST_TASK_ID,
+ failed_states=["failed"],
+ dag=self.dag
+ )
+ op.run(
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ ignore_ti_state=True
+ )
+
+ def test_external_task_sensor_failed_states_as_success(self):
+ self.test_time_sensor()
+ op = ExternalTaskSensor(
+ task_id='test_external_task_sensor_check',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id=TEST_TASK_ID,
+ allowed_states=["failed"],
+ failed_states=["success"],
+ dag=self.dag
+ )
+ with self.assertRaises(AirflowException) as cm:
+ op.run(
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ ignore_ti_state=True
+ )
+ self.assertEqual(str(cm.exception),
+ "The external task "
+ "time_sensor_check in DAG "
+ "unit_test_dag failed.")
+
+ def test_external_dag_sensor(self):
other_dag = DAG(
'other_dag',
default_args=self.args,