Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/07 19:20:49 UTC

[GitHub] [airflow] SamWheating opened a new pull request #21399: (WIP) Reduce DB load incurred by Stale DAG deactivation

SamWheating opened a new pull request #21399:
URL: https://github.com/apache/airflow/pull/21399


   Re: https://github.com/apache/airflow/issues/21397
   
   By moving this logic into the `DagFileProcessorManager` and running it across all processed files periodically, we can prevent the use of un-indexed queries.
   
   The basic logic is that we can look at the last processed time of a file (for a given processor) and compare that to the `last_parsed_time` of an entry in the `dag` table. If the file has been processed significantly more recently than the DAG entry was last updated, then it's safe to assume that the DAG is missing from the file and can be marked inactive.
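   
   To make that concrete, here is a rough sketch of the check (a plain-Python illustration only - the helper names and container types below are placeholders, not the actual `DagFileProcessorManager` API):
   
   ```python
   from datetime import datetime, timedelta
   from typing import Dict, List

   def find_stale_dag_ids(
       active_dags: List[dict],                 # rows of (dag_id, fileloc, last_parsed_time)
       last_finish_times: Dict[str, datetime],  # fileloc -> when the processor last finished that file
       buffer: timedelta,                       # grace period for the gap between DB write and parse finish
   ) -> List[str]:
       """Return dag_ids whose file was fully parsed well after their DB row was last updated."""
       stale = []
       for dag in active_dags:
           finished = last_finish_times.get(dag["fileloc"])
           if finished is None:
               continue
           # The file was processed more than `buffer` after the DAG row was last
           # touched, so this DAG is apparently no longer defined in that file.
           if dag["last_parsed_time"] + buffer < finished:
               stale.append(dag["dag_id"])
       return stale
   ```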
   
   ### Todo:
   
   - [ ] Improve test coverage
   - [ ] Expose new tunable parameters in the config





[GitHub] [airflow] SamWheating commented on pull request #21399: Reduce DB load incurred by Stale DAG deactivation

Posted by GitBox <gi...@apache.org>.
SamWheating commented on pull request #21399:
URL: https://github.com/apache/airflow/pull/21399#issuecomment-1039715500


   OK, this is now ready for a proper review - I will patch this into our production 2.2.2 container sometime this week and confirm that it fixes the original performance issue while still managing to clean up stale DAGs. 





[GitHub] [airflow] ashb commented on pull request #21399: Reduce DB load incurred by Stale DAG deactivation

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #21399:
URL: https://github.com/apache/airflow/pull/21399#issuecomment-1046996015


   @SamWheating Did you manage to get this running in prod?





[GitHub] [airflow] SamWheating commented on pull request #21399: Reduce DB load incurred by Stale DAG deactivation

Posted by GitBox <gi...@apache.org>.
SamWheating commented on pull request #21399:
URL: https://github.com/apache/airflow/pull/21399#issuecomment-1048297282


   OK, I have created a patched version of Airflow 2.2.2 with this change and deployed it in our prod-scale staging environment. I can confirm that:
   
    - DB CPU utilization and queries/second are approximately the same before and after the change
    - DAGs are correctly cleaned up after being removed from a file (this takes longer than it did before this change, but it's eventually consistent)





[GitHub] [airflow] SamWheating commented on a change in pull request #21399: (WIP) Reduce DB load incurred by Stale DAG deactivation

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #21399:
URL: https://github.com/apache/airflow/pull/21399#discussion_r802157302



##########
File path: airflow/dag_processing/manager.py
##########
@@ -503,6 +507,40 @@ def start(self):
 
         return self._run_parsing_loop()
 
+    @provide_session
+    def _deactivate_stale_dags(self, session=None):
+        now = timezone.utcnow()
+        elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
+        if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
+            last_parsed = {
+                fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp)
+            }
+            to_deactivate = set()
+            dags_parsed = (
+                session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time)
+                .filter(DagModel.is_active)
+                .all()
+            )
+            for dag in dags_parsed:
+                if (
+                    dag.fileloc in last_parsed
+                    and (dag.last_parsed_time + timedelta(seconds=self._processor_timeout))

Review comment:
       Yup, will do (probably won't have time to clean up this PR until next week though)







[GitHub] [airflow] SamWheating commented on a change in pull request #21399: (WIP) Reduce DB load incurred by Stale DAG deactivation

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #21399:
URL: https://github.com/apache/airflow/pull/21399#discussion_r806346597



##########
File path: tests/dag_processing/test_manager.py
##########
@@ -570,7 +618,7 @@ def fake_processor_(*args, **kwargs):
             manager = DagFileProcessorManager(
                 dag_directory=test_dag_path,
                 max_runs=1,
-                processor_timeout=timedelta.max,
+                processor_timeout=timedelta(hours=10),

Review comment:
       Setting this value to `max` caused issues due to the following line of code, which led to an overflow:
   ```python
                       and (dag.last_parsed_time + self._processor_timeout) < last_parsed[dag.fileloc]
   ```
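   
   For reference, the overflow is easy to reproduce on its own (a standalone illustration, not code from this PR):
   ```python
   from datetime import datetime, timedelta

   try:
       # timedelta.max is roughly 999999999 days, so adding it to any real
       # timestamp pushes the result past datetime.max (year 9999).
       datetime.utcnow() + timedelta.max
   except OverflowError as err:
       print(err)  # "date value out of range"
   ```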







[GitHub] [airflow] ashb commented on pull request #21399: Reduce DB load incurred by Stale DAG deactivation

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #21399:
URL: https://github.com/apache/airflow/pull/21399#issuecomment-1054264191


   @jedcunningham I've marked this for possible inclusion in 2.2.5





[GitHub] [airflow] ashb commented on a change in pull request #21399: (WIP) Reduce DB load incurred by Stale DAG deactivation

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21399:
URL: https://github.com/apache/airflow/pull/21399#discussion_r802100433



##########
File path: airflow/dag_processing/manager.py
##########
@@ -503,6 +507,40 @@ def start(self):
 
         return self._run_parsing_loop()
 
+    @provide_session
+    def _deactivate_stale_dags(self, session=None):
+        now = timezone.utcnow()
+        elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
+        if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
+            last_parsed = {
+                fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp)
+            }
+            to_deactivate = set()
+            dags_parsed = (
+                session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time)
+                .filter(DagModel.is_active)
+                .all()
+            )
+            for dag in dags_parsed:
+                if (
+                    dag.fileloc in last_parsed
+                    and (dag.last_parsed_time + timedelta(seconds=self._processor_timeout))

Review comment:
       Ohhhhh! Right yeah that makes sense. Could you try and distil some of this down to a short comment?







[GitHub] [airflow] ashb commented on a change in pull request #21399: (WIP) Reduce DB load incurred by Stale DAG deactivation

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21399:
URL: https://github.com/apache/airflow/pull/21399#discussion_r801455157



##########
File path: airflow/dag_processing/manager.py
##########
@@ -503,6 +507,40 @@ def start(self):
 
         return self._run_parsing_loop()
 
+    @provide_session
+    def _deactivate_stale_dags(self, session=None):
+        now = timezone.utcnow()
+        elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
+        if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
+            last_parsed = {
+                fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp)
+            }
+            to_deactivate = set()
+            dags_parsed = (
+                session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time)
+                .filter(DagModel.is_active)
+                .all()
+            )
+            for dag in dags_parsed:
+                if (
+                    dag.fileloc in last_parsed
+                    and (dag.last_parsed_time + timedelta(seconds=self._processor_timeout))

Review comment:
       This feels like the wrong timeout to use -- processor timeout is how long each file should take to process:
   
   ```
   # How long before timing out a DagFileProcessor, which processes a dag file
   dag_file_processor_timeout = 50
   ```
   
   But that doesn't mean that every dag file should be "reparsed" every 50 seconds







[GitHub] [airflow] SamWheating commented on a change in pull request #21399: (WIP) Reduce DB load incurred by Stale DAG deactivation

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #21399:
URL: https://github.com/apache/airflow/pull/21399#discussion_r801881889



##########
File path: airflow/dag_processing/manager.py
##########
@@ -503,6 +507,40 @@ def start(self):
 
         return self._run_parsing_loop()
 
+    @provide_session
+    def _deactivate_stale_dags(self, session=None):
+        now = timezone.utcnow()
+        elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
+        if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
+            last_parsed = {
+                fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp)
+            }
+            to_deactivate = set()
+            dags_parsed = (
+                session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time)
+                .filter(DagModel.is_active)
+                .all()
+            )
+            for dag in dags_parsed:
+                if (
+                    dag.fileloc in last_parsed
+                    and (dag.last_parsed_time + timedelta(seconds=self._processor_timeout))

Review comment:
       So there's actually a reason for this. 
   
   We're comparing the parse time as reported by the processor manager to the `last_parsed_time` as seen in the DAG table; however, these values are recorded independently:
   
   `DagModel.last_parsed_time` is decided here, when the DAG is written to the DB:
   https://github.com/apache/airflow/blob/960f573615b5357677c10bd9f7ec11811a0355c6/airflow/models/dag.py#L2427
   
   whereas the `DagParsingStat.last_finish_time` is decided when the file processor finishes:
   https://github.com/apache/airflow/blob/dbe723da95143f6d33e5d2594bc2017c4164e687/airflow/dag_processing/manager.py#L915
   
   So because of this, `DagParsingStat.last_finish_time` is always going to be _slightly_ later than `DagModel.last_parsed_time` (typically on the order of milliseconds). Thus, in order to be certain that the file was processed more recently than the DAG was last observed, we can't directly compare the two timestamps and instead have to do something like:
   
   ```
   DagParsingStat.last_finish_time > (SOME_BUFFER + DagModel.last_parsed_time)
   ```
   
   I chose to use the `processor_timeout` here because it represents the absolute upper bound on the difference between `DagParsingStat.last_finish_time` and `DagModel.last_parsed_time`, and thus we favour false negatives (not deactivating a DAG which is actually gone) over false positives (incorrectly deactivating a DAG because the file processor was blocking for a few seconds after updating the DB).
   
   Let me know what you think - from my testing in `breeze` this approach appears to work reliably, but it also adds a lot of complexity.
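   
   For what it's worth, here is the buffered comparison with some made-up numbers (illustrative timestamps only):
   
   ```python
   from datetime import datetime, timedelta

   processor_timeout = timedelta(seconds=50)          # buffer = worst-case write-to-finish gap
   last_parsed_time = datetime(2022, 2, 7, 12, 0, 0)  # DagModel row last written by a parse
   last_finish_time = datetime(2022, 2, 7, 12, 0, 2)  # that same parse reported finishing 2s later

   # Fresh: the finish time is within the buffer of the row update, so the DAG
   # was (re)written by the most recent parse of its file.
   assert not (last_parsed_time + processor_timeout < last_finish_time)

   # Stale: later parses of the file never touched the row, so the DAG is gone.
   last_finish_time = datetime(2022, 2, 7, 13, 0, 0)
   assert last_parsed_time + processor_timeout < last_finish_time
   ```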
   







[GitHub] [airflow] ephraimbuddy merged pull request #21399: Reduce DB load incurred by Stale DAG deactivation

Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged pull request #21399:
URL: https://github.com/apache/airflow/pull/21399


   





[GitHub] [airflow] SamWheating commented on pull request #21399: Reduce DB load incurred by Stale DAG deactivation

Posted by GitBox <gi...@apache.org>.
SamWheating commented on pull request #21399:
URL: https://github.com/apache/airflow/pull/21399#issuecomment-1047060834


   Not yet - I've built a patched version of 2.2.2 with this change, but haven't had a chance to roll it out in any large-scale environments.
   
   
   Will do it tomorrow and report back Wednesday.





[GitHub] [airflow] github-actions[bot] commented on pull request #21399: Reduce DB load incurred by Stale DAG deactivation

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #21399:
URL: https://github.com/apache/airflow/pull/21399#issuecomment-1052505270


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org