You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2022/07/06 09:36:08 UTC

[airflow] branch main updated: Send DAG timeout callbacks to processor outside of prohibit_commit (#24366)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 438d13e73a Send DAG timeout callbacks to processor outside of prohibit_commit (#24366)
438d13e73a is described below

commit 438d13e73aeba752c5f65f8fe7c0cee082fbcc42
Author: Tanel Kiis <ta...@users.noreply.github.com>
AuthorDate: Wed Jul 6 12:35:58 2022 +0300

    Send DAG timeout callbacks to processor outside of prohibit_commit (#24366)
---
 airflow/jobs/scheduler_job.py    |  5 +---
 tests/jobs/test_scheduler_job.py | 52 +++++++++++++++++++++++++++++++++-------
 2 files changed, 44 insertions(+), 13 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 9bcb0c9176..8452950e04 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1171,10 +1171,7 @@ class SchedulerJob(BaseJob):
                 msg='timed_out',
             )
 
-            # Send SLA & DAG Success/Failure Callbacks to be executed
-            self._send_dag_callbacks_to_processor(dag, callback_to_execute)
-            # Because we send the callback here, we need to return None
-            return callback
+            return callback_to_execute
 
         if dag_run.execution_date > timezone.utcnow() and not dag.allow_future_exec_dates:
             self.log.error("Execution date is in future: %s", dag_run.execution_date)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index db6df7dfeb..27931d460a 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -46,7 +46,7 @@ from airflow.jobs.backfill_job import BackfillJob
 from airflow.jobs.base_job import BaseJob
 from airflow.jobs.local_task_job import LocalTaskJob
 from airflow.jobs.scheduler_job import SchedulerJob
-from airflow.models import DAG, DagBag, DagModel, Pool, TaskInstance
+from airflow.models import DAG, DagBag, DagModel, DbCallbackRequest, Pool, TaskInstance
 from airflow.models.dagrun import DagRun
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKey
@@ -1613,7 +1613,6 @@ class TestSchedulerJob:
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         self.scheduler_job.dagbag = dag_maker.dagbag
-        self.scheduler_job.executor = MockExecutor()
 
         session = settings.Session()
         orm_dag = session.query(DagModel).get(dag.dag_id)
@@ -1639,7 +1638,7 @@ class TestSchedulerJob:
         # Mock that processor_agent is started
         self.scheduler_job.processor_agent = mock.Mock()
 
-        self.scheduler_job._schedule_dag_run(dr, session)
+        callback = self.scheduler_job._schedule_dag_run(dr, session)
         session.flush()
 
         session.refresh(dr)
@@ -1658,8 +1657,8 @@ class TestSchedulerJob:
             msg="timed_out",
         )
 
-        # Verify dag failure callback request is sent to file processor
-        self.scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback)
+        # Verify dag failure callback request is returned by _schedule_dag_run
+        assert callback == expected_callback
 
         session.rollback()
         session.close()
@@ -1680,12 +1679,11 @@ class TestSchedulerJob:
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         self.scheduler_job.dagbag = dag_maker.dagbag
-        self.scheduler_job.executor = MockExecutor()
 
         # Mock that processor_agent is started
         self.scheduler_job.processor_agent = mock.Mock()
 
-        self.scheduler_job._schedule_dag_run(dr, session)
+        callback = self.scheduler_job._schedule_dag_run(dr, session)
         session.flush()
 
         session.refresh(dr)
@@ -1699,8 +1697,8 @@ class TestSchedulerJob:
             msg="timed_out",
         )
 
-        # Verify dag failure callback request is sent to file processor
-        self.scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback)
+        # Verify dag failure callback request is returned by _schedule_dag_run
+        assert callback == expected_callback
 
         session.rollback()
         session.close()
@@ -1780,6 +1778,42 @@ class TestSchedulerJob:
         session.rollback()
         session.close()
 
+    def test_dagrun_timeout_callbacks_are_stored_in_database(self, dag_maker, session):
+        with dag_maker(
+            dag_id='test_dagrun_timeout_callbacks_are_stored_in_database',
+            on_failure_callback=lambda x: print("failed"),
+            dagrun_timeout=timedelta(hours=1),
+        ) as dag:
+            EmptyOperator(task_id='empty')
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor()
+        self.scheduler_job.executor.callback_sink = DatabaseCallbackSink()
+        self.scheduler_job.dagbag = dag_maker.dagbag
+        self.scheduler_job.processor_agent = mock.Mock()
+
+        dr = dag_maker.create_dagrun(start_date=DEFAULT_DATE)
+
+        with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
+            self.scheduler_job._do_scheduling(session)
+
+        callback = (
+            session.query(DbCallbackRequest)
+            .order_by(DbCallbackRequest.id.desc())
+            .first()
+            .get_callback_request()
+        )
+
+        expected_callback = DagCallbackRequest(
+            full_filepath=dag.fileloc,
+            dag_id=dr.dag_id,
+            is_failure_callback=True,
+            run_id=dr.run_id,
+            msg='timed_out',
+        )
+
+        assert callback == expected_callback
+
     def test_dagrun_callbacks_commited_before_sent(self, dag_maker):
         """
         Tests that before any callbacks are sent to the processor, the session is committed. This ensures