Posted to commits@airflow.apache.org by bo...@apache.org on 2017/07/23 19:12:55 UTC

incubator-airflow git commit: [AIRFLOW-1398] Allow ExternalTaskSensor to wait on multiple runs of a task

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 2de4b7cfb -> 6d348903a


[AIRFLOW-1398] Allow ExternalTaskSensor to wait on multiple runs of a task

Currently, the execution_date_fn parameter of the ExternalTaskSensor
only allows waiting for the completion of one given run of the task
the sensor is sensing.

However, this prevents users from having setups where dags don't share
the same schedule frequency but still depend on one another. For
example, say you have a dag scheduled hourly that transforms log data
and is owned by the team in charge of logging. In the current setup,
other higher-level teams that want to use this transformed data cannot
create dags that process it in daily batches while making sure the
transformed log data was properly created. Note that simply waiting
for the data to be present (using e.g. the HivePartitionSensor if the
data is in Hive) might not be satisfactory, because the data being
present doesn't mean it is ready to be used.

This commit makes exactly that possible by allowing an
ExternalTaskSensor to wait for multiple runs of the task it is
sensing to finish. Higher-level teams can now set up dags with an
ExternalTaskSensor that senses the end task of the log-transforming
dag and waits for the successful completion of 24 of its hourly runs,
as sketched below.
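
A minimal sketch of that setup, assuming a hypothetical hourly dag
'hourly_log_transform' whose final task is 'transform_done' (none of
these ids appear in this commit):

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.sensors import ExternalTaskSensor

    # Daily dag owned by the consuming team; ids and dates are made up.
    daily_dag = DAG(
        'daily_log_consumer',
        start_date=datetime(2017, 7, 1),
        schedule_interval=timedelta(days=1))

    # execution_date_fn may now return a list of execution dates: here
    # the daily execution date is expanded into the 24 hourly runs it
    # covers.
    wait_for_hourly_runs = ExternalTaskSensor(
        task_id='wait_for_hourly_runs',
        external_dag_id='hourly_log_transform',
        external_task_id='transform_done',
        execution_date_fn=lambda dt: [dt + timedelta(hours=i)
                                      for i in range(24)],
        allowed_states=['success'],
        dag=daily_dag)

The sensor keeps poking until a task instance in one of the
allowed_states exists for every one of the 24 requested execution
dates.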

Closes #2431 from rlk-ama/pr/multiple-dates-external-task-sensor


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6d348903
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6d348903
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6d348903

Branch: refs/heads/master
Commit: 6d348903a2374251d2733219ae14c71208c61a7a
Parents: 2de4b7c
Author: Raphael Lopez Kaufman <ra...@booking.com>
Authored: Sun Jul 23 21:11:29 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Jul 23 21:11:32 2017 +0200

----------------------------------------------------------------------
 airflow/operators/sensors.py |  12 +++--
 tests/operators/sensors.py   | 110 +++++++++++++++++++++++++++++++++++++-
 2 files changed, 116 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6d348903/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 4f276ad..bfa2ef4 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -196,7 +196,7 @@ class ExternalTaskSensor(BaseSensorOperator):
         ExternalTaskSensor, but not both.
     :type execution_delta: datetime.timedelta
     :param execution_date_fn: function that receives the current execution date
-        and returns the desired execution date to query. Either execution_delta
+        and returns the desired execution dates to query. Either execution_delta
         or execution_date_fn can be passed to ExternalTaskSensor, but not both.
     :type execution_date_fn: callable
     """
@@ -231,11 +231,15 @@ class ExternalTaskSensor(BaseSensorOperator):
         else:
             dttm = context['execution_date']
 
+        dttm_filter = dttm if isinstance(dttm, list) else [dttm]
+        serialized_dttm_filter = ','.join(
+            [datetime.isoformat() for datetime in dttm_filter])
+
         logging.info(
             'Poking for '
             '{self.external_dag_id}.'
             '{self.external_task_id} on '
-            '{dttm} ... '.format(**locals()))
+            '{} ... '.format(serialized_dttm_filter, **locals()))
         TI = TaskInstance
 
         session = settings.Session()
@@ -243,11 +247,11 @@ class ExternalTaskSensor(BaseSensorOperator):
             TI.dag_id == self.external_dag_id,
             TI.task_id == self.external_task_id,
             TI.state.in_(self.allowed_states),
-            TI.execution_date == dttm,
+            TI.execution_date.in_(dttm_filter),
         ).count()
         session.commit()
         session.close()
-        return count
+        return count == len(dttm_filter)
 
 
 class NamedHivePartitionSensor(BaseSensorOperator):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6d348903/tests/operators/sensors.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py
index 242abac..9a40a05 100644
--- a/tests/operators/sensors.py
+++ b/tests/operators/sensors.py
@@ -22,12 +22,18 @@ import unittest
 from mock import patch
 from datetime import datetime, timedelta
 
-from airflow import DAG, configuration
-from airflow.operators.sensors import HttpSensor, BaseSensorOperator, HdfsSensor
+from airflow import DAG, configuration, jobs, settings
+from airflow.jobs import BackfillJob, SchedulerJob
+from airflow.models import TaskInstance, DagModel, DagBag
+from airflow.operators.sensors import HttpSensor, BaseSensorOperator, HdfsSensor, ExternalTaskSensor
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.exceptions import (AirflowException,
                                 AirflowSensorTimeout,
                                 AirflowSkipException)
+from airflow.utils.state import State
+from tests.core import TEST_DAG_FOLDER
 configuration.load_test_config()
 
 DEFAULT_DATE = datetime(2015, 1, 1)
@@ -254,3 +260,103 @@ class HdfsSensorTests(unittest.TestCase):
         # Then
         with self.assertRaises(AirflowSensorTimeout):
             task.execute(None)
+
+
+class ExternalTaskSensorTests(unittest.TestCase):
+
+    def setUp(self):
+        configuration.load_test_config()
+        self.args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE,
+            'depends_on_past': False}
+
+    def test_external_task_sensor_fn_multiple_execution_dates(self):
+        bash_command_code = """
+{% set s=execution_date.time().second %}
+echo "second is {{ s }}"
+if [[ $(( {{ s }} % 60 )) == 1 ]]
+    then
+        exit 1
+fi
+exit 0
+"""
+        dag_external_id = TEST_DAG_ID + '_external'
+        dag_external = DAG(
+            dag_external_id,
+            default_args=self.args,
+            schedule_interval=timedelta(seconds=1))
+        task_external_with_failure = BashOperator(
+            task_id="task_external_with_failure",
+            bash_command=bash_command_code,
+            retries=0,
+            dag=dag_external)
+        task_external_without_failure = DummyOperator(
+            task_id="task_external_without_failure",
+            retries=0,
+            dag=dag_external)
+
+        task_external_without_failure.run(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + timedelta(seconds=1),
+            ignore_ti_state=True)
+
+        session = settings.Session()
+        TI = TaskInstance
+        try:
+            task_external_with_failure.run(
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE + timedelta(seconds=1),
+                ignore_ti_state=True)
+            # The task_external_with_failure task is expected to
+            # fail once per minute (the run on the first second of
+            # each minute).
+        except Exception as e:
+            failed_tis = session.query(TI).filter(
+                TI.dag_id == dag_external_id,
+                TI.state == State.FAILED,
+                TI.execution_date == DEFAULT_DATE + timedelta(seconds=1)).all()
+            if (len(failed_tis) == 1 and
+                    failed_tis[0].task_id == 'task_external_with_failure'):
+                pass
+            else:
+                raise e
+
+        dag_id = TEST_DAG_ID
+        dag = DAG(
+            dag_id,
+            default_args=self.args,
+            schedule_interval=timedelta(minutes=1))
+        task_without_failure = ExternalTaskSensor(
+            task_id='task_without_failure',
+            external_dag_id=dag_external_id,
+            external_task_id='task_external_without_failure',
+            execution_date_fn=lambda dt: [dt + timedelta(seconds=i)
+                                          for i in range(2)],
+            allowed_states=['success'],
+            retries=0,
+            timeout=1,
+            poke_interval=1,
+            dag=dag)
+        task_with_failure = ExternalTaskSensor(
+            task_id='task_with_failure',
+            external_dag_id=dag_external_id,
+            external_task_id='task_external_with_failure',
+            execution_date_fn=lambda dt: [dt + timedelta(seconds=i)
+                                          for i in range(2)],
+            allowed_states=['success'],
+            retries=0,
+            timeout=1,
+            poke_interval=1,
+            dag=dag)
+
+        task_without_failure.run(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            ignore_ti_state=True)
+
+        with self.assertRaises(AirflowSensorTimeout):
+            task_with_failure.run(
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE,
+                ignore_ti_state=True)
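
For completeness, the change is backward compatible: poke() wraps a
scalar execution date into a one-element dttm_filter, so
count == len(dttm_filter) reduces to the truthiness of the old
"return count". A minimal sketch, again with hypothetical ids:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.sensors import ExternalTaskSensor

    hourly_dag = DAG(
        'hourly_log_consumer',
        start_date=datetime(2017, 7, 1),
        schedule_interval=timedelta(hours=1))

    # execution_date_fn returning a single date still works: poke()
    # normalizes it to a one-element list and succeeds when count == 1.
    wait_previous_hour = ExternalTaskSensor(
        task_id='wait_previous_hour',
        external_dag_id='hourly_log_transform',
        external_task_id='transform_done',
        execution_date_fn=lambda dt: dt - timedelta(hours=1),
        dag=hourly_dag)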