You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/09 08:42:08 UTC
[GitHub] XD-DENG closed pull request #3688: [AIRFLOW-2843] ExternalTaskSensor-check if external task exists
XD-DENG closed pull request #3688: [AIRFLOW-2843] ExternalTaskSensor-check if external task exists
URL: https://github.com/apache/incubator-airflow/pull/3688
This is a PR merged from a forked repository.
Because this pull request came from a fork, GitHub hides its original diff
once it is merged; the diff is reproduced below for the sake of provenance:
diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py
index eda1a2d9d2..194472fcdf 100644
--- a/airflow/sensors/external_task_sensor.py
+++ b/airflow/sensors/external_task_sensor.py
@@ -22,6 +22,7 @@
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
+from airflow.exceptions import AirflowException
class ExternalTaskSensor(BaseSensorOperator):
@@ -46,6 +47,10 @@ class ExternalTaskSensor(BaseSensorOperator):
and returns the desired execution dates to query. Either execution_delta
or execution_date_fn can be passed to ExternalTaskSensor, but not both.
:type execution_date_fn: callable
+ :param check_existence: Default value is False. Set to True to check if the
+ external task exists, and immediately cease waiting if the external task
+ does not exist.
+ :type check_existence: bool
"""
template_fields = ['external_dag_id', 'external_task_id']
ui_color = '#19647e'
@@ -57,6 +62,7 @@ def __init__(self,
allowed_states=None,
execution_delta=None,
execution_date_fn=None,
+ check_existence=False,
*args,
**kwargs):
super(ExternalTaskSensor, self).__init__(*args, **kwargs)
@@ -70,9 +76,24 @@ def __init__(self,
self.execution_date_fn = execution_date_fn
self.external_dag_id = external_dag_id
self.external_task_id = external_task_id
+ self.check_existence = check_existence
@provide_session
def poke(self, context, session=None):
+ TI = TaskInstance
+
+ if self.check_existence:
+ existence = session.query(TI).filter(
+ TI.dag_id == self.external_dag_id,
+ TI.task_id == self.external_task_id,
+ ).count()
+ session.commit()
+ if existence == 0:
+ raise AirflowException('The external task "' +
+ self.external_dag_id + " - " +
+ self.external_task_id +
+ '" does not exist.')
+
if self.execution_delta:
dttm = context['execution_date'] - self.execution_delta
elif self.execution_date_fn:
@@ -89,7 +110,7 @@ def poke(self, context, session=None):
'{self.external_dag_id}.'
'{self.external_task_id} on '
'{} ... '.format(serialized_dttm_filter, **locals()))
- TI = TaskInstance
+
count = session.query(TI).filter(
TI.dag_id == self.external_dag_id,
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 403c892c23..71906dfef3 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -22,7 +22,7 @@
from airflow import DAG, configuration, settings
from airflow import exceptions
-from airflow.exceptions import AirflowSensorTimeout
+from airflow.exceptions import AirflowSensorTimeout, AirflowException
from airflow.models import TaskInstance, DagBag
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
@@ -247,3 +247,20 @@ def test_external_task_sensor_error_delta_and_fn(self):
allowed_states=['success'],
dag=self.dag
)
+
+ def test_external_task_sensor_check_existence(self):
+ self.test_time_sensor()
+ t = ExternalTaskSensor(
+ task_id='test_external_task_sensor_check',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id=TEST_TASK_ID + "-non-existing",
+ check_existence=True,
+ dag=self.dag
+ )
+
+ with self.assertRaises(AirflowException):
+ t.run(
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ ignore_ti_state=True
+ )
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services