Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/21 09:15:45 UTC

[GitHub] [airflow] argibbs commented on a diff in pull request #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

argibbs commented on code in PR #25147:
URL: https://github.com/apache/airflow/pull/25147#discussion_r926448536


##########
airflow/dag_processing/manager.py:
##########
@@ -679,16 +679,37 @@ def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION):
             guard.commit()
 
     def _add_callback_to_queue(self, request: CallbackRequest):
-        self._callback_to_execute[request.full_filepath].append(request)
-        # Callback has a higher priority over DAG Run scheduling
-        if request.full_filepath in self._file_path_queue:
-            # Remove file paths matching request.full_filepath from self._file_path_queue
-            # Since we are already going to use that filepath to run callback,
-            # there is no need to have same file path again in the queue
-            self._file_path_queue = [
-                file_path for file_path in self._file_path_queue if file_path != request.full_filepath
-            ]
-        self._file_path_queue.insert(0, request.full_filepath)
+
+        # requests are sent by dag processors. SLAs exist per-dag, but can be generated once per SLA-enabled
+        # task in the dag. If treated like other callbacks, SLAs can cause feedback where a SLA arrives,
+        # goes to the front of the queue, gets processed, triggers more SLAs from the same DAG, which go to
+        # the front of the queue, and we never get round to picking stuff off the back of the queue
+        if isinstance(request, SlaCallbackRequest):
+            if request in self._callback_to_execute[request.full_filepath]:
+                self.log.debug("Skipping already queued SlaCallbackRequest")
+                return
+
+            # not already queued, queue the file _at the back_, and add the request to the file's callbacks
+            self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id)
+            self._callback_to_execute[request.full_filepath].append(request)
+            if request.full_filepath not in self._file_path_queue:
+                self._file_path_queue.append(request.full_filepath)
+
+        # Other callbacks have a higher priority over DAG Run scheduling, so those callbacks gazump, even if
+        # already in the queue
+        else:
+            self.log.debug(
+                "Queuing %s CallbackRequest: %s", type(request).__name__.rsplit('.', 1)[-1], request

Review Comment:
   Good point. Fixed.
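
   For anyone reading the hunk above without the rest of the file, here is a minimal standalone sketch of the queueing rule it describes. The classes below are hypothetical stand-ins, not Airflow's real `CallbackRequest` / `SlaCallbackRequest`, and `CallbackQueue` is an invented name. The hunk is cut off inside the `else` branch, so the sketch assumes that branch keeps the old behaviour shown in the removed lines (dedupe the path, then insert it at the front).

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import DefaultDict, List


@dataclass(frozen=True)
class CallbackRequest:
    """Hypothetical stand-in for Airflow's CallbackRequest."""

    full_filepath: str


@dataclass(frozen=True)
class SlaCallbackRequest(CallbackRequest):
    """Hypothetical stand-in for Airflow's SlaCallbackRequest."""

    dag_id: str = ""


class CallbackQueue:
    """Invented name; models only the two structures touched by the diff."""

    def __init__(self) -> None:
        self.callbacks: DefaultDict[str, List[CallbackRequest]] = defaultdict(list)
        self.file_path_queue: List[str] = []

    def add(self, request: CallbackRequest) -> None:
        if isinstance(request, SlaCallbackRequest):
            # An identical SLA request is already pending: drop the duplicate.
            if request in self.callbacks[request.full_filepath]:
                return
            self.callbacks[request.full_filepath].append(request)
            # SLA callbacks never jump the queue: the file goes to the back,
            # and only if it is not queued already.
            if request.full_filepath not in self.file_path_queue:
                self.file_path_queue.append(request.full_filepath)
        else:
            # Assumed from the removed lines: other callbacks move their
            # file to the front, removing any existing entry first.
            self.callbacks[request.full_filepath].append(request)
            self.file_path_queue = [
                path for path in self.file_path_queue if path != request.full_filepath
            ]
            self.file_path_queue.insert(0, request.full_filepath)


queue = CallbackQueue()
queue.file_path_queue = ["a.py", "b.py"]
queue.add(SlaCallbackRequest("c.py", "dag_c"))  # appended at the back
queue.add(SlaCallbackRequest("c.py", "dag_c"))  # duplicate, skipped
queue.add(CallbackRequest("b.py"))              # moved to the front
print(queue.file_path_queue)
```

   The point of the split is that a burst of SLA requests for one file can add at most one entry at the back of the queue, so files already waiting are still reached, while other callbacks keep their priority.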


