You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/10/02 20:52:36 UTC
incubator-airflow git commit: [AIRFLOW-988] Fix repeating SLA miss callbacks
Repository: incubator-airflow
Updated Branches:
refs/heads/master 3c3a65a3f -> ebc02fb53
[AIRFLOW-988] Fix repeating SLA miss callbacks
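When a callback is passed to `sla_miss_callback` but no email address
is specified, the callback is called repeatedly. This is caused by the
query that fetches SLA misses in `SchedulerJob.manage_slas`: it selected
every `SlaMiss` with `email_sent == False` OR `notification_sent == False`,
so an SLA miss whose callback had already fired (`notification_sent` is
True) but for which no email was sent still matched and was processed
again.
Filtering on `notification_sent` alone still handles the cases where
email is used, but prevents the repeated callbacks.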
Closes #2415 from cjonesy/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ebc02fb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ebc02fb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ebc02fb5
Branch: refs/heads/master
Commit: ebc02fb53e0faa59cb2dcbc1db13adc58a88646c
Parents: 3c3a65a
Author: Charlie Jones <cj...@gmail.com>
Authored: Mon Oct 2 13:51:40 2017 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Mon Oct 2 13:51:51 2017 -0700
----------------------------------------------------------------------
airflow/jobs.py | 5 ++---
tests/jobs.py | 41 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebc02fb5/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 6f4cf97..d697f2d 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -655,8 +655,7 @@ class SchedulerJob(BaseJob):
slas = (
session
.query(SlaMiss)
- .filter(or_(SlaMiss.email_sent == False,
- SlaMiss.notification_sent == False))
+ .filter(SlaMiss.notification_sent == False)
.filter(SlaMiss.dag_id == dag.dag_id)
.all()
)
@@ -696,7 +695,7 @@ class SchedulerJob(BaseJob):
dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis)
notification_sent = True
email_content = """\
- Here's a list of tasks thas missed their SLAs:
+ Here's a list of tasks that missed their SLAs:
<pre><code>{task_list}\n<code></pre>
Blocking tasks:
<pre><code>{blocking_task_list}\n{bug}<code></pre>
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebc02fb5/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index fc2a6b7..0a7f213 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -37,6 +37,7 @@ from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.task_runner.base_task_runner import BaseTaskRunner
+from airflow.utils.dates import days_ago
from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.utils.timeout import timeout
@@ -2162,6 +2163,46 @@ class SchedulerJobTest(unittest.TestCase):
do_schedule()
self.assertEquals(2, len(executor.queued_tasks))
+ def test_scheduler_sla_miss_callback(self):
+ """
+ Test that the scheduler does not call the sla_miss_callback when a notification has already been sent
+ """
+ session = settings.Session()
+
+ # Mock the callback function so we can verify that it was not called
+ sla_callback = mock.MagicMock()
+
+ # Create dag with a start of 2 days ago, but an sla of 1 day ago so we'll already have an sla_miss on the books
+ test_start_date = days_ago(2)
+ dag = DAG(dag_id='test_sla_miss',
+ sla_miss_callback=sla_callback,
+ default_args={'start_date': test_start_date,
+ 'sla': datetime.timedelta(days=1)})
+
+ task = DummyOperator(task_id='dummy',
+ dag=dag,
+ owner='airflow')
+
+ # Create a TaskInstance for two days ago
+ session.merge(models.TaskInstance(task=task,
+ execution_date=test_start_date,
+ state='success'))
+
+ # Create an SlaMiss where notification was sent, but email was not
+ session.merge(models.SlaMiss(task_id='dummy',
+ dag_id='test_sla_miss',
+ execution_date=test_start_date,
+ email_sent=False,
+ notification_sent=True))
+
+ # Now call manage_slas and see if the sla_miss callback gets called
+ scheduler = SchedulerJob(dag_id='test_sla_miss',
+ num_runs=1,
+ **self.default_scheduler_args)
+ scheduler.manage_slas(dag=dag, session=session)
+
+ sla_callback.assert_not_called()
+
def test_retry_still_in_executor(self):
"""
Checks if the scheduler does not put a task in limbo, when a task is retried