You are viewing a plain text version of this content. (The canonical link to the original message was not preserved in this plain text copy.)
Posted to commits@airflow.apache.org by as...@apache.org on 2022/07/06 09:36:08 UTC
[airflow] branch main updated: Send DAG timeout callbacks to processor outside of prohibit_commit (#24366)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 438d13e73a Send DAG timeout callbacks to processor outside of prohibit_commit (#24366)
438d13e73a is described below
commit 438d13e73aeba752c5f65f8fe7c0cee082fbcc42
Author: Tanel Kiis <ta...@users.noreply.github.com>
AuthorDate: Wed Jul 6 12:35:58 2022 +0300
Send DAG timeout callbacks to processor outside of prohibit_commit (#24366)
---
airflow/jobs/scheduler_job.py | 5 +---
tests/jobs/test_scheduler_job.py | 52 +++++++++++++++++++++++++++++++++-------
2 files changed, 44 insertions(+), 13 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 9bcb0c9176..8452950e04 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1171,10 +1171,7 @@ class SchedulerJob(BaseJob):
msg='timed_out',
)
- # Send SLA & DAG Success/Failure Callbacks to be executed
- self._send_dag_callbacks_to_processor(dag, callback_to_execute)
- # Because we send the callback here, we need to return None
- return callback
+ return callback_to_execute
if dag_run.execution_date > timezone.utcnow() and not dag.allow_future_exec_dates:
self.log.error("Execution date is in future: %s", dag_run.execution_date)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index db6df7dfeb..27931d460a 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -46,7 +46,7 @@ from airflow.jobs.backfill_job import BackfillJob
from airflow.jobs.base_job import BaseJob
from airflow.jobs.local_task_job import LocalTaskJob
from airflow.jobs.scheduler_job import SchedulerJob
-from airflow.models import DAG, DagBag, DagModel, Pool, TaskInstance
+from airflow.models import DAG, DagBag, DagModel, DbCallbackRequest, Pool, TaskInstance
from airflow.models.dagrun import DagRun
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKey
@@ -1613,7 +1613,6 @@ class TestSchedulerJob:
self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job.dagbag = dag_maker.dagbag
- self.scheduler_job.executor = MockExecutor()
session = settings.Session()
orm_dag = session.query(DagModel).get(dag.dag_id)
@@ -1639,7 +1638,7 @@ class TestSchedulerJob:
# Mock that processor_agent is started
self.scheduler_job.processor_agent = mock.Mock()
- self.scheduler_job._schedule_dag_run(dr, session)
+ callback = self.scheduler_job._schedule_dag_run(dr, session)
session.flush()
session.refresh(dr)
@@ -1658,8 +1657,8 @@ class TestSchedulerJob:
msg="timed_out",
)
- # Verify dag failure callback request is sent to file processor
- self.scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback)
+ # Verify dag failure callback request is sent
+ assert callback == expected_callback
session.rollback()
session.close()
@@ -1680,12 +1679,11 @@ class TestSchedulerJob:
self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job.dagbag = dag_maker.dagbag
- self.scheduler_job.executor = MockExecutor()
# Mock that processor_agent is started
self.scheduler_job.processor_agent = mock.Mock()
- self.scheduler_job._schedule_dag_run(dr, session)
+ callback = self.scheduler_job._schedule_dag_run(dr, session)
session.flush()
session.refresh(dr)
@@ -1699,8 +1697,8 @@ class TestSchedulerJob:
msg="timed_out",
)
- # Verify dag failure callback request is sent to file processor
- self.scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback)
+ # Verify dag failure callback request is sent
+ assert callback == expected_callback
session.rollback()
session.close()
@@ -1780,6 +1778,42 @@ class TestSchedulerJob:
session.rollback()
session.close()
+ def test_dagrun_timeout_callbacks_are_stored_in_database(self, dag_maker, session):
+ with dag_maker(
+ dag_id='test_dagrun_timeout_callbacks_are_stored_in_database',
+ on_failure_callback=lambda x: print("failed"),
+ dagrun_timeout=timedelta(hours=1),
+ ) as dag:
+ EmptyOperator(task_id='empty')
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ self.scheduler_job.executor = MockExecutor()
+ self.scheduler_job.executor.callback_sink = DatabaseCallbackSink()
+ self.scheduler_job.dagbag = dag_maker.dagbag
+ self.scheduler_job.processor_agent = mock.Mock()
+
+ dr = dag_maker.create_dagrun(start_date=DEFAULT_DATE)
+
+ with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
+ self.scheduler_job._do_scheduling(session)
+
+ callback = (
+ session.query(DbCallbackRequest)
+ .order_by(DbCallbackRequest.id.desc())
+ .first()
+ .get_callback_request()
+ )
+
+ expected_callback = DagCallbackRequest(
+ full_filepath=dag.fileloc,
+ dag_id=dr.dag_id,
+ is_failure_callback=True,
+ run_id=dr.run_id,
+ msg='timed_out',
+ )
+
+ assert callback == expected_callback
+
def test_dagrun_callbacks_commited_before_sent(self, dag_maker):
"""
Tests that before any callbacks are sent to the processor, the session is committed. This ensures