You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/01/23 20:54:36 UTC
[airflow] branch main updated: Revert "Send SLA callback to processor when DagRun has completed" (#20997)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6886d4a Revert "Send SLA callback to processor when DagRun has completed" (#20997)
6886d4a is described below
commit 6886d4ae58cc92bb3db149d753f6c15881eb8b7e
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Sun Jan 23 21:53:59 2022 +0100
Revert "Send SLA callback to processor when DagRun has completed" (#20997)
It turns out that, although the processing-time problem for DAGs with SLAs was solved,
SLA misses are no longer being recorded.
---
airflow/jobs/scheduler_job.py | 5 +----
tests/jobs/test_scheduler_job.py | 43 ----------------------------------------
2 files changed, 1 insertion(+), 47 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index e8f506d..cbda16e 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1072,7 +1072,6 @@ class SchedulerJob(BaseJob):
# Send SLA & DAG Success/Failure Callbacks to be executed
self._send_dag_callbacks_to_processor(dag, callback_to_execute)
- self._send_sla_callbacks_to_processor(dag)
# Because we send the callback here, we need to return None
return callback
@@ -1088,8 +1087,6 @@ class SchedulerJob(BaseJob):
# Work out if we should allow creating a new DagRun now?
if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
- # Send SLA Callbacks to be executed
- self._send_sla_callbacks_to_processor(dag)
# This will do one query per dag run. We "could" build up a complex
# query to update all the TIs across all the execution dates and dag
# IDs in a single query, but it turns out that can be _very very slow_
@@ -1117,7 +1114,7 @@ class SchedulerJob(BaseJob):
def _send_dag_callbacks_to_processor(self, dag: DAG, callback: Optional[DagCallbackRequest] = None):
if not self.processor_agent:
raise ValueError("Processor agent is not started.")
-
+ self._send_sla_callbacks_to_processor(dag)
if callback:
self.processor_agent.send_callback_to_execute(callback)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index bfca2e8..707f587 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2620,49 +2620,6 @@ class TestSchedulerJob:
full_filepath=dag.fileloc, dag_id=dag_id
)
- def test_sla_sent_to_processor_when_dagrun_completes(self, dag_maker, session):
- """Test that SLA is sent to the processor when the dagrun completes"""
- with dag_maker() as dag:
- DummyOperator(task_id='task', sla=timedelta(hours=1))
- self.scheduler_job = SchedulerJob(subdir=os.devnull)
- self.scheduler_job.executor = MockExecutor(do_update=False)
- self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
- mock_sla_callback = mock.MagicMock()
- self.scheduler_job._send_sla_callbacks_to_processor = mock_sla_callback
- assert session.query(DagRun).count() == 0
- dag_models = DagModel.dags_needing_dagruns(session).all()
- self.scheduler_job._create_dag_runs(dag_models, session)
- dr = session.query(DagRun).one()
- dr.state = DagRunState.SUCCESS
- ti = dr.get_task_instance('task', session)
- ti.state = TaskInstanceState.SUCCESS
- session.merge(ti)
- session.merge(dr)
- session.flush()
- self.scheduler_job._schedule_dag_run(dr, session)
- dag = self.scheduler_job.dagbag.get_dag(dag.dag_id)
- self.scheduler_job._send_sla_callbacks_to_processor.assert_called_once_with(dag)
-
- def test_sla_sent_to_processor_when_dagrun_timeout(self, dag_maker, session):
- """Test that SLA is sent to the processor when the dagrun timeout"""
- with dag_maker(dagrun_timeout=datetime.timedelta(seconds=60)) as dag:
- DummyOperator(task_id='task', sla=timedelta(hours=1))
- self.scheduler_job = SchedulerJob(subdir=os.devnull)
- self.scheduler_job.executor = MockExecutor(do_update=False)
- self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
- mock_sla_callback = mock.MagicMock()
- self.scheduler_job._send_sla_callbacks_to_processor = mock_sla_callback
- assert session.query(DagRun).count() == 0
- dag_models = DagModel.dags_needing_dagruns(session).all()
- self.scheduler_job._create_dag_runs(dag_models, session)
- dr = session.query(DagRun).one()
- dr.start_date = timezone.utcnow() - datetime.timedelta(days=1)
- session.merge(dr)
- session.flush()
- self.scheduler_job._schedule_dag_run(dr, session)
- dag = self.scheduler_job.dagbag.get_dag(dag.dag_id)
- self.scheduler_job._send_sla_callbacks_to_processor.assert_called_once_with(dag)
-
def test_create_dag_runs(self, dag_maker):
"""
Test various invariants of _create_dag_runs.