You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/17 23:51:00 UTC

[GitHub] stale[bot] closed pull request #2500: [AIRFLOW-1488] Add the DagRunSensor operator.

stale[bot] closed pull request #2500: [AIRFLOW-1488] Add the DagRunSensor operator.
URL: https://github.com/apache/incubator-airflow/pull/2500
 
 
   

This is a PR from a forked repository (it was closed, not merged).
As GitHub hides the original diff of such PRs, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, since GitHub would not otherwise display it:

diff --git a/airflow/contrib/operators/dagrun_sensor.py b/airflow/contrib/operators/dagrun_sensor.py
new file mode 100644
index 0000000000..f4465626af
--- /dev/null
+++ b/airflow/contrib/operators/dagrun_sensor.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from airflow import settings
+from airflow.utils.state import State
+from airflow.utils.decorators import apply_defaults
+from airflow.models import DagRun
+from airflow.operators.sensors import BaseSensorOperator
+
+
+class DagRunSensor(BaseSensorOperator):
+    """
+    Waits for a DAG run to complete.
+
+    :param external_dag_id: The dag_id that you want to wait for
+    :type external_dag_id: string
+    :param allowed_states: list of allowed states, default is ``['success']``
+    :type allowed_states: list
+    :param execution_delta: time difference with the previous execution to look
+    at, the default is the same execution_date as the current task.  For
+    yesterday, use [positive!] datetime.timedelta(days=1). Either
+    execution_delta or execution_date_fn can be passed to DagRunSensor, but not
+    both.
+    :type execution_delta: datetime.timedelta
+    :param execution_date_fn: function that receives the current execution date
+    and returns the desired execution dates to query. Either execution_delta or
+    execution_date_fn can be passed to DagRunSensor, but not both.
+    :type execution_date_fn: callable
+    """
+    @apply_defaults
+    def __init__(
+            self,
+            external_dag_id,
+            allowed_states=None,
+            execution_delta=None,
+            execution_date_fn=None,
+            *args, **kwargs):
+        super(DagRunSensor, self).__init__(*args, **kwargs)
+
+        if execution_delta is not None and execution_date_fn is not None:
+            raise ValueError(
+                'Only one of `execution_date` or `execution_date_fn` may'
+                'be provided to DagRunSensor; not both.')
+
+        self.allowed_states = allowed_states or [State.SUCCESS]
+        self.execution_delta = execution_delta
+        self.execution_date_fn = execution_date_fn
+        self.external_dag_id = external_dag_id
+
+    def poke(self, context):
+        if self.execution_delta:
+            dttm = context['execution_date'] - self.execution_delta
+        elif self.execution_date_fn:
+            dttm = self.execution_date_fn(context['execution_date'])
+        else:
+            dttm = context['execution_date']
+
+        dttm_filter = dttm if isinstance(dttm, list) else [dttm]
+        serialized_dttm_filter = ','.join([datetime.isoformat() for datetime in
+                                           dttm_filter])
+
+        logging.info(
+             'Poking for '
+             '{self.external_dag_id}.'
+             '{serialized_dttm_filter} ... '.format(**locals()))
+
+        session = settings.Session()
+        count = session.query(DagRun).filter(
+            DagRun.dag_id == self.external_dag_id,
+            DagRun.state.in_(self.allowed_states),
+            DagRun.execution_date.in_(dttm_filter),
+        ).count()
+        session.commit()
+        session.close()
+        return count == len(dttm_filter)
diff --git a/tests/contrib/operators/test_dagrun_sensor.py b/tests/contrib/operators/test_dagrun_sensor.py
new file mode 100644
index 0000000000..74e4d46ccb
--- /dev/null
+++ b/tests/contrib/operators/test_dagrun_sensor.py
@@ -0,0 +1,119 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+from os.path import dirname, realpath
+import unittest
+from datetime import timedelta, datetime
+
+from airflow.models import DagBag, TaskInstance, DagRun
+from airflow.settings import Session
+from airflow.utils.state import State
+from airflow import configuration
+from airflow.exceptions import AirflowException
+
+DEFAULT_DATE = datetime(2017, 1, 1)
+TEST_DAG_ID = 'test_dagrun_sensor_dag'
+TEST_DAG_FOLDER = os.path.join(
+    dirname(dirname(dirname(realpath(__file__)))), 'dags')
+
+
+class TestDagRunSensor(unittest.TestCase):
+
+    def setUp(self):
+        configuration.load_test_config()
+        self.default_scheduler_args = {
+            "file_process_interval": 0,
+            "processor_poll_interval": 0.5,
+            "num_runs": 1
+        }
+        self.dagbag = DagBag(dag_folder=TEST_DAG_FOLDER)
+
+    def test_poke(self):
+        dag_parent = self.dagbag.get_dag(TEST_DAG_ID+'_parent_clean')
+        dag_parent.run(
+            start_date=DEFAULT_DATE+timedelta(seconds=0),
+            end_date=DEFAULT_DATE+timedelta(seconds=8),
+        )
+
+        dag_child = self.dagbag.get_dag(TEST_DAG_ID+'_child_clean')
+
+        # One of the following two runs should succeed (00:00:00), while the
+        # other (00:00:05) should have its sensor time out, since 00:00:09
+        # will never be run for the parent dag.
+
+        # first (safe) run
+        dag_child.run(
+            start_date=DEFAULT_DATE+timedelta(seconds=0),
+            end_date=DEFAULT_DATE+timedelta(seconds=0),
+        )
+
+        sess = Session()
+        TI = TaskInstance
+        sensor_tis = sess.query(TI).filter(
+            TI.dag_id == TEST_DAG_ID+'_child_clean',
+            TI.task_id == 'sense_parent',
+            TI.state == State.SUCCESS,
+        ).all()
+        self.assertEqual(len(sensor_tis), 1)
+
+        do_stuff_tis = sess.query(TI).filter(
+            TI.dag_id == TEST_DAG_ID+'_child_clean',
+            TI.task_id == 'do_stuff',
+            TI.state == State.SUCCESS,
+        ).all()
+        self.assertEqual(len(do_stuff_tis), 1)
+
+        DR = DagRun
+        drs = sess.query(DR).filter(
+            DR.dag_id == TEST_DAG_ID+'_child_clean',
+            DR.state == State.SUCCESS,
+            DR.execution_date == DEFAULT_DATE,
+        ).all()
+        self.assertEqual(len(drs), 1)
+
+        # second run
+        with self.assertRaises(AirflowException):
+            # the AirflowTaskTimeout raised by the sensor is caught by
+            # the executor, and what we see is an AirflowException for
+            # the dependent task which fails because of a failed upstream
+            # task.
+            dag_child.run(
+                start_date=DEFAULT_DATE+timedelta(seconds=5),
+                end_date=DEFAULT_DATE+timedelta(seconds=5),
+            )
+
+        failed_tis = sess.query(TI).filter(
+            TI.state == State.FAILED,
+        ).all()
+        self.assertEqual(len(failed_tis), 1)
+        failed_ti = failed_tis[0]
+        self.assertEqual(failed_ti.task_id, 'sense_parent')
+        self.assertEqual(failed_ti.dag_id, TEST_DAG_ID+'_child_clean')
+        self.assertEqual(failed_ti.execution_date,
+                         DEFAULT_DATE+timedelta(seconds=5))
+
+        failed_drs = sess.query(DR).filter(
+            DR.dag_id == TEST_DAG_ID+'_child_clean',
+            DR.state == State.FAILED,
+        ).all()
+        self.assertEqual(len(failed_drs), 1)
+
+        self.assertEqual(failed_drs[0].execution_date,
+                         DEFAULT_DATE+timedelta(seconds=5))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/dags/test_dagrun_sensor.py b/tests/dags/test_dagrun_sensor.py
new file mode 100644
index 0000000000..daee9e0801
--- /dev/null
+++ b/tests/dags/test_dagrun_sensor.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.operators.bash_operator import BashOperator
+from airflow.contrib.operators.dagrun_sensor import DagRunSensor
+from tests.contrib.operators.test_dagrun_sensor import (DEFAULT_DATE,
+                                                        TEST_DAG_ID)
+
+args = {
+    'start_date': DEFAULT_DATE,
+    'owner': 'airflow',
+    'depends_on_past': False
+}
+
+with DAG(dag_id=TEST_DAG_ID+'_parent_clean',
+         default_args=args,
+         start_date=DEFAULT_DATE,
+         schedule_interval=timedelta(seconds=1)) as dag_parent:
+    t1 = BashOperator(
+        task_id='task_1',
+        bash_command="echo 'one'",
+    )
+    t2 = BashOperator(
+        task_id='task_2',
+        bash_command="echo 'two'",
+    )
+    t1 >> t2
+
+
+# A five-secondly workflow that depends on the 5 secondly runs of the parent
+# dag above.
+with DAG(dag_id=TEST_DAG_ID+'_child_clean',
+         default_args=args,
+         start_date=DEFAULT_DATE,
+         schedule_interval=timedelta(seconds=5)) as dag_child:
+    t1 = DagRunSensor(
+        task_id='sense_parent',
+        external_dag_id=TEST_DAG_ID+'_parent_clean',
+        execution_date_fn=lambda d: [d+timedelta(seconds=i) for i in range(5)],
+        timeout=5,
+        poke_interval=1,
+    )
+    t2 = BashOperator(
+        task_id='do_stuff',
+        bash_command="echo 'finished'",
+    )
+    t1 >> t2


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services