You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/10/02 20:52:36 UTC

incubator-airflow git commit: [AIRFLOW-988] Fix repeating SLA miss callbacks

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3c3a65a3f -> ebc02fb53


[AIRFLOW-988] Fix repeating SLA miss callbacks

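When a callback is passed to `sla_miss_callback` but no email address
is specified, the callback is invoked repeatedly. This happens because
`SchedulerJob.manage_slas` selects SLA misses where either `email_sent`
or `notification_sent` is false, so a miss whose notification has
already been sent is still picked up as long as `email_sent` is false.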

By filtering on `notification_sent` only, we still handle the cases
where email is used, while preventing the repeated callbacks.

Closes #2415 from cjonesy/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ebc02fb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ebc02fb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ebc02fb5

Branch: refs/heads/master
Commit: ebc02fb53e0faa59cb2dcbc1db13adc58a88646c
Parents: 3c3a65a
Author: Charlie Jones <cj...@gmail.com>
Authored: Mon Oct 2 13:51:40 2017 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Mon Oct 2 13:51:51 2017 -0700

----------------------------------------------------------------------
 airflow/jobs.py |  5 ++---
 tests/jobs.py   | 41 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebc02fb5/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 6f4cf97..d697f2d 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -655,8 +655,7 @@ class SchedulerJob(BaseJob):
         slas = (
             session
             .query(SlaMiss)
-            .filter(or_(SlaMiss.email_sent == False,
-                        SlaMiss.notification_sent == False))
+            .filter(SlaMiss.notification_sent == False)
             .filter(SlaMiss.dag_id == dag.dag_id)
             .all()
         )
@@ -696,7 +695,7 @@ class SchedulerJob(BaseJob):
                 dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis)
                 notification_sent = True
             email_content = """\
-            Here's a list of tasks thas missed their SLAs:
+            Here's a list of tasks that missed their SLAs:
             <pre><code>{task_list}\n<code></pre>
             Blocking tasks:
             <pre><code>{blocking_task_list}\n{bug}<code></pre>

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebc02fb5/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index fc2a6b7..0a7f213 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -37,6 +37,7 @@ from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.bash_operator import BashOperator
 from airflow.task_runner.base_task_runner import BaseTaskRunner
+from airflow.utils.dates import days_ago
 from airflow.utils.db import provide_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
@@ -2162,6 +2163,46 @@ class SchedulerJobTest(unittest.TestCase):
         do_schedule()
         self.assertEquals(2, len(executor.queued_tasks))
 
+    def test_scheduler_sla_miss_callback(self):
+        """
+        Test that the scheduler does not call the sla_miss_callback when a notification has already been sent
+        """
+        session = settings.Session()
+
+        # Mock the callback function so we can verify that it was not called
+        sla_callback = mock.MagicMock()
+
+        # Create dag with a start of 2 days ago, but an sla of 1 day ago so we'll already have an sla_miss on the books
+        test_start_date = days_ago(2)
+        dag = DAG(dag_id='test_sla_miss',
+                  sla_miss_callback=sla_callback,
+                  default_args={'start_date': test_start_date,
+                                'sla': datetime.timedelta(days=1)})
+
+        task = DummyOperator(task_id='dummy',
+                             dag=dag,
+                             owner='airflow')
+
+        # Create a TaskInstance for two days ago
+        session.merge(models.TaskInstance(task=task,
+                                          execution_date=test_start_date,
+                                          state='success'))
+
+        # Create an SlaMiss where notification was sent, but email was not
+        session.merge(models.SlaMiss(task_id='dummy',
+                                     dag_id='test_sla_miss',
+                                     execution_date=test_start_date,
+                                     email_sent=False,
+                                     notification_sent=True))
+
+        # Now call manage_slas and see if the sla_miss callback gets called
+        scheduler = SchedulerJob(dag_id='test_sla_miss',
+                                 num_runs=1,
+                                 **self.default_scheduler_args)
+        scheduler.manage_slas(dag=dag, session=session)
+
+        sla_callback.assert_not_called()
+
     def test_retry_still_in_executor(self):
         """
         Checks if the scheduler does not put a task in limbo, when a task is retried