You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/09 08:42:08 UTC

[GitHub] XD-DENG closed pull request #3688: [AIRFLOW-2843] ExternalTaskSensor-check if external task exists

XD-DENG closed pull request #3688: [AIRFLOW-2843] ExternalTaskSensor-check if external task exists
URL: https://github.com/apache/incubator-airflow/pull/3688
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, the diff is supplied
below; GitHub would not otherwise display it after the merge:

diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py
index eda1a2d9d2..194472fcdf 100644
--- a/airflow/sensors/external_task_sensor.py
+++ b/airflow/sensors/external_task_sensor.py
@@ -22,6 +22,7 @@
 from airflow.utils.db import provide_session
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.state import State
+from airflow.exceptions import AirflowException
 
 
 class ExternalTaskSensor(BaseSensorOperator):
@@ -46,6 +47,10 @@ class ExternalTaskSensor(BaseSensorOperator):
         and returns the desired execution dates to query. Either execution_delta
         or execution_date_fn can be passed to ExternalTaskSensor, but not both.
     :type execution_date_fn: callable
+    :param check_existence: Default value is False. Set to True to check if the
+        external task exists, and immediately cease waiting if the external task
+        does not exist.
+    :type check_existence: bool
     """
     template_fields = ['external_dag_id', 'external_task_id']
     ui_color = '#19647e'
@@ -57,6 +62,7 @@ def __init__(self,
                  allowed_states=None,
                  execution_delta=None,
                  execution_date_fn=None,
+                 check_existence=False,
                  *args,
                  **kwargs):
         super(ExternalTaskSensor, self).__init__(*args, **kwargs)
@@ -70,9 +76,24 @@ def __init__(self,
         self.execution_date_fn = execution_date_fn
         self.external_dag_id = external_dag_id
         self.external_task_id = external_task_id
+        self.check_existence = check_existence
 
     @provide_session
     def poke(self, context, session=None):
+        TI = TaskInstance
+
+        if self.check_existence:
+            existence = session.query(TI).filter(
+                TI.dag_id == self.external_dag_id,
+                TI.task_id == self.external_task_id,
+            ).count()
+            session.commit()
+            if existence == 0:
+                raise AirflowException('The external task "' +
+                                       self.external_dag_id + " - " +
+                                       self.external_task_id +
+                                       '" does not exist.')
+
         if self.execution_delta:
             dttm = context['execution_date'] - self.execution_delta
         elif self.execution_date_fn:
@@ -89,7 +110,7 @@ def poke(self, context, session=None):
             '{self.external_dag_id}.'
             '{self.external_task_id} on '
             '{} ... '.format(serialized_dttm_filter, **locals()))
-        TI = TaskInstance
+
 
         count = session.query(TI).filter(
             TI.dag_id == self.external_dag_id,
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 403c892c23..71906dfef3 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -22,7 +22,7 @@
 
 from airflow import DAG, configuration, settings
 from airflow import exceptions
-from airflow.exceptions import AirflowSensorTimeout
+from airflow.exceptions import AirflowSensorTimeout, AirflowException
 from airflow.models import TaskInstance, DagBag
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.dummy_operator import DummyOperator
@@ -247,3 +247,20 @@ def test_external_task_sensor_error_delta_and_fn(self):
                 allowed_states=['success'],
                 dag=self.dag
             )
+
+    def test_external_task_sensor_check_existence(self):
+        self.test_time_sensor()
+        t = ExternalTaskSensor(
+            task_id='test_external_task_sensor_check',
+            external_dag_id=TEST_DAG_ID,
+            external_task_id=TEST_TASK_ID + "-non-existing",
+            check_existence=True,
+            dag=self.dag
+        )
+
+        with self.assertRaises(AirflowException):
+            t.run(
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE,
+                ignore_ti_state=True
+            )


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services