Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/19 17:53:33 UTC

incubator-airflow git commit: [AIRFLOW-511] Add success/failure callbacks on DAG level

Repository: incubator-airflow
Updated Branches:
  refs/heads/master dd2bc8cb9 -> c3c4a8fdc


[AIRFLOW-511] Add success/failure callbacks on DAG level

Closes #2934 from Acehaidrey/AIRFLOW-511


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c3c4a8fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c3c4a8fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c3c4a8fd

Branch: refs/heads/master
Commit: c3c4a8fdce0574d42be44f070d43807238daa9e8
Parents: dd2bc8c
Author: Ace Haidrey <ah...@pandora.com>
Authored: Fri Jan 19 18:53:27 2018 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jan 19 18:53:27 2018 +0100

----------------------------------------------------------------------
 airflow/jobs.py   |  2 ++
 airflow/models.py | 43 ++++++++++++++++++++++++++++++++++
 tests/models.py   | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 107 insertions(+)
----------------------------------------------------------------------
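
For readers unfamiliar with the feature: this change wires two new DAG-level hooks, on_success_callback and on_failure_callback, which receive a TaskInstance template context augmented with a 'reason' key. A minimal usage sketch follows; the dag_id, schedule, and print-based handlers are illustrative only and not part of this commit:

    from datetime import datetime

    from airflow.models import DAG
    from airflow.operators.dummy_operator import DummyOperator


    def notify_failure(context):
        # 'reason' is injected by DAG.handle_callback; with this commit it is
        # one of 'task_failure', 'dagrun_timeout', or 'all_tasks_deadlocked'
        print('DagRun %s failed (%s)' % (context['dag_run'], context['reason']))


    def notify_success(context):
        # on the success path, 'reason' is simply 'success'
        print('DagRun %s succeeded' % context['dag_run'])


    dag = DAG(
        dag_id='example_dag_level_callbacks',  # illustrative name
        start_date=datetime(2018, 1, 1),
        schedule_interval='@daily',
        on_success_callback=notify_success,
        on_failure_callback=notify_failure,
    )

    DummyOperator(task_id='noop', dag=dag)

The callbacks fire once per DagRun rather than once per task, which is the gap AIRFLOW-511 set out to close.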


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3c4a8fd/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index ae6969e..1b5e661 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -772,6 +772,8 @@ class SchedulerJob(BaseJob):
                         dr.start_date < timezone.utcnow() - dag.dagrun_timeout):
                     dr.state = State.FAILED
                     dr.end_date = timezone.utcnow()
+                    dag.handle_callback(dr, success=False, reason='dagrun_timeout',
+                                        session=session)
                     timedout_runs += 1
             session.commit()
             if len(active_runs) - timedout_runs >= dag.max_active_runs:
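
The hunk above covers the timeout path: when the scheduler fails a run for exceeding dag.dagrun_timeout, the failure callback now fires with reason='dagrun_timeout'. A sketch of a callback reacting to that case, assuming a one-hour timeout; the print is a stand-in for real alerting and nothing here comes from the commit itself:

    from datetime import datetime, timedelta

    from airflow.models import DAG


    def alert_on_timeout(context):
        # the scheduler passes reason='dagrun_timeout' on this code path
        if context['reason'] == 'dagrun_timeout':
            print('DagRun %s exceeded its timeout' % context['dag_run'])


    dag = DAG(
        dag_id='example_dagrun_timeout',  # illustrative name
        start_date=datetime(2018, 1, 1),
        dagrun_timeout=timedelta(hours=1),
        on_failure_callback=alert_on_timeout,
    )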

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3c4a8fd/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index c5233ec..5de18b2 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2961,6 +2961,12 @@ class DAG(BaseDag, LoggingMixin):
     :type orientation: string
     :param catchup: Perform scheduler catchup (or only run latest)? Defaults to True
     :type catchup: bool
+    :param on_failure_callback: A function to be called when a DagRun of this dag fails.
+        A context dictionary is passed as a single parameter to this function.
+    :type on_failure_callback: callable
+    :param on_success_callback: Much like the ``on_failure_callback`` except
+        that it is executed when the dag succeeds.
+    :type on_success_callback: callable
     """
 
     def __init__(
@@ -2981,6 +2987,7 @@ class DAG(BaseDag, LoggingMixin):
             default_view=configuration.get('webserver', 'dag_default_view').lower(),
             orientation=configuration.get('webserver', 'dag_orientation'),
             catchup=configuration.getboolean('scheduler', 'catchup_by_default'),
+            on_success_callback=None, on_failure_callback=None,
             params=None):
 
         self.user_defined_macros = user_defined_macros
@@ -3053,6 +3060,8 @@ class DAG(BaseDag, LoggingMixin):
         self.is_subdag = False  # DagBag.bag_dag() will set this to True if appropriate
 
         self.partial = False
+        self.on_success_callback = on_success_callback
+        self.on_failure_callback = on_failure_callback
 
         self._comps = {
             'dag_id',
@@ -3315,6 +3324,35 @@ class DAG(BaseDag, LoggingMixin):
         return qry.value('is_paused')
 
     @provide_session
+    def handle_callback(self, dagrun, success=True, reason=None, session=None):
+        """
+        Triggers the appropriate callback depending on the value of success, namely the
+        on_failure_callback or on_success_callback. This method gets the context of a
+        single TaskInstance that is part of this DagRun and passes that to the callable
+        along with a 'reason', primarily to differentiate DagRun failures.
+        .. note::
+            The logs end up in $AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log
+        :param dagrun: DagRun object
+        :param success: Flag to specify if failure or success callback should be called
+        :param reason: Completion reason
+        :param session: Database session
+        """
+        callback = self.on_success_callback if success else self.on_failure_callback
+        if callback:
+            self.log.info('Executing dag callback function: %s', callback)
+            tis = dagrun.get_task_instances(session=session)
+            ti = tis[-1]  # use the last TaskInstance of the DagRun
+            # certain task instance attributes are transient and can be lost,
+            # especially during timeouts, so restore them before building context
+            if not hasattr(ti, 'task'):
+                d = dagrun.dag or DagBag().get_dag(dag_id=dagrun.dag_id)
+                task = d.get_task(ti.task_id)
+                ti.task = task
+            context = ti.get_template_context(session=session)
+            context.update({'reason': reason})
+            callback(context)
+
+    @provide_session
     def get_active_runs(self, session=None):
         """
         Returns a list of dag run execution dates currently running
@@ -4762,18 +4800,23 @@ class DagRun(Base, LoggingMixin):
                     any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
                 self.log.info('Marking run %s failed', self)
                 self.state = State.FAILED
+                dag.handle_callback(self, success=False, reason='task_failure',
+                                    session=session)
 
             # if all roots succeeded and no unfinished tasks, the run succeeded
             elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
                                               for r in roots):
                 self.log.info('Marking run %s successful', self)
                 self.state = State.SUCCESS
+                dag.handle_callback(self, success=True, reason='success', session=session)
 
             # if *all tasks* are deadlocked, the run failed
             elif (unfinished_tasks and none_depends_on_past and
                   none_task_concurrency and no_dependencies_met):
                 self.log.info('Deadlock; marking run %s failed', self)
                 self.state = State.FAILED
+                dag.handle_callback(self, success=False, reason='all_tasks_deadlocked',
+                                    session=session)
 
             # finally, if the roots aren't done, the dag is still running
             else:
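
As the models.py hunks show, DagRun.update_state now dispatches the callback for three outcomes (reason='task_failure', 'success', or 'all_tasks_deadlocked'), with 'dagrun_timeout' coming from the scheduler hunk earlier. A small sketch of a failure handler that branches on the reason; the handler bodies are illustrative placeholders:

    def on_dag_failure(context):
        reason = context['reason']     # set by DAG.handle_callback
        ti = context['task_instance']  # the TaskInstance used to build the context
        if reason == 'all_tasks_deadlocked':
            print('Run deadlocked; last task seen: %s' % ti.task_id)
        elif reason == 'dagrun_timeout':
            print('Run timed out')
        else:  # 'task_failure'
            print('Task failure in %s' % context['dag_run'])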

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3c4a8fd/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index f0879eb..11bf7c9 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -645,6 +645,68 @@ class DagRunTest(unittest.TestCase):
         self.assertEqual(dr.state, State.RUNNING)
         self.assertEqual(dr2.state, State.RUNNING)
 
+    def test_dagrun_success_callback(self):
+        def on_success_callable(context):
+            self.assertEqual(
+                context['dag_run'].dag_id,
+                'test_dagrun_success_callback'
+            )
+
+        dag = DAG(
+            dag_id='test_dagrun_success_callback',
+            start_date=datetime.datetime(2017, 1, 1),
+            on_success_callback=on_success_callable,
+        )
+        dag_task1 = DummyOperator(
+            task_id='test_state_succeeded1',
+            dag=dag)
+        dag_task2 = DummyOperator(
+            task_id='test_state_succeeded2',
+            dag=dag)
+        dag_task1.set_downstream(dag_task2)
+
+        initial_task_states = {
+            'test_state_succeeded1': State.SUCCESS,
+            'test_state_succeeded2': State.SUCCESS,
+        }
+
+        dag_run = self.create_dag_run(dag=dag,
+                                      state=State.RUNNING,
+                                      task_states=initial_task_states)
+        updated_dag_state = dag_run.update_state()
+        self.assertEqual(State.SUCCESS, updated_dag_state)
+
+    def test_dagrun_failure_callback(self):
+        def on_failure_callable(context):
+            self.assertEqual(
+                context['dag_run'].dag_id,
+                'test_dagrun_failure_callback'
+            )
+
+        dag = DAG(
+            dag_id='test_dagrun_failure_callback',
+            start_date=datetime.datetime(2017, 1, 1),
+            on_failure_callback=on_failure_callable,
+        )
+        dag_task1 = DummyOperator(
+            task_id='test_state_succeeded1',
+            dag=dag)
+        dag_task2 = DummyOperator(
+            task_id='test_state_failed2',
+            dag=dag)
+
+        initial_task_states = {
+            'test_state_succeeded1': State.SUCCESS,
+            'test_state_failed2': State.FAILED,
+        }
+        dag_task1.set_downstream(dag_task2)
+
+        dag_run = self.create_dag_run(dag=dag,
+                                      state=State.RUNNING,
+                                      task_states=initial_task_states)
+        updated_dag_state = dag_run.update_state()
+        self.assertEqual(State.FAILED, updated_dag_state)
+
     def test_get_task_instance_on_empty_dagrun(self):
         """
         Make sure that a proper value is returned when a dagrun has no task instances