You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/21 21:03:26 UTC
[airflow] branch main updated: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue (#25147)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 17ec6dbcfe Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue (#25147)
17ec6dbcfe is described below
commit 17ec6dbcfe4a4d60092644e1b373d80789802b49
Author: Andrew Gibbs <gi...@andrew.gibbs.io>
AuthorDate: Thu Jul 21 22:03:20 2022 +0100
Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue (#25147)
---
airflow/dag_processing/manager.py | 41 +++++++++++++++++-------
newsfragments/25147.bugfix.rst | 1 +
tests/dag_processing/test_manager.py | 62 ++++++++++++++++++++++++++++++++++++
3 files changed, 93 insertions(+), 11 deletions(-)
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index b1d23034bf..cc6a38cb1d 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -38,7 +38,7 @@ from sqlalchemy.orm import Session
from tabulate import tabulate
import airflow.models
-from airflow.callbacks.callback_requests import CallbackRequest
+from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest
from airflow.configuration import conf
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.models import DagModel, DagWarning, DbCallbackRequest, errors
@@ -679,16 +679,35 @@ class DagFileProcessorManager(LoggingMixin):
guard.commit()
def _add_callback_to_queue(self, request: CallbackRequest):
    """
    Store a callback request and, for non-SLA callbacks, prioritise its DAG file.

    Requests are sent by DAG processors. SLAs exist per DAG but can be generated once per
    SLA-enabled task in the DAG, so they are handled differently from other callbacks:

    * ``SlaCallbackRequest`` -- skipped if an equal request is already stored for the same
      file; otherwise stored in ``self._callback_to_execute`` only. The file path is
      deliberately NOT added to ``self._file_path_queue`` (see the comment in the body).
    * any other ``CallbackRequest`` -- stored in ``self._callback_to_execute``, and its file
      path is moved to the front of ``self._file_path_queue``. Any existing occurrences are
      removed first, so the callback gazumps regular DAG run scheduling.

    :param request: the callback request to queue.
    """
    file_path = request.full_filepath
    queued_callbacks = self._callback_to_execute[file_path]

    if isinstance(request, SlaCallbackRequest):
        if request in queued_callbacks:
            self.log.debug("Skipping already queued SlaCallbackRequest")
            return

        # Do NOT touch self._file_path_queue for SLAs. Putting the file at the front causes a
        # feedback loop. Even appending it to the back lets a steady stream of SLA callbacks
        # keep the queue from ever draining: the file is processed, more SLA callbacks
        # arrive, and the file is re-appended, so other files waiting for the queue to empty
        # are starved. The callback is stored against its file and runs the next time that
        # file is processed.
        # NOTE(review): this relies on the manager re-queuing every DAG file once the queue
        # drains (periodic refresh) -- confirm against the queue-refill logic outside this method.
        self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id)
        queued_callbacks.append(request)
        return

    # Other callbacks have a higher priority than DAG run scheduling, so they gazump: the
    # file moves to the front of the queue even if it was already queued further back.
    self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request)
    queued_callbacks.append(request)
    if file_path in self._file_path_queue:
        # The file is about to be processed to run this callback, so drop any other
        # occurrence rather than processing the same file twice.
        self._file_path_queue = [path for path in self._file_path_queue if path != file_path]
    self._file_path_queue.insert(0, file_path)
def _refresh_dag_dir(self):
"""Refresh file paths from dag dir if we haven't done it for too long."""
diff --git a/newsfragments/25147.bugfix.rst b/newsfragments/25147.bugfix.rst
new file mode 100644
index 0000000000..2d4523604c
--- /dev/null
+++ b/newsfragments/25147.bugfix.rst
@@ -0,0 +1 @@
+``DagProcessorManager`` callback queue changed to queue SLAs at the back (stops DAG processing stalling due to SLAs)
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 891df4ec64..bcca03d559 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -867,6 +867,68 @@ class TestDagFileProcessorManager:
with create_session() as session:
assert session.query(DbCallbackRequest).count() == 1
def test_callback_queue(self, tmpdir):
    """Non-SLA callbacks jump the file-path queue; duplicate SLA callbacks are ignored."""
    # given
    file1 = "/green_eggs/ham/file1.py"
    file2 = "/green_eggs/ham/file2.py"
    manager = DagFileProcessorManager(
        dag_directory=TEST_DAG_FOLDER,
        max_runs=1,
        processor_timeout=timedelta(days=365),
        signal_conn=MagicMock(),
        dag_ids=[],
        pickle_dags=False,
        async_mode=True,
    )

    def dag_request(filepath, dag_id):
        # Build a successful (non-failure) DAG callback request for run "run1".
        return DagCallbackRequest(
            full_filepath=filepath,
            dag_id=dag_id,
            run_id="run1",
            is_failure_callback=False,
            msg=None,
        )

    dag1_req1 = dag_request(file1, "dag1")
    dag1_req2 = dag_request(file1, "dag1")
    dag1_sla1 = SlaCallbackRequest(full_filepath=file1, dag_id="dag1")
    dag1_sla2 = SlaCallbackRequest(full_filepath=file1, dag_id="dag1")
    dag2_req1 = dag_request(file2, "dag2")

    # when
    for request in (dag1_req1, dag1_sla1, dag2_req1):
        manager._add_callback_to_queue(request)

    # then - both files are queued, dag2's first because its (non-SLA) callback arrived last
    assert manager._file_path_queue == [file2, file1]
    assert set(manager._callback_to_execute) == {file1, file2}
    assert manager._callback_to_execute[file1] == [dag1_req1, dag1_sla1]
    assert manager._callback_to_execute[file2] == [dag2_req1]

    # when - an SLA equal to one already queued arrives
    manager._add_callback_to_queue(dag1_sla2)

    # then - it is dropped and does not move file1 forward
    assert manager._file_path_queue == [file2, file1]
    assert manager._callback_to_execute[file1] == [dag1_req1, dag1_sla1]

    # when - another non-SLA callback for dag1 arrives
    manager._add_callback_to_queue(dag1_req2)

    # then - it brings file1 back to the front of the queue
    assert manager._file_path_queue == [file1, file2]
    assert manager._callback_to_execute[file1] == [dag1_req1, dag1_sla1, dag1_req2]
class TestDagFileProcessorAgent(unittest.TestCase):
def setUp(self):