You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/19 17:53:33 UTC
incubator-airflow git commit: [AIRFLOW-511][Airflow 511] add
success/failure callbacks on dag level
Repository: incubator-airflow
Updated Branches:
refs/heads/master dd2bc8cb9 -> c3c4a8fdc
[AIRFLOW-511][Airflow 511] add success/failure callbacks on dag level
Closes #2934 from Acehaidrey/AIRFLOW-511
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c3c4a8fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c3c4a8fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c3c4a8fd
Branch: refs/heads/master
Commit: c3c4a8fdce0574d42be44f070d43807238daa9e8
Parents: dd2bc8c
Author: Ace Haidrey <ah...@pandora.com>
Authored: Fri Jan 19 18:53:27 2018 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jan 19 18:53:27 2018 +0100
----------------------------------------------------------------------
airflow/jobs.py | 2 ++
airflow/models.py | 43 ++++++++++++++++++++++++++++++++++
tests/models.py | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 107 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3c4a8fd/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index ae6969e..1b5e661 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -772,6 +772,8 @@ class SchedulerJob(BaseJob):
dr.start_date < timezone.utcnow() - dag.dagrun_timeout):
dr.state = State.FAILED
dr.end_date = timezone.utcnow()
+ dag.handle_callback(dr, success=False, reason='dagrun_timeout',
+ session=session)  # NOTE(review): user callback runs before session.commit() below with no try/except; an exception it raises would propagate before the commit is reached — confirm intended
timedout_runs += 1
session.commit()
if len(active_runs) - timedout_runs >= dag.max_active_runs:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3c4a8fd/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index c5233ec..5de18b2 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2961,6 +2961,12 @@ class DAG(BaseDag, LoggingMixin):
:type orientation: string
:param catchup: Perform scheduler catchup (or only run latest)? Defaults to True
:type catchup: bool
+ :param on_failure_callback: A function to be called when a DagRun of this dag fails.
+ A context dictionary (which includes a ``reason`` key) is passed as a single parameter to this function.
+ :type on_failure_callback: callable
+ :param on_success_callback: Much like the ``on_failure_callback`` except
+ that it is executed when the dag succeeds.
+ :type on_success_callback: callable
"""
def __init__(
@@ -2981,6 +2987,7 @@ class DAG(BaseDag, LoggingMixin):
default_view=configuration.get('webserver', 'dag_default_view').lower(),
orientation=configuration.get('webserver', 'dag_orientation'),
catchup=configuration.getboolean('scheduler', 'catchup_by_default'),
+ on_success_callback=None, on_failure_callback=None,
params=None):
self.user_defined_macros = user_defined_macros
@@ -3053,6 +3060,8 @@ class DAG(BaseDag, LoggingMixin):
self.is_subdag = False # DagBag.bag_dag() will set this to True if appropriate
self.partial = False
+ self.on_success_callback = on_success_callback
+ self.on_failure_callback = on_failure_callback
self._comps = {
'dag_id',
@@ -3315,6 +3324,35 @@ class DAG(BaseDag, LoggingMixin):
return qry.value('is_paused')
@provide_session
+ def handle_callback(self, dagrun, success=True, reason=None, session=None):
+ """
+ Triggers the appropriate callback depending on the value of success, namely the
+ on_failure_callback or on_success_callback. This method gets the context of a
+ single TaskInstance part of this DagRun and passes that to the callable along
+ with a 'reason', primarily to differentiate DagRun failures.
+ .. note::
+ The logs end up in $AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log
+ :param dagrun: DagRun object
+ :param success: Flag to specify if failure or success callback should be called
+ :param reason: Completion reason
+ :param session: Database session
+ """
+ callback = self.on_success_callback if success else self.on_failure_callback
+ if callback:
+ self.log.info('Executing dag callback function: {}'.format(callback))
+ tis = dagrun.get_task_instances(session=session)
+ ti = tis[-1] # last element returned by get_task_instances (not the first); NOTE(review): raises IndexError if the run has no task instances
+ # the ``task`` attribute of a TaskInstance may be missing (e.g. on the
+ # dagrun-timeout path), so re-attach it before building the template context
+ if not hasattr(ti, 'task'):
+ d = dagrun.dag or DagBag().get_dag(dag_id=dagrun.dag_id) # NOTE(review): `self` is already this DAG; self.get_task(ti.task_id) would avoid building a DagBag
+ task = d.get_task(ti.task_id)
+ ti.task = task
+ context = ti.get_template_context(session=session)
+ context.update({'reason': reason})
+ callback(context)
+
+ @provide_session
def get_active_runs(self, session=None):
"""
Returns a list of dag run execution dates currently running
@@ -4762,18 +4800,23 @@ class DagRun(Base, LoggingMixin):
any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
self.log.info('Marking run %s failed', self)
self.state = State.FAILED
+ dag.handle_callback(self, success=False, reason='task_failure',
+ session=session)  # NOTE(review): no try/except here or in handle_callback, so an exception from the user callback propagates out of update_state — confirm intended
# if all roots succeeded and no unfinished tasks, the run succeeded
elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
for r in roots):
self.log.info('Marking run %s successful', self)
self.state = State.SUCCESS
+ dag.handle_callback(self, success=True, reason='success', session=session)
# if *all tasks* are deadlocked, the run failed
elif (unfinished_tasks and none_depends_on_past and
none_task_concurrency and no_dependencies_met):
self.log.info('Deadlock; marking run %s failed', self)
self.state = State.FAILED
+ dag.handle_callback(self, success=False, reason='all_tasks_deadlocked',
+ session=session)
# finally, if the roots aren't done, the dag is still running
else:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3c4a8fd/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index f0879eb..11bf7c9 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -645,6 +645,68 @@ class DagRunTest(unittest.TestCase):
self.assertEqual(dr.state, State.RUNNING)
self.assertEqual(dr2.state, State.RUNNING)
+ def test_dagrun_success_callback(self):
+ def on_success_callable(context):
+ # NOTE(review): this assertion only runs if the callback is invoked; the test never checks that it was called (e.g. with a flag), so a missing callback would still pass
+ self.assertEqual(
+ context['dag_run'].dag_id,
+ 'test_dagrun_success_callback'
+ )
+
+ dag = DAG(
+ dag_id='test_dagrun_success_callback',
+ start_date=datetime.datetime(2017, 1, 1),
+ on_success_callback=on_success_callable,
+ )
+ dag_task1 = DummyOperator(
+ task_id='test_state_succeeded1',
+ dag=dag)
+ dag_task2 = DummyOperator(
+ task_id='test_state_succeeded2',
+ dag=dag)
+ dag_task1.set_downstream(dag_task2)
+
+ initial_task_states = {
+ 'test_state_succeeded1': State.SUCCESS,
+ 'test_state_succeeded2': State.SUCCESS,
+ }
+
+ dag_run = self.create_dag_run(dag=dag,
+ state=State.RUNNING,
+ task_states=initial_task_states)
+ updated_dag_state = dag_run.update_state()
+ self.assertEqual(State.SUCCESS, updated_dag_state)
+
+ def test_dagrun_failure_callback(self):
+ def on_failure_callable(context):
+ # NOTE(review): as above, nothing verifies this callback was actually invoked
+ self.assertEqual(
+ context['dag_run'].dag_id,
+ 'test_dagrun_failure_callback'
+ )
+
+ dag = DAG(
+ dag_id='test_dagrun_failure_callback',
+ start_date=datetime.datetime(2017, 1, 1),
+ on_failure_callback=on_failure_callable,
+ )
+ dag_task1 = DummyOperator(
+ task_id='test_state_succeeded1',
+ dag=dag)
+ dag_task2 = DummyOperator(
+ task_id='test_state_failed2',
+ dag=dag)
+
+ initial_task_states = {
+ 'test_state_succeeded1': State.SUCCESS,
+ 'test_state_failed2': State.FAILED,
+ }
+ dag_task1.set_downstream(dag_task2)
+
+ dag_run = self.create_dag_run(dag=dag,
+ state=State.RUNNING,
+ task_states=initial_task_states)
+ updated_dag_state = dag_run.update_state()
+ self.assertEqual(State.FAILED, updated_dag_state)
+
def test_get_task_instance_on_empty_dagrun(self):
"""
Make sure that a proper value is returned when a dagrun has no task instances