Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/19 08:02:30 UTC

[GitHub] [airflow] argibbs opened a new pull request, #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

argibbs opened a new pull request, #25147:
URL: https://github.com/apache/airflow/pull/25147

   # Problem
   
   When SLAs are enabled, DAG processing can grind to a halt. This manifests as updates to dag files being ignored: newly added dags do not appear, and changes to existing dags do not take effect.
   
   The behaviour appears random: some dags update, others do not. The reason is that the DagProcessorManager internally maintains a queue of dags to parse and update. Dags get into this queue by a couple of mechanisms. The first and most obvious one is the scanning of the file system for dag files.
   
   However, dags can also get into this queue during evaluation of the dag's state by the scheduler (`scheduler_job.py`). Since these event-based callbacks presumably require more rapid reaction than a regular scan of the file system, they go to the front of the queue.
   
   And this is how SLAs break the system; prior to this MR they are treated the same as other callbacks, i.e. they cause their file to go to the front of the queue. The problem is that SLAs are evaluated per dag file, but a single dag may have many tasks with SLAs. Thus the evaluation of a single DAG may generate _many_ SLA callbacks.
   
   These cause the affected file to go to the front of the queue. It's re-evaluated, and then the SLA events are fired again.
   
   What this means in practice is that you will see the DagProcessorManager process a dag file with SLAs, move on to the next file in the queue, maybe even make it to 2 or 3 more dags ... and then more SLA callbacks arrive from the first dag and reset the queue. The DagProcessorManager never makes it all the way to the end of its queue.
   
   # Solution
   
   It's pretty simple: the DagProcessorManager queue is altered such that SLA callbacks are only added if an equal one is not already queued (remember they're processed per-dag, but generated once per task-with-SLA), and when added they do not change the place of the dag file in the queue. If the file is not in the queue, it's added at the back.
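As a rough illustration of the queue policy described above, here is a minimal, self-contained sketch. It is a toy model rather than the real `DagProcessorManager`: the class `MiniManager`, its attributes, and the two simplified request dataclasses are hypothetical stand-ins, and the non-SLA branch simply mirrors the pre-change "jump to the front" behaviour.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class CallbackRequest:
    """Hypothetical stand-in for a generic callback request."""
    full_filepath: str


@dataclass(frozen=True)
class SlaCallbackRequest(CallbackRequest):
    """Hypothetical stand-in for an SLA callback (one per dag)."""
    dag_id: str = ""


class MiniManager:
    """Toy model of the manager's file queue (assumption, not Airflow code)."""

    def __init__(self, file_paths):
        self.file_path_queue = list(file_paths)
        self.callbacks = defaultdict(list)

    def add_callback(self, request):
        path = request.full_filepath
        if isinstance(request, SlaCallbackRequest):
            # An equal SLA request is already waiting: drop the duplicate.
            if request in self.callbacks[path]:
                return
            self.callbacks[path].append(request)
            # Keep the file's current position; only append if it is absent.
            if path not in self.file_path_queue:
                self.file_path_queue.append(path)
        else:
            # Other callbacks still jump the queue, as before the change.
            self.callbacks[path].append(request)
            self.file_path_queue = [p for p in self.file_path_queue if p != path]
            self.file_path_queue.insert(0, path)


manager = MiniManager(["a.py", "b.py", "c.py"])
manager.add_callback(SlaCallbackRequest("c.py", "dag_c"))  # position unchanged
manager.add_callback(SlaCallbackRequest("c.py", "dag_c"))  # duplicate, skipped
manager.add_callback(CallbackRequest("b.py"))              # moves to the front
print(manager.file_path_queue)  # ['b.py', 'a.py', 'c.py']
```

The duplicate check relies on the requests comparing equal by value, which the frozen dataclasses provide in this sketch; whether the real request classes compare that way is not shown here.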
   
   # Notes
   
   This may feel a bit sticky-tape-and-string; you could argue that the SLACallbacks shouldn't be generated so rapidly. However, the only thing that knows the state of the queue is the DagProcessorManager, and it's unreasonable to expect the DagProcessors to throttle themselves without knowing whether such throttling is necessary.
   
   To put it another way, more optimisations in the DagProcessors are possible, but having the queue gate the callbacks as they're added is necessary and sufficient to stop the SLAs spamming the queue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on pull request #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25147:
URL: https://github.com/apache/airflow/pull/25147#issuecomment-1191462181

   > Yeah, whatever permissions you need to trigger a re-run, I don't have them. No retry icon on mouse over in the logs view, and no re-run dropdown either. Not a biggie, I'm content to keep re-pushing the latest commit, this is more of an FYI. ![image](https://user-images.githubusercontent.com/4406514/180212239-ac81474b-9d57-4edb-ba01-c0d9cdcfab9f.png)
   
   :( 
   




[GitHub] [airflow] potiuk commented on pull request #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25147:
URL: https://github.com/apache/airflow/pull/25147#issuecomment-1191932877

   Merging now, unless someone has objections. It's a simple enough change to be reverted or improved on if needed.




[GitHub] [airflow] potiuk commented on pull request #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25147:
URL: https://github.com/apache/airflow/pull/25147#issuecomment-1191350803

   Cool. Let me know - we were so happy when it appeared in March that we did not check whether it is only for maintainers.
   
   BTW, there is another option from the UI: you can close/reopen the PR, but this will re-trigger the whole build, and there are cases where it won't work, for example when the image build failed.




[GitHub] [airflow] argibbs commented on pull request #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

Posted by GitBox <gi...@apache.org>.
argibbs commented on PR #25147:
URL: https://github.com/apache/airflow/pull/25147#issuecomment-1191419937

   Yeah, whatever permissions you need to trigger a re-run, I don't have them. No retry icon on mouse over in the logs view, and no re-run dropdown either. Not a biggie, I'm content to keep re-pushing the latest commit, this is more of an FYI.
   ![image](https://user-images.githubusercontent.com/4406514/180212239-ac81474b-9d57-4edb-ba01-c0d9cdcfab9f.png)
   




[GitHub] [airflow] argibbs commented on pull request #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

Posted by GitBox <gi...@apache.org>.
argibbs commented on PR #25147:
URL: https://github.com/apache/airflow/pull/25147#issuecomment-1191326307

   NW, I'll wait for a second opinion. 
   
   (I need to tickle the build checks anyway to get the tests green). Possibly a n00b question, but there's no way to re-run a failed check other than force-pushing the last commit again?




[GitHub] [airflow] potiuk merged pull request #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

Posted by GitBox <gi...@apache.org>.
potiuk merged PR #25147:
URL: https://github.com/apache/airflow/pull/25147




[GitHub] [airflow] argibbs commented on a diff in pull request #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

Posted by GitBox <gi...@apache.org>.
argibbs commented on code in PR #25147:
URL: https://github.com/apache/airflow/pull/25147#discussion_r926448536


##########
airflow/dag_processing/manager.py:
##########
@@ -679,16 +679,37 @@ def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION):
             guard.commit()
 
     def _add_callback_to_queue(self, request: CallbackRequest):
-        self._callback_to_execute[request.full_filepath].append(request)
-        # Callback has a higher priority over DAG Run scheduling
-        if request.full_filepath in self._file_path_queue:
-            # Remove file paths matching request.full_filepath from self._file_path_queue
-            # Since we are already going to use that filepath to run callback,
-            # there is no need to have same file path again in the queue
-            self._file_path_queue = [
-                file_path for file_path in self._file_path_queue if file_path != request.full_filepath
-            ]
-        self._file_path_queue.insert(0, request.full_filepath)
+
+        # requests are sent by dag processors. SLAs exist per-dag, but can be generated once per SLA-enabled
+        # task in the dag. If treated like other callbacks, SLAs can cause feedback where a SLA arrives,
+        # goes to the front of the queue, gets processed, triggers more SLAs from the same DAG, which go to
+        # the front of the queue, and we never get round to picking stuff off the back of the queue
+        if isinstance(request, SlaCallbackRequest):
+            if request in self._callback_to_execute[request.full_filepath]:
+                self.log.debug("Skipping already queued SlaCallbackRequest")
+                return
+
+            # not already queued, queue the file _at the back_, and add the request to the file's callbacks
+            self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id)
+            self._callback_to_execute[request.full_filepath].append(request)
+            if request.full_filepath not in self._file_path_queue:
+                self._file_path_queue.append(request.full_filepath)
+
+        # Other callbacks have a higher priority over DAG Run scheduling, so those callbacks gazump, even if
+        # already in the queue
+        else:
+            self.log.debug(
+                "Queuing %s CallbackRequest: %s", type(request).__name__.rsplit('.', 1)[-1], request

Review Comment:
   Good point. Fixed.





[GitHub] [airflow] argibbs commented on pull request #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

Posted by GitBox <gi...@apache.org>.
argibbs commented on PR #25147:
URL: https://github.com/apache/airflow/pull/25147#issuecomment-1192497766

   Legendary, ty!




[GitHub] [airflow] argibbs commented on pull request #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

Posted by GitBox <gi...@apache.org>.
argibbs commented on PR #25147:
URL: https://github.com/apache/airflow/pull/25147#issuecomment-1192685888

   Good news / bad news: this fix works as intended, however, it simply clears the way for _another_ bug to manifest. 
   
   Just when I thought I was out, they drag me back in.
   
   I'm working on a fix locally (again), will raise an MR when I've tested, etc. etc.




[GitHub] [airflow] argibbs commented on pull request #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

Posted by GitBox <gi...@apache.org>.
argibbs commented on PR #25147:
URL: https://github.com/apache/airflow/pull/25147#issuecomment-1189009120

   @potiuk you said to ping you when I had the PR ready :)




[GitHub] [airflow] potiuk commented on pull request #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25147:
URL: https://github.com/apache/airflow/pull/25147#issuecomment-1191341459

   (BTW. This option appears only after the whole workflow completes)




[GitHub] [airflow] argibbs commented on pull request #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

Posted by GitBox <gi...@apache.org>.
argibbs commented on PR #25147:
URL: https://github.com/apache/airflow/pull/25147#issuecomment-1191346832

   Ah ty.
   
   I didn't see the drop-down menu or retry button, and I _thought_ I looked fairly thoroughly yesterday (builds failed on my first push too) ... but it's possible I checked before it was all done, or I missed it (the retry button in particular looks like it may require you to mouse over the right spot to find it). I will see how it goes and check if it fails again; I know what I'm looking for now.




[GitHub] [airflow] potiuk commented on pull request #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25147:
URL: https://github.com/apache/airflow/pull/25147#issuecomment-1192942504

   > Good news / bad news: this fix works as intended, however, it simply clears the way for _another_ bug to manifest.
   > 
   > Just when I thought I was out, they drag me back in.
   > 
   > I'm working on a fix locally (again), will raise an MR when I've tested, etc. etc.
   
   Looking forward to it!




[GitHub] [airflow] potiuk commented on a diff in pull request #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25147:
URL: https://github.com/apache/airflow/pull/25147#discussion_r926379045


##########
airflow/dag_processing/manager.py:
##########
@@ -679,16 +679,37 @@ def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION):
             guard.commit()
 
     def _add_callback_to_queue(self, request: CallbackRequest):
-        self._callback_to_execute[request.full_filepath].append(request)
-        # Callback has a higher priority over DAG Run scheduling
-        if request.full_filepath in self._file_path_queue:
-            # Remove file paths matching request.full_filepath from self._file_path_queue
-            # Since we are already going to use that filepath to run callback,
-            # there is no need to have same file path again in the queue
-            self._file_path_queue = [
-                file_path for file_path in self._file_path_queue if file_path != request.full_filepath
-            ]
-        self._file_path_queue.insert(0, request.full_filepath)
+
+        # requests are sent by dag processors. SLAs exist per-dag, but can be generated once per SLA-enabled
+        # task in the dag. If treated like other callbacks, SLAs can cause feedback where a SLA arrives,
+        # goes to the front of the queue, gets processed, triggers more SLAs from the same DAG, which go to
+        # the front of the queue, and we never get round to picking stuff off the back of the queue
+        if isinstance(request, SlaCallbackRequest):
+            if request in self._callback_to_execute[request.full_filepath]:
+                self.log.debug("Skipping already queued SlaCallbackRequest")
+                return
+
+            # not already queued, queue the file _at the back_, and add the request to the file's callbacks
+            self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id)
+            self._callback_to_execute[request.full_filepath].append(request)
+            if request.full_filepath not in self._file_path_queue:
+                self._file_path_queue.append(request.full_filepath)
+
+        # Other callbacks have a higher priority over DAG Run scheduling, so those callbacks gazump, even if
+        # already in the queue
+        else:
+            self.log.debug(
+                "Queuing %s CallbackRequest: %s", type(request).__name__.rsplit('.', 1)[-1], request

Review Comment:
   Small thing here. The rsplit will be evaluated every time you go through that line, no matter the debug level. We should avoid that.
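One common way to address this kind of concern is to guard the call with `Logger.isEnabledFor`, so the argument is only computed when DEBUG records would actually be emitted. The sketch below is illustrative only and is not necessarily what the PR ended up doing: the logger name, the `Expensive` class, and its call counter are hypothetical and exist just to make the cost visible.

```python
import logging

log = logging.getLogger("callback_queue_sketch")  # hypothetical logger name


class Expensive:
    """Hypothetical request whose description we pretend is costly to build."""
    calls = 0

    def describe(self):
        Expensive.calls += 1
        return type(self).__name__.rsplit(".", 1)[-1]


def queue_without_guard(request):
    # describe() runs before log.debug() is even entered, at every level.
    log.debug("Queuing %s CallbackRequest: %s", request.describe(), request)


def queue_with_guard(request):
    # describe() only runs when DEBUG is enabled for this logger.
    if log.isEnabledFor(logging.DEBUG):
        log.debug("Queuing %s CallbackRequest: %s", request.describe(), request)


log.setLevel(logging.INFO)
queue_without_guard(Expensive())  # still pays for describe()
queue_with_guard(Expensive())     # skips describe() entirely
print(Expensive.calls)  # 1
```

Note that `%s`-style arguments already defer string formatting until a record is emitted, but any expression passed as an argument is still evaluated eagerly by Python, which is the cost the review comment points at.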





[GitHub] [airflow] potiuk commented on pull request #25147: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25147:
URL: https://github.com/apache/airflow/pull/25147#issuecomment-1191340999

   > NW, I'll wait for a second opinion.
   > 
   > (I need to tickle the build checks anyway to get the tests green). Possibly a n00b question, but there's no way to re-run a failed check other than force-pushing the last commit again?
   
   Do you see this: https://github.blog/2022-03-16-save-time-partial-re-runs-github-actions/ - or is it a maintainer-only feature (not sure)?

