You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/21 21:03:26 UTC

[airflow] branch main updated: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue (#25147)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 17ec6dbcfe Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue (#25147)
17ec6dbcfe is described below

commit 17ec6dbcfe4a4d60092644e1b373d80789802b49
Author: Andrew Gibbs <gi...@andrew.gibbs.io>
AuthorDate: Thu Jul 21 22:03:20 2022 +0100

    Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue (#25147)
---
 airflow/dag_processing/manager.py    | 41 +++++++++++++++++-------
 newsfragments/25147.bugfix.rst       |  1 +
 tests/dag_processing/test_manager.py | 62 ++++++++++++++++++++++++++++++++++++
 3 files changed, 93 insertions(+), 11 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index b1d23034bf..cc6a38cb1d 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -38,7 +38,7 @@ from sqlalchemy.orm import Session
 from tabulate import tabulate
 
 import airflow.models
-from airflow.callbacks.callback_requests import CallbackRequest
+from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest
 from airflow.configuration import conf
 from airflow.dag_processing.processor import DagFileProcessorProcess
 from airflow.models import DagModel, DagWarning, DbCallbackRequest, errors
@@ -679,16 +679,35 @@ class DagFileProcessorManager(LoggingMixin):
             guard.commit()
 
     def _add_callback_to_queue(self, request: CallbackRequest):
-        self._callback_to_execute[request.full_filepath].append(request)
-        # Callback has a higher priority over DAG Run scheduling
-        if request.full_filepath in self._file_path_queue:
-            # Remove file paths matching request.full_filepath from self._file_path_queue
-            # Since we are already going to use that filepath to run callback,
-            # there is no need to have same file path again in the queue
-            self._file_path_queue = [
-                file_path for file_path in self._file_path_queue if file_path != request.full_filepath
-            ]
-        self._file_path_queue.insert(0, request.full_filepath)
+
+        # Requests are sent by DAG processors. SLAs exist per-DAG, but an SLA callback can be generated
+        # once per SLA-enabled task in the DAG. If treated like other callbacks, SLAs can cause a feedback
+        # loop where an SLA arrives, goes to the front of the queue, gets processed, triggers more SLAs from
+        # the same DAG that also go to the front, so files at the back of the queue are never processed.
+        if isinstance(request, SlaCallbackRequest):
+            if request in self._callback_to_execute[request.full_filepath]:
+                self.log.debug("Skipping already queued SlaCallbackRequest")
+                return
+
+            # New request: record it, and add the file _at the back_ of the queue unless already present
+            self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id)
+            self._callback_to_execute[request.full_filepath].append(request)
+            if request.full_filepath not in self._file_path_queue:
+                self._file_path_queue.append(request.full_filepath)
+
+        # Other callbacks take priority over DAG Run scheduling, so their file is moved to the front of the
+        # queue ("gazumping"), even if it is already queued further back
+        else:
+            self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request)
+            self._callback_to_execute[request.full_filepath].append(request)
+            if request.full_filepath in self._file_path_queue:
+                # Remove file paths matching request.full_filepath from self._file_path_queue
+                # since we are already going to use that file path to run the callback,
+                # there is no need to have the same file path in the queue twice
+                self._file_path_queue = [
+                    file_path for file_path in self._file_path_queue if file_path != request.full_filepath
+                ]
+            self._file_path_queue.insert(0, request.full_filepath)
 
     def _refresh_dag_dir(self):
         """Refresh file paths from dag dir if we haven't done it for too long."""
diff --git a/newsfragments/25147.bugfix.rst b/newsfragments/25147.bugfix.rst
new file mode 100644
index 0000000000..2d4523604c
--- /dev/null
+++ b/newsfragments/25147.bugfix.rst
@@ -0,0 +1 @@
+``DagProcessorManager`` callback queue changed to queue SLAs at the back (stops DAG processing stalling due to SLAs)
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 891df4ec64..bcca03d559 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -867,6 +867,68 @@ class TestDagFileProcessorManager:
         with create_session() as session:
             assert session.query(DbCallbackRequest).count() == 1
 
+    def test_callback_queue(self, tmpdir):
+        # given - note: the tmpdir fixture is not used by this test
+        manager = DagFileProcessorManager(
+            dag_directory=TEST_DAG_FOLDER,
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        dag1_req1 = DagCallbackRequest(
+            full_filepath="/green_eggs/ham/file1.py",
+            dag_id="dag1",
+            run_id="run1",
+            is_failure_callback=False,
+            msg=None,
+        )
+        dag1_req2 = DagCallbackRequest(
+            full_filepath="/green_eggs/ham/file1.py",
+            dag_id="dag1",
+            run_id="run1",
+            is_failure_callback=False,
+            msg=None,
+        )
+        dag1_sla1 = SlaCallbackRequest(full_filepath="/green_eggs/ham/file1.py", dag_id="dag1")
+        dag1_sla2 = SlaCallbackRequest(full_filepath="/green_eggs/ham/file1.py", dag_id="dag1")
+
+        dag2_req1 = DagCallbackRequest(
+            full_filepath="/green_eggs/ham/file2.py",
+            dag_id="dag2",
+            run_id="run1",
+            is_failure_callback=False,
+            msg=None,
+        )
+
+        # when
+        manager._add_callback_to_queue(dag1_req1)
+        manager._add_callback_to_queue(dag1_sla1)
+        manager._add_callback_to_queue(dag2_req1)
+
+        # then - both files are queued, dag2's ahead of dag1's because dag2_req1 (non-SLA) was added last
+        assert manager._file_path_queue == [dag2_req1.full_filepath, dag1_req1.full_filepath]
+        assert set(manager._callback_to_execute.keys()) == {dag1_req1.full_filepath, dag2_req1.full_filepath}
+        assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1]
+        assert manager._callback_to_execute[dag2_req1.full_filepath] == [dag2_req1]
+
+        # when
+        manager._add_callback_to_queue(dag1_sla2)
+
+        # then - sla2 == sla1, so it is skipped as a duplicate and dag1's file is not moved to the front
+        assert manager._file_path_queue == [dag2_req1.full_filepath, dag1_req1.full_filepath]
+        assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1]
+
+        # when
+        manager._add_callback_to_queue(dag1_req2)
+
+        # then - the non-SLA callback moves dag1's file to the front; non-SLA requests are not deduplicated
+        assert manager._file_path_queue == [dag1_req1.full_filepath, dag2_req1.full_filepath]
+        assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1, dag1_req2]
+
 
 class TestDagFileProcessorAgent(unittest.TestCase):
     def setUp(self):