Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/02 22:35:20 UTC

[GitHub] [airflow] argibbs opened a new pull request, #25489: Dag processor manager queue overhaul

argibbs opened a new pull request, #25489:
URL: https://github.com/apache/airflow/pull/25489

   # The "I'm determined to fix SLAs real good, fix 'em real good" MR
   
   OK, so #25147 made a start in this direction. Summing up the, er, summary from _that_ MR, the problem was that SLA callbacks could keep occurring, and prevent the dag processor manager from ever processing more than 2 or 3 dags in the queue before the SLA callbacks re-upped and went to the front of the queue.
   
   Under the new behaviour, the SLA callbacks went to the _back_ of the queue. This guaranteed that the queue would be processed at least once. However, it turns out that dags on disk would only be re-added to the queue _once the queue was empty_. But with SLA callbacks arriving all. the. time. the queue would never drain, and we'd never re-read dags from disk. So if you updated the dag file, you'd have to bounce the scheduler to pick up the change, and then it would process all non-SLA-generating DAGs exactly once. And then you'd need to bounce again.
   
   # I've almost certainly missed some steps.
   
   Before I go into a bit more detail about the change, I'd like to acknowledge that as a (very) small-time contributor to the project, I'm not familiar with all the done things when making more radical changes. In particular, I assume there are more doc changes needed than just a newsfragment. I've added a config flag, some metrics, _and_ the behaviour of the queue processing has subtly changed (for the better, I hope!).
   
   I'd very much appreciate someone(s) leaving a comment :point_down: telling me what else I need to do.
   
   # TL;DR
   
   I mean, I see your point. Skip to the bottom for the summary.
   
   # Pay attention, here comes the science bit
   
   Ok, so to briefly recap the queue behaviour prior to this change:
   1. The scheduler spins up.
   2. The worker loop starts.
   3. It scans the disk to create a list of all known files. _It does not add these files to the queue_.
   4. If the queue is empty (on start up it will be) then all known files are added to the queue.
   5. It then spins up dag processors (via multiprocessing or in-line, depending on setup) to consume the first N entries in the queue (where N is a function of config settings, and includes a count of any files currently being processed from a previous pass of the loop).
   6. It then goes back to step 2.
   
   - Note that Step 3 is re-run periodically based on a time interval set in config (`dag_dir_list_interval`). It does not require that the queue be empty. Any changes to the set of files on disk, e.g. dags being deleted, will cause the manager to remove those dags from the queue.
   - After a few iterations of the loop, the contents of the queue have all been processed, step 4 kicks in, and the dag files present on disk are re-added to the queue.
       - The default config is to do this no more than once every 30 seconds (`min_file_process_interval`), so if your dag files only take 10 seconds to process, there will be 20 seconds of idle time, but if your dag files take a minute to process, then the manager will be permanently busy, because as soon as the queue drains, it'll be well past time to reload the queue from disk.
       - This only works if the queue _actually empties_ - as foreshadowed above, if the queue never empties (because SLAs) then step 4 will never kick in, and we'll never reload the queue. Step 3 will ensure that within a short interval any deleted dags are removed from the queue (should they happen to be one of the ones generating SLAs) but updated and newly added dag files will not be picked up.
       - To be clear, this is the problem I alluded to in my comments at the end of the previous MR after it got merged. (There's a rough sketch of this loop just below.)
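   
   To make the above concrete, here's a rough sketch of the old loop in Python. The names, intervals and batch size are illustrative only; the real logic lives in `airflow/dag_processing/manager.py`.
   
   ```python
   # Toy model of the *pre-change* loop; names and numbers are illustrative.
   DAG_DIR_LIST_INTERVAL = 300      # step 3 cadence (seconds)
   MIN_FILE_PROCESS_INTERVAL = 30   # step 4 cadence (seconds)
   
   known_files: list = []
   file_path_queue: list = []
   last_scan = last_refill = 0.0
   
   def loop_once(now, files_on_disk, callback_files):
       """One pass of the old loop; returns the batch handed to processors."""
       global known_files, file_path_queue, last_scan, last_refill
   
       # Step 3: periodic directory scan -- updates the known list and prunes
       # deleted files from the queue, but does NOT enqueue anything new.
       if now - last_scan >= DAG_DIR_LIST_INTERVAL:
           known_files = list(files_on_disk)
           file_path_queue = [f for f in file_path_queue if f in known_files]
           last_scan = now
   
       # Callback requests re-add their dag file to the queue (shown at the front
       # here; post-#25147 SLA callbacks went to the back). Either way, a steady
       # stream of callbacks means the queue never actually drains...
       file_path_queue = list(callback_files) + file_path_queue
   
       # Step 4: refill from disk ONLY when the queue is completely empty.
       if not file_path_queue and now - last_refill >= MIN_FILE_PROCESS_INTERVAL:
           file_path_queue = list(known_files)   # ...so this never happens
           last_refill = now
   
       # Step 5: hand the first N entries to processors (N = 2 here for brevity).
       batch, file_path_queue = file_path_queue[:2], file_path_queue[2:]
       return batch
   ```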
   
   Locally, I tested a hacky fix whereby on receipt of an SLA callback I still added the callback, but simply _didn't add the dag to the queue_. This works, but means that SLA callbacks are only processed when the queue drains and is refilled (because at that point all dags are added to the queue, guaranteeing that we will process any outstanding SLA callbacks). However, if someone has specified a large wait time between loads of dags from disk, this will affect how timely the SLA alerts are. That's fine for me, because I don't do that, but I wasn't getting a "this will be fine for everyone" vibe from the change (I did say it was hacky!).
   
   Also, there's another catch. While the problem is much more prevalent with SLAs, these are not the only callbacks. I could envisage a situation where someone configures a dag with a very small interval (e.g. a dag run every 10 seconds). While I think this is a much more theoretical problem that might not ever exist in the wild - this isn't really the use case Airflow is intended for - the upshot would be that a dag generating lots of dag callbacks would be spamming the queue. And those callbacks still go to the _front_ of the queue, i.e. you're back to the situation I tried to solve in the previous MR!
   
   # I don't think that word means what you think it means
   
   I decided that fundamentally, part of the problem was that the queue should be FIFO. And it wasn't. But if I made it FIFO, then the higher-priority DAG callbacks would have to wait their turn behind the dag files loaded from disk, and I'm pretty sure doing that would undo some of the speed-ups Airflow 2 was trumpeted as delivering. Airflow 1 used to have on average a 15-second gap (= 30/2) between one task completing and the downstream tasks being scheduled, because once the task completed, you had to wait for the manager to drain the queue, add the files to the queue from disk, and then process the dag. (And that's assuming you could even process all your dags in <30 seconds...)
   
   I didn't want to be the guy who accidentally breaks _that_ particular speed up. :scream: 
   
   So I did two things:
   
   ## Thing 1: Tackling the FIFO issue aka gazumping callbacks.
   1. I split the existing queue into two: a priority queue, and a standard queue. The priority queue is drained before the standard queue.
   2. You can probably guess what goes where, but: the dag callbacks go into the priority queue, while the SLA callbacks and dags found on disk are added to the standard queue.
   3. **Both queues are FIFO.** This means that rapidly updating dags can't gazump other slightly-less-rapidly updating dags, because they go to the back. Of course, most of the time, there's only one dag with callbacks in the priority queue, and there's no material change in behaviour. 
   4. Hypothetically, the priority queue could be permanently busy. If this happens, it'll be like it was with the SLAs; the standard queue would never get a look-in, and we'd stop processing new/updated dags.
   5. So **I also added a new config param**. This is called `max_file_process_interval`, and it's the dual of the existing `min_file_process_interval`. It guarantees that if you do happen to have a permanently busy priority queue, we'll eventually take a breather and process the files on disk anyway.
   6. By default, this flag is set to 120 seconds. Setting it to zero disables it, i.e. you can have a permanently busy priority queue.
   7. I am acutely aware / wary of the fact that this separation-of-the-queues-plus-config-param is solving a problem that is (for me at least) purely theoretical. I've never seen it, but I don't think it's premature optimisation to tackle it in this change. (There's a rough sketch of the selection order just below.)
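   
   To make Thing 1 concrete, here's roughly the selection order with the two queues plus the new interval. This is a simplified sketch with illustrative names; the real versions are `is_overdue()` and `start_new_processes()` in the diff further down.
   
   ```python
   from collections import deque
   from datetime import datetime, timedelta
   from typing import Deque, Dict, List, Optional
   
   priority_queue: Deque[str] = deque()    # dag callbacks
   std_queue: Deque[str] = deque()         # SLA callbacks + files found on disk
   last_finish: Dict[str, datetime] = {}   # last successful parse time per file
   MAX_FILE_PROCESS_INTERVAL = 120         # seconds; <= 0 disables the safety valve
   
   def next_file_to_parse(known_files: List[str], now: datetime) -> Optional[str]:
       # Safety valve: if any known file hasn't been parsed for too long (e.g. a
       # permanently busy priority queue), parse it first regardless of the queues.
       if MAX_FILE_PROCESS_INTERVAL > 0:
           for path in known_files:
               finished = last_finish.get(path)
               if finished and now - finished > timedelta(seconds=MAX_FILE_PROCESS_INTERVAL):
                   return path
       if priority_queue:
           return priority_queue.popleft()   # dag callbacks first, FIFO amongst themselves
       if std_queue:
           return std_queue.popleft()        # then SLAs / disk files, also FIFO
       return None
   ```
   
   Note that the overdue check runs over *all* known files, not just the queued ones; more on that in the review comments below.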
   
   ## Thing 2: Handling the fact that SLAs stop the standard queue from ever being empty.
   1. As noted ... somewhere ... above, a hacky fix was to simply stop adding dags to the queue on SLA callback. But this wasn't perfect for people who don't happen to use airflow the way I do.
   2. So, the first half of Thing 2 was to make the standard queue FIFO. This is effectively upholding the change from the previous MR; SLA callbacks simply get treated the same as dags loaded from disk, and have to wait their turn.
   3. The next bit was to then add a `set` which tracked which dag files in the queue were still outstanding from the last refresh from disk. Once the queue was refreshed from disk, we'd work through every file eventually (because FIFO), and at that point the set would be empty, even if the queue wasn't (because SLAs).
   4. Once the set was empty, we'd refresh the queue with dags found on disk.
   5. Some of those dags from disk _might already be in the queue due to SLAs_, but we don't care; this is a good thing, as it means loading dags from disk doesn't push existing SLAs to the back of the line. (See the sketch below.)
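   
   A minimal sketch of that refresh rule (illustrative names; in the actual change the set is `_outstanding_std_file_paths`, see the diff below):
   
   ```python
   from collections import deque
   from typing import Deque, List, Set
   
   std_queue: Deque[str] = deque()
   outstanding_from_disk: Set[str] = set()   # files from the last disk scan not yet parsed
   
   def refresh_from_disk_if_due(files_on_disk: List[str]) -> None:
       # Refresh once every file from the *previous* scan has been processed,
       # even if the queue itself still holds SLA-triggered entries.
       if not outstanding_from_disk:
           for path in files_on_disk:
               if path not in std_queue:      # already queued via an SLA? leave it where it is
                   std_queue.append(path)
           outstanding_from_disk.update(files_on_disk)
   
   def mark_parsed(path: str) -> None:
       # Called when a processor finishes a file; drains the set independently of the queue.
       outstanding_from_disk.discard(path)
   ```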
   
   # Notes:
   
   This doesn't materially change _how_ SLAs work; they are generated and consumed the same as before. It just means that we reliably consume the alerts once generated, without breaking the rest of the system. Previously, in my experience at least, adding SLAs would simply cause the system to stop processing dag updates.
   
   In particular, I don't address issues with SLA timestamps (as raised by #22532), nor do I deal with other problems (e.g. now that SLAs fire reliably, I have noticed that they fire during catch-up, and that the same alert can fire multiple times).
   
   This is not because I think the current approach is correct (I Have Opinions) but rather it is a sad-but-true fact that I don't have the time to take on a big project (hence I will keep My Opinions to myself), so if they're minor enough to live with, I'm just going to live with them.
   
   # Summary: I'm from the UK; politely waiting in line is what we do best.
   
   - Separation of the manager's `_file_path_queue` into `_std_file_path_queue` (for SLAs and dags loaded from disk) and `_priority_file_path_queue` (for DAG callbacks)
   - New `max_file_process_interval` config to ensure files are read from disk every so often, even if the priority queue is always busy
   - Dags from disk will be added to std queue even if the std queue is still busy with SLAs.
   - Some extra stats/metrics to track queue depths / progress
   - Tests updated to reflect changes




[GitHub] [airflow] argibbs commented on pull request #25489: Dag processor manager queue split (fixes SLAs)

Posted by GitBox <gi...@apache.org>.
argibbs commented on PR #25489:
URL: https://github.com/apache/airflow/pull/25489#issuecomment-1258284677

   Hiya, yes, I hadn't forgotten either. :smile: 
   
   Have just found some time to refresh this, and am sorting the rebase now. Will re-raise the dev-list email once that's done (and I'll spell your name right this time too!)




[GitHub] [airflow] potiuk commented on pull request #25489: Dag processor manager queue split (fixes SLAs)

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25489:
URL: https://github.com/apache/airflow/pull/25489#issuecomment-1205217323

   > Sounds good to me. I'm running with this locally, so I don't really mind about timelines. I'm just trying to give back (and also push upstream so I don't have to keep patching each time there's a release :smile: )
   
   Cool! This is the RIGHT approach! Much appreciated!




[GitHub] [airflow] argibbs commented on a diff in pull request #25489: Dag processor manager queue split

Posted by GitBox <gi...@apache.org>.
argibbs commented on code in PR #25489:
URL: https://github.com/apache/airflow/pull/25489#discussion_r936122004


##########
airflow/dag_processing/manager.py:
##########
@@ -993,19 +1027,56 @@ def _create_process(file_path, pickle_dags, dag_ids, callback_requests):
             file_path=file_path, pickle_dags=pickle_dags, dag_ids=dag_ids, callback_requests=callback_requests
         )
 
+    def is_overdue(self, file_path: str, as_of: datetime) -> bool:
+        if self._max_file_process_interval <= 0:
+            return False
+        last_finish_time = self.get_last_finish_time(file_path)
+        return (
+            last_finish_time is not None
+            and (as_of - last_finish_time).total_seconds() > self._max_file_process_interval
+        )
+
     def start_new_processes(self):
         """Start more processors if we have enough slots and files to process"""
-        while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
-            file_path = self._file_path_queue.pop(0)
-            # Stop creating duplicate processor i.e. processor with the same filepath
+        # check for files stuck on the std queue for too long; these need to be processed first, as they
+        # suggest that we found files on disk a long time ago and still haven't processed them
+        now = timezone.utcnow()
+        overdue_std_files_list = [p for p in self._file_paths if self.is_overdue(p, now)]

Review Comment:
   Note we're checking all known files (`self._file_paths`) rather than the std queue. This is because if the priority queue is spinning, then the std queue won't empty, and won't reload from disk, and there might be files on disk not in the std queue.





[GitHub] [airflow] potiuk commented on pull request #25489: Dag processor manager queue split (fixes SLAs)

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25489:
URL: https://github.com/apache/airflow/pull/25489#issuecomment-1229061586

   I think it is a bit risky for 2.4 - so this one might stay open for a while until we branch off for 2.4 (@argibbs - it's not forgotten I can assure you). 




[GitHub] [airflow] potiuk commented on pull request #25489: Dag processor manager queue split (fixes SLAs)

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25489:
URL: https://github.com/apache/airflow/pull/25489#issuecomment-1205190941

   Thanks for such a thorough description and your willingness to improve SLAs (I really, really appreciate it, as this is one of the things that we have had on our backburner for quite a while).
   
   However, @argibbs, I think this is the kind of change where the description is so big and the potential for breaking things is so big that IMHO it requires a devlist discussion, to draw the attention of the people who should be drawn in, rather than discussing it only in the PR. I know you wrote ("probably not make it in 2.3.4"), but I also think "probably not make it in 2.4.0 either" is a better assessment.
   
   I strongly suggest making a small digest of the description you wrote - with the most important parts of the "why" and "how" extracted (less is more) - and sending it to the devlist https://airflow.apache.org/community/, with a link to this PR, inviting people to express their thoughts.
   
   I think this is not going to be merged before the 2.4 branch is cut. The risks are too big; it likely needs to be very thoroughly tested and justified (so we know what we are doing here), so there is still a lot of time to discuss it.
   
   I also think some people who are now looking at finalizing [AIP-48](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-48+Data+Dependency+Management+and+Data+Driven+Scheduling) so that it is ready for prime time in 2.4 (and who should be part of the discussions) have no time and energy to look at it. And we definitely want to avoid adding too many potentially high-impact changes in one release. We already added too much in 2.3.0, and I think we learned a lesson there.
   
   So regardless of what you propose, it might take a while.
   
   I strongly encourage you to describe the case/problem/solution in a concise way and send it to the devlist.




[GitHub] [airflow] Jorricks commented on pull request #25489: Dag processor manager queue split (fixes SLAs)

Posted by GitBox <gi...@apache.org>.
Jorricks commented on PR #25489:
URL: https://github.com/apache/airflow/pull/25489#issuecomment-1209121259

   @argibbs these changes look great! Very well described as well.
   
   > I am acutely aware / wary of the fact that this separation-of-the-queues-plus-config-param is solving a problem that is (for me at least) purely theoretical. I've never seen it, but I don't think it's premature optimisation to tackle it in this change.
   
   We have had the case where, because of a crucial single point of failure, hardly any DAG was able to continue (as we use a lot of ExternalTaskSensors). This meant that all of our schedulers were effectively down, handling SLA fires for a long time, so I am very happy to see this optimisation 👍
   
   ### Note: I don't think this should be included in this MR, but given you worked on this, you probably have a good view on it.
   I was thinking: part of what we can speed up is, instead of firing each SLA for the same DAG individually, grouping SLAs per DAG. That way, we add a dag with an SLA to the queue instead of an actual SLA. This prevents us from parsing a DAG file 10 times for 10 different SLAs of the same DAG. I guess you could optimise this by including a dummy operator at the end that requires all the important tasks to succeed, but that's not super user friendly.
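   
   For what it's worth, here's a purely hypothetical sketch of that grouping (none of these names exist in the PR; it's just to illustrate the idea):
   
   ```python
   from collections import defaultdict, deque
   from typing import Deque, Dict, List
   
   # Hypothetical: bucket SLA callbacks per dag file so a single parse serves all of them.
   pending_slas: Dict[str, List[object]] = defaultdict(list)
   std_queue: Deque[str] = deque()
   
   def on_sla(dag_file: str, callback: object) -> None:
       pending_slas[dag_file].append(callback)
       if dag_file not in std_queue:          # enqueue the file once, not once per SLA
           std_queue.append(dag_file)
   
   def callbacks_for(dag_file: str) -> List[object]:
       # Handed to a single processor run for that file.
       return pending_slas.pop(dag_file, [])
   ```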




[GitHub] [airflow] potiuk commented on pull request #25489: Dag processor manager queue split (fixes SLAs)

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25489:
URL: https://github.com/apache/airflow/pull/25489#issuecomment-1250386654

   Some rebase is needed after the `__future__` annotations change, but I think we are pretty close to the "focused" 2.4.0 release effort. I think this is a good time to rebase/fix and maybe re-raise a devlist discussion about it.




[GitHub] [airflow] argibbs commented on pull request #25489: Dag processor manager queue split (fixes SLAs)

Posted by GitBox <gi...@apache.org>.
argibbs commented on PR #25489:
URL: https://github.com/apache/airflow/pull/25489#issuecomment-1211303956

   @Jorricks thank you for the kind words.
   
   I agree, SLAs could/should be made more efficient. I found this change easier/simpler to make & test, possibly because it's a problem I've tackled many times in my "real" job. Also, as you note, I think this change defends against a range of possible problems, rather than just SLAs.
   
   I may try to improve SLA firing in the future, but I will be upfront about my motivations; I'm not looking to be "The SLA guy", as I simply don't have the spare time. If I end up with a load of spare time, that will doubtless change...




[GitHub] [airflow] potiuk commented on pull request #25489: Dag processor manager queue split (fixes SLAs)

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25489:
URL: https://github.com/apache/airflow/pull/25489#issuecomment-1205197801

   But I do feel that the direction you took is sound. We need some prioritisation of callbacks. It just needs a deeper look and more discussion than fits a PR opened maybe 2 weeks before a substantial release which is already undergoing quite substantial testing.




[GitHub] [airflow] argibbs commented on a diff in pull request #25489: Dag processor manager queue overhaul

Posted by GitBox <gi...@apache.org>.
argibbs commented on code in PR #25489:
URL: https://github.com/apache/airflow/pull/25489#discussion_r936094863


##########
newsfragments/25489.bugfix.rst:
##########
@@ -0,0 +1 @@
+DAGProcessorManager queue made fully FIFO, split into two (priority and std), and dags guaranteed to be read from disk periodically (fixes/improves SLA alert reliability)

Review Comment:
   I suspect "bugfix" undersells the change. As per PR description, will happily change the classification to whatever I'm told is the right one to use for a change like this.



##########
airflow/config_templates/config.yml:
##########
@@ -1913,6 +1913,21 @@
       type: string
       example: ~
       default: "30"
+    - name: max_file_process_interval
+      description: |
+        Number of seconds since the previous parse time after which a DAG file will be parsed
+        again. This is only useful if your system contains a lot of dags / slas which update
+        exceedingly frequently, as this may prevent the system from scanning the file system
+        for changes to your dag files. This operates on a best-effort basis; if you have
+        many dags, and it takes 10 minutes to scan and parse them all, this will not make
+        them parse any more often.
+        Setting this to <= 0 disables the behaviour, in case it's important to you that those
+        frequently updating dags / slas always take priority, at the cost of delaying updates
+        from disk.
+      version_added: 2.4.0

Review Comment:
   I mean, I'm assuming that this PR will be "subject to discussion" to put it mildly and has no chance of making 2.3.4. If it sails through, I'll happily correct this.





[GitHub] [airflow] argibbs commented on a diff in pull request #25489: Dag processor manager queue split

Posted by GitBox <gi...@apache.org>.
argibbs commented on code in PR #25489:
URL: https://github.com/apache/airflow/pull/25489#discussion_r936119659


##########
airflow/dag_processing/manager.py:
##########
@@ -600,11 +620,14 @@ def _run_parsing_loop(self):
 
             self._kill_timed_out_processors()
 
-            # Generate more file paths to process if we processed all the files
-            # already.
-            if not self._file_path_queue:
+            # Generate more file paths to process if we processed all the files already. Note for this
+            # to clear down, we must have first either:
+            # 1. cleared the priority queue then cleared this set while draining the std queue, or
+            # 2. been unable to clear the priority queue, hit max_file_process_interval, and drained the set
+            #    while clearing overdue files
+            if not self._outstanding_std_file_paths:

Review Comment:
   This is a key change as mentioned in the PR description: we refresh files from disk once the _`set`_ is empty. The queue might still have SLA callbacks in it, but that shouldn't stop us refreshing from disk.





[GitHub] [airflow] argibbs commented on a diff in pull request #25489: Dag processor manager queue overhaul

Posted by GitBox <gi...@apache.org>.
argibbs commented on code in PR #25489:
URL: https://github.com/apache/airflow/pull/25489#discussion_r936089352


##########
airflow/dag_processing/manager.py:
##########
@@ -378,8 +378,17 @@ def __init__(
         async_mode: bool = True,
     ):
         super().__init__()
+        self._log = logging.getLogger('airflow.processor_manager')

Review Comment:
   I moved this up from further down `__init__` because I added a log message which came before it in the old location.
   
   It seems ... weird ... that this doesn't just use the logging mixin. Is this an artefact from Ye Olden Times?



##########
airflow/dag_processing/manager.py:
##########
@@ -378,8 +378,17 @@ def __init__(
         async_mode: bool = True,
     ):
         super().__init__()
+        self._log = logging.getLogger('airflow.processor_manager')
+
+        # known files; this will be updated every `dag_dir_list_interval` and stuff added/removed accordingly
         self._file_paths: List[str] = []
-        self._file_path_queue: List[str] = []
+
+        # we maintain 2 queues: stuff requiring rapid response due to scheduler updates, and stuff that
+        # should be serviced once the priority stuff has all been worked through, e.g. periodic dir scans
+        # additionally there's a set to track which files on disk still haven't been refreshed yet
+        self._priority_file_path_queue: Deque[str] = deque()

Review Comment:
   I changed these from `list`s to `deque`s because teeeeechnically it's more efficient.
   
   Also, I like seeing people try to work out how to pronounce 'deque'.
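   
   For the record, the win is that popping from the left of a `deque` is O(1), whereas `list.pop(0)` has to shift every remaining element:
   
   ```python
   from collections import deque
   
   file_paths = ["a.py", "b.py", "c.py"]
   as_list = list(file_paths)
   as_deque = deque(file_paths)
   
   as_list.pop(0)      # O(n): every remaining element shifts down one slot
   as_deque.popleft()  # O(1): just advances the head
   ```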



##########
airflow/dag_processing/manager.py:
##########
@@ -539,7 +559,7 @@ def _run_parsing_loop(self):
             poll_time = None
 
         self._refresh_dag_dir()
-        self.prepare_file_path_queue()
+        self.populate_std_file_queue_from_dir()

Review Comment:
   Same method, different name. I thought this was more descriptive.



##########
tests/dag_processing/test_manager.py:
##########
@@ -299,9 +299,9 @@ def test_file_paths_in_queue_sorted_alphabetically(
         )
 
         manager.set_file_paths(dag_files)
-        assert manager._file_path_queue == []
-        manager.prepare_file_path_queue()
-        assert manager._file_path_queue == ['file_1.py', 'file_2.py', 'file_3.py', 'file_4.py']
+        assert list(manager._std_file_path_queue) == []

Review Comment:
   Have to convert all the `deque`s to lists. This is clunky.



##########
airflow/dag_processing/manager.py:
##########
@@ -921,8 +944,18 @@ def set_file_paths(self, new_file_paths):
         :param new_file_paths: list of paths to DAG definition files
         :return: None
         """
+        # store the new paths
         self._file_paths = new_file_paths
-        self._file_path_queue = [x for x in self._file_path_queue if x in new_file_paths]
+
+        # clean up the queues; remove anything queued which is no longer in the list, including callbacks
+        self._priority_file_path_queue = deque(
+            x for x in self._priority_file_path_queue if x in new_file_paths
+        )
+        self._std_file_path_queue = deque(x for x in self._std_file_path_queue if x in new_file_paths)
+        callback_paths_to_del = list(x for x in self._callback_to_execute.keys() if x not in new_file_paths)
+        for path_to_del in callback_paths_to_del:

Review Comment:
   This more thorough clean-up is new. Having the old callbacks lying around was a form of memory leak, but in practice not a serious one.





[GitHub] [airflow] argibbs commented on pull request #25489: Dag processor manager queue split (fixes SLAs)

Posted by GitBox <gi...@apache.org>.
argibbs commented on PR #25489:
URL: https://github.com/apache/airflow/pull/25489#issuecomment-1205215917

   Sounds good to me. I'm running with this locally, so I don't really mind about timelines. I'm just trying to give back (and also push upstream so I don't have to keep patching each time there's a release :smile: )
   
   Will do as you suggested re: the digest etc.




[GitHub] [airflow] argibbs commented on pull request #25489: Dag processor manager queue overhaul

Posted by GitBox <gi...@apache.org>.
argibbs commented on PR #25489:
URL: https://github.com/apache/airflow/pull/25489#issuecomment-1203281610

   @potiuk you may also be interested in this one.




[GitHub] [airflow] argibbs commented on pull request #25489: Dag processor manager queue split (fixes SLAs)

Posted by GitBox <gi...@apache.org>.
argibbs commented on PR #25489:
URL: https://github.com/apache/airflow/pull/25489#issuecomment-1258291307

   Hiya, yes, I hadn't forgotten either. :smile: 
   
   Have just found some time to refresh this, and am sorting the rebase now. Will re-raise the dev-list email once that's done (and I'll spell your name right this time too!)




[GitHub] [airflow] argibbs closed pull request #25489: Dag processor manager queue split (fixes SLAs)

Posted by GitBox <gi...@apache.org>.
argibbs closed pull request #25489: Dag processor manager queue split (fixes SLAs)
URL: https://github.com/apache/airflow/pull/25489




[GitHub] [airflow] argibbs commented on pull request #25489: Dag processor manager queue split (fixes SLAs)

Posted by GitBox <gi...@apache.org>.
argibbs commented on PR #25489:
URL: https://github.com/apache/airflow/pull/25489#issuecomment-1293332170

   I have no idea why, but after rebasing, GitHub seems convinced there are no changes in the branch, and has closed it and won't let me reopen it ... I've created a new PR (from the same branch - see #27317 - it's the exact same change).

