Posted to commits@airflow.apache.org by po...@apache.org on 2022/01/23 20:54:36 UTC

[airflow] branch main updated: Revert "Send SLA callback to processor when DagRun has completed" (#20997)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6886d4a  Revert "Send SLA callback to processor when DagRun has completed" (#20997)
6886d4a is described below

commit 6886d4ae58cc92bb3db149d753f6c15881eb8b7e
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Sun Jan 23 21:53:59 2022 +0100

    Revert "Send SLA callback to processor when DagRun has completed" (#20997)
    
    It turns out that while the slow processing time for DAGs with SLAs was
    solved, the SLA misses are no longer being recorded.
---
 airflow/jobs/scheduler_job.py    |  5 +----
 tests/jobs/test_scheduler_job.py | 43 ----------------------------------------
 2 files changed, 1 insertion(+), 47 deletions(-)
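
For orientation before the diffs: below is a minimal, self-contained Python sketch (not the Airflow source; ProcessorAgentStub and SchedulerSketch are invented stand-ins for DagFileProcessorAgent and SchedulerJob) of the callback ordering this revert restores. With the revert applied, _send_dag_callbacks_to_processor sends SLA callbacks on every call, instead of only when a DagRun reaches a terminal state.

    # A minimal sketch (NOT the Airflow source) of the callback ordering this
    # revert restores: SLA callbacks are emitted from
    # _send_dag_callbacks_to_processor on every call, rather than only when a
    # DagRun completes. The stub classes below are invented for illustration.
    from dataclasses import dataclass, field
    from typing import List, Optional


    @dataclass
    class ProcessorAgentStub:
        sent: List[str] = field(default_factory=list)

        def send_callback_to_execute(self, callback: str) -> None:
            self.sent.append(f"dag-callback:{callback}")

        def send_sla_callback(self, dag_id: str) -> None:
            self.sent.append(f"sla-callback:{dag_id}")


    class SchedulerSketch:
        def __init__(self, processor_agent: Optional[ProcessorAgentStub] = None) -> None:
            self.processor_agent = processor_agent

        def _send_sla_callbacks_to_processor(self, dag_id: str) -> None:
            # The real scheduler performs extra checks here (e.g. whether any
            # task in the DAG actually declares an SLA); elided in this sketch.
            assert self.processor_agent is not None
            self.processor_agent.send_sla_callback(dag_id)

        def _send_dag_callbacks_to_processor(self, dag_id: str, callback: Optional[str] = None) -> None:
            if not self.processor_agent:
                raise ValueError("Processor agent is not started.")
            # Restored by the revert: SLA callbacks piggyback on every DAG callback send.
            self._send_sla_callbacks_to_processor(dag_id)
            if callback:
                self.processor_agent.send_callback_to_execute(callback)


    if __name__ == "__main__":
        agent = ProcessorAgentStub()
        scheduler = SchedulerSketch(agent)
        scheduler._send_dag_callbacks_to_processor("example_dag", callback="on_success")
        print(agent.sent)  # ['sla-callback:example_dag', 'dag-callback:on_success']

Running the sketch prints both an SLA callback and a DAG callback for a single scheduling pass, which mirrors the behaviour reinstated in the scheduler_job.py hunks below.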

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index e8f506d..cbda16e 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1072,7 +1072,6 @@ class SchedulerJob(BaseJob):
 
             # Send SLA & DAG Success/Failure Callbacks to be executed
             self._send_dag_callbacks_to_processor(dag, callback_to_execute)
-            self._send_sla_callbacks_to_processor(dag)
             # Because we send the callback here, we need to return None
             return callback
 
@@ -1088,8 +1087,6 @@ class SchedulerJob(BaseJob):
             # Work out if we should allow creating a new DagRun now?
             if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
                 dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
-            # Send SLA Callbacks to be executed
-            self._send_sla_callbacks_to_processor(dag)
         # This will do one query per dag run. We "could" build up a complex
         # query to update all the TIs across all the execution dates and dag
         # IDs in a single query, but it turns out that can be _very very slow_
@@ -1117,7 +1114,7 @@ class SchedulerJob(BaseJob):
     def _send_dag_callbacks_to_processor(self, dag: DAG, callback: Optional[DagCallbackRequest] = None):
         if not self.processor_agent:
             raise ValueError("Processor agent is not started.")
-
+        self._send_sla_callbacks_to_processor(dag)
         if callback:
             self.processor_agent.send_callback_to_execute(callback)
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index bfca2e8..707f587 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2620,49 +2620,6 @@ class TestSchedulerJob:
                 full_filepath=dag.fileloc, dag_id=dag_id
             )
 
-    def test_sla_sent_to_processor_when_dagrun_completes(self, dag_maker, session):
-        """Test that SLA is sent to the processor when the dagrun completes"""
-        with dag_maker() as dag:
-            DummyOperator(task_id='task', sla=timedelta(hours=1))
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.executor = MockExecutor(do_update=False)
-        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
-        mock_sla_callback = mock.MagicMock()
-        self.scheduler_job._send_sla_callbacks_to_processor = mock_sla_callback
-        assert session.query(DagRun).count() == 0
-        dag_models = DagModel.dags_needing_dagruns(session).all()
-        self.scheduler_job._create_dag_runs(dag_models, session)
-        dr = session.query(DagRun).one()
-        dr.state = DagRunState.SUCCESS
-        ti = dr.get_task_instance('task', session)
-        ti.state = TaskInstanceState.SUCCESS
-        session.merge(ti)
-        session.merge(dr)
-        session.flush()
-        self.scheduler_job._schedule_dag_run(dr, session)
-        dag = self.scheduler_job.dagbag.get_dag(dag.dag_id)
-        self.scheduler_job._send_sla_callbacks_to_processor.assert_called_once_with(dag)
-
-    def test_sla_sent_to_processor_when_dagrun_timeout(self, dag_maker, session):
-        """Test that SLA is sent to the processor when the dagrun timeout"""
-        with dag_maker(dagrun_timeout=datetime.timedelta(seconds=60)) as dag:
-            DummyOperator(task_id='task', sla=timedelta(hours=1))
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.executor = MockExecutor(do_update=False)
-        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
-        mock_sla_callback = mock.MagicMock()
-        self.scheduler_job._send_sla_callbacks_to_processor = mock_sla_callback
-        assert session.query(DagRun).count() == 0
-        dag_models = DagModel.dags_needing_dagruns(session).all()
-        self.scheduler_job._create_dag_runs(dag_models, session)
-        dr = session.query(DagRun).one()
-        dr.start_date = timezone.utcnow() - datetime.timedelta(days=1)
-        session.merge(dr)
-        session.flush()
-        self.scheduler_job._schedule_dag_run(dr, session)
-        dag = self.scheduler_job.dagbag.get_dag(dag.dag_id)
-        self.scheduler_job._send_sla_callbacks_to_processor.assert_called_once_with(dag)
-
     def test_create_dag_runs(self, dag_maker):
         """
         Test various invariants of _create_dag_runs.