You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/05/07 13:24:02 UTC

[airflow] branch master updated: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed (#8509)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e4f5fa  [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed (#8509)
6e4f5fa is described below

commit 6e4f5fa66ebe2d8252829c67e79f895fa5029b5a
Author: Lokesh Lal <lo...@gmail.com>
AuthorDate: Thu May 7 18:53:28 2020 +0530

    [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed (#8509)
    
    Added failed_states to ExternalTaskSensor so that it stops waiting when the external task fails, instead of waiting until the timeout
---
 .../example_external_task_marker_dag.py            | 14 +++++
 airflow/sensors/external_task_sensor.py            | 68 +++++++++++++++++-----
 docs/howto/operator/external_task_sensor.rst       |  5 ++
 tests/sensors/test_external_task_sensor.py         | 58 +++++++++++++++++-
 4 files changed, 130 insertions(+), 15 deletions(-)

diff --git a/airflow/example_dags/example_external_task_marker_dag.py b/airflow/example_dags/example_external_task_marker_dag.py
index ac1085a..8fe6bb8 100644
--- a/airflow/example_dags/example_external_task_marker_dag.py
+++ b/airflow/example_dags/example_external_task_marker_dag.py
@@ -24,6 +24,17 @@ In this example, child_task1 in example_external_task_marker_child depends on pa
 example_external_task_marker_parent. When parent_task is cleared with "Recursive" selected,
 the presence of ExternalTaskMarker tells Airflow to clear child_task1 and its
 downstream tasks.
+
+ExternalTaskSensor keeps poking for the status of the remote ExternalTaskMarker task at a regular
+interval until one of the following happens:
+1. ExternalTaskMarker reaches one of the states listed in allowed_states.
+    In this case, ExternalTaskSensor exits with a success status.
+2. ExternalTaskMarker reaches one of the states listed in failed_states.
+    In this case, ExternalTaskSensor raises an AirflowException, which the user needs to handle
+    in the downstream tasks.
+3. ExternalTaskSensor times out.
+    In this case, ExternalTaskSensor raises an AirflowSkipException or an AirflowSensorTimeout
+    exception.
 """
 
 import datetime
@@ -56,6 +67,9 @@ with DAG(
     child_task1 = ExternalTaskSensor(task_id="child_task1",
                                      external_dag_id=parent_dag.dag_id,
                                      external_task_id=parent_task.task_id,
+                                     timeout=600,
+                                     allowed_states=['success'],
+                                     failed_states=['failed', 'skipped'],
                                      mode="reschedule")
     # [END howto_operator_external_task_sensor]
     child_task2 = DummyOperator(task_id="child_task2")
diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py
index b088015..3d4d673 100644
--- a/airflow/sensors/external_task_sensor.py
+++ b/airflow/sensors/external_task_sensor.py
@@ -44,6 +44,8 @@ class ExternalTaskSensor(BaseSensorOperator):
     :type external_task_id: str or None
     :param allowed_states: list of allowed states, default is ``['success']``
     :type allowed_states: list
+    :param failed_states: list of failed or disallowed states, default is ``None``
+    :type failed_states: list
     :param execution_delta: time difference with the previous execution to
         look at, the default is the same execution_date as the current task or DAG.
         For yesterday, use [positive!] datetime.timedelta(days=1). Either
@@ -68,6 +70,7 @@ class ExternalTaskSensor(BaseSensorOperator):
                  external_dag_id,
                  external_task_id=None,
                  allowed_states=None,
+                 failed_states=None,
                  execution_delta=None,
                  execution_date_fn=None,
                  check_existence=False,
@@ -75,16 +78,26 @@ class ExternalTaskSensor(BaseSensorOperator):
                  **kwargs):
         super().__init__(*args, **kwargs)
         self.allowed_states = allowed_states or [State.SUCCESS]
+        self.failed_states = failed_states or []
+
+        total_states = self.allowed_states + self.failed_states
+        total_states = set(total_states)
+
+        if set(self.failed_states).intersection(set(self.allowed_states)):
+            raise AirflowException("Duplicate values provided as allowed "
+                                   "`{}` and failed states `{}`"
+                                   .format(self.allowed_states, self.failed_states))
+
         if external_task_id:
-            if not set(self.allowed_states) <= set(State.task_states):
+            if not total_states <= set(State.task_states):
                 raise ValueError(
-                    'Valid values for `allowed_states` '
+                    'Valid values for `allowed_states` and `failed_states` '
                     'when `external_task_id` is not `None`: {}'.format(State.task_states)
                 )
         else:
-            if not set(self.allowed_states) <= set(State.dag_states):
+            if not total_states <= set(State.dag_states):
                 raise ValueError(
-                    'Valid values for `allowed_states` '
+                    'Valid values for `allowed_states` and `failed_states` '
                     'when `external_task_id` is `None`: {}'.format(State.dag_states)
                 )
 
@@ -120,9 +133,6 @@ class ExternalTaskSensor(BaseSensorOperator):
         )
 
         DM = DagModel
-        TI = TaskInstance
-        DR = DagRun
-
         # we only do the check for 1st time, no need for subsequent poke
         if self.check_existence and not self.has_checked_existence:
             dag_to_wait = session.query(DM).filter(
@@ -141,28 +151,58 @@ class ExternalTaskSensor(BaseSensorOperator):
                 refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
                 if not refreshed_dag_info.has_task(self.external_task_id):
                     raise AirflowException('The external task'
-                                           '{} in DAG {} does not exist.'.format(self.external_task_id,
-                                                                                 self.external_dag_id))
+                                           '{} in DAG {} does not exist.'
+                                           .format(self.external_task_id,
+                                                   self.external_dag_id))
             self.has_checked_existence = True
 
+        count_allowed = self.get_count(dttm_filter, session, self.allowed_states)
+
+        count_failed = -1
+        if len(self.failed_states) > 0:
+            count_failed = self.get_count(dttm_filter, session, self.failed_states)
+
+        session.commit()
+        if count_failed == len(dttm_filter):
+            if self.external_task_id:
+                raise AirflowException('The external task {} in DAG {} failed.'
+                                       .format(self.external_task_id, self.external_dag_id))
+            else:
+                raise AirflowException('The external DAG {} failed.'
+                                       .format(self.external_dag_id))
+
+        return count_allowed == len(dttm_filter)
+
+    def get_count(self, dttm_filter, session, states):
+        """
+        get the count of records against dttm filter and states
+        :param dttm_filter: date time filter for execution date
+        :type dttm_filter: list
+        :param session: airflow session object
+        :type session: SASession
+        :param states: task or dag states
+        :type states: list
+        :return: count of record against the filters
+        """
+        TI = TaskInstance
+        DR = DagRun
+
         if self.external_task_id:
             # .count() is inefficient
             count = session.query(func.count()).filter(
                 TI.dag_id == self.external_dag_id,
                 TI.task_id == self.external_task_id,
-                TI.state.in_(self.allowed_states),
+                TI.state.in_(states),  # pylint: disable=no-member
                 TI.execution_date.in_(dttm_filter),
             ).scalar()
         else:
             # .count() is inefficient
             count = session.query(func.count()).filter(
                 DR.dag_id == self.external_dag_id,
-                DR.state.in_(self.allowed_states),  # pylint: disable=no-member
+                DR.state.in_(states),  # pylint: disable=no-member
                 DR.execution_date.in_(dttm_filter),
             ).scalar()
-
-        session.commit()
-        return count == len(dttm_filter)
+        return count
 
 
 class ExternalTaskMarker(DummyOperator):
diff --git a/docs/howto/operator/external_task_sensor.rst b/docs/howto/operator/external_task_sensor.rst
index be1ce81..ef68a3d 100644
--- a/docs/howto/operator/external_task_sensor.rst
+++ b/docs/howto/operator/external_task_sensor.rst
@@ -41,11 +41,16 @@ ExternalTaskSensor
 Use the :class:`~airflow.sensors.external_task_sensor.ExternalTaskSensor` to make tasks on a DAG
 wait for another task on a different DAG for a specific ``execution_date``.
 
+ExternalTaskSensor also provides options to specify whether the task on a remote DAG succeeded or failed
+via the ``allowed_states`` and ``failed_states`` parameters.
+
 .. exampleinclude:: ../../../airflow/example_dags/example_external_task_marker_dag.py
     :language: python
     :start-after: [START howto_operator_external_task_sensor]
     :end-before: [END howto_operator_external_task_sensor]
 
+
+
 ExternalTaskMarker
 ^^^^^^^^^^^^^^^^^^
 If it is desirable that whenever ``parent_task`` on ``parent_dag`` is cleared, ``child_task1``
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 1054a91..0386a77 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -72,8 +72,64 @@ class TestExternalTaskSensor(unittest.TestCase):
             ignore_ti_state=True
         )
 
-    def test_external_dag_sensor(self):
+    def test_catch_overlap_allowed_failed_state(self):
+        with self.assertRaises(AirflowException):
+            ExternalTaskSensor(
+                task_id='test_external_task_sensor_check',
+                external_dag_id=TEST_DAG_ID,
+                external_task_id=TEST_TASK_ID,
+                allowed_states=[State.SUCCESS],
+                failed_states=[State.SUCCESS],
+                dag=self.dag
+            )
+
+    def test_external_task_sensor_wrong_failed_states(self):
+        with self.assertRaises(ValueError):
+            ExternalTaskSensor(
+                task_id='test_external_task_sensor_check',
+                external_dag_id=TEST_DAG_ID,
+                external_task_id=TEST_TASK_ID,
+                failed_states=["invalid_state"],
+                dag=self.dag
+            )
 
+    def test_external_task_sensor_failed_states(self):
+        self.test_time_sensor()
+        op = ExternalTaskSensor(
+            task_id='test_external_task_sensor_check',
+            external_dag_id=TEST_DAG_ID,
+            external_task_id=TEST_TASK_ID,
+            failed_states=["failed"],
+            dag=self.dag
+        )
+        op.run(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            ignore_ti_state=True
+        )
+
+    def test_external_task_sensor_failed_states_as_success(self):
+        self.test_time_sensor()
+        op = ExternalTaskSensor(
+            task_id='test_external_task_sensor_check',
+            external_dag_id=TEST_DAG_ID,
+            external_task_id=TEST_TASK_ID,
+            allowed_states=["failed"],
+            failed_states=["success"],
+            dag=self.dag
+        )
+        with self.assertRaises(AirflowException) as cm:
+            op.run(
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE,
+                ignore_ti_state=True
+            )
+        self.assertEqual(str(cm.exception),
+                         "The external task "
+                         "time_sensor_check in DAG "
+                         "unit_test_dag failed.")
+
+    def test_external_dag_sensor(self):
         other_dag = DAG(
             'other_dag',
             default_args=self.args,