Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/08 17:23:17 UTC

[GitHub] [airflow] SamWheating commented on a change in pull request #21399: (WIP) Reduce DB load incurred by Stale DAG deactivation

SamWheating commented on a change in pull request #21399:
URL: https://github.com/apache/airflow/pull/21399#discussion_r801881889



##########
File path: airflow/dag_processing/manager.py
##########
@@ -503,6 +507,40 @@ def start(self):
 
         return self._run_parsing_loop()
 
+    @provide_session
+    def _deactivate_stale_dags(self, session=None):
+        now = timezone.utcnow()
+        elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
+        if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
+            last_parsed = {
+                fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp)
+            }
+            to_deactivate = set()
+            dags_parsed = (
+                session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time)
+                .filter(DagModel.is_active)
+                .all()
+            )
+            for dag in dags_parsed:
+                if (
+                    dag.fileloc in last_parsed
+                    and (dag.last_parsed_time + timedelta(seconds=self._processor_timeout))

Review comment:
       So there's actually a reason for this. 
   
   We're comparing the parse time reported by the processor manager with the `last_parsed_time` stored in the DAG table, but these two values are recorded independently:
   
   `DagModel.last_parsed_time` is decided here, when the DAG is written to the DB:
   https://github.com/apache/airflow/blob/960f573615b5357677c10bd9f7ec11811a0355c6/airflow/models/dag.py#L2427
   
   whereas the `DagParsingStat.last_finish_time` is decided when the file processor finishes:
   https://github.com/apache/airflow/blob/dbe723da95143f6d33e5d2594bc2017c4164e687/airflow/dag_processing/manager.py#L915
   
   As a result, `DagParsingStat.last_finish_time` is always going to be _slightly_ later than `DagModel.last_parsed_time` (typically on the order of milliseconds). So to be certain that the file was processed more recently than the DAG was last observed, we can't compare the two timestamps directly and instead have to do something like:
   
   ```
   DagParsingStat.last_finish_time > (SOME_BUFFER + DagModel.last_parsed_time)
   ```
   
   I chose to use the `processor_timeout` here because it represents the absolute upper bound on the difference between `DagParsingStat.last_finish_time` and `DagModel.last_parsed_time`. This way we favour false negatives (not deactivating a DAG which is actually gone) over false positives (incorrectly deactivating a DAG because the file processor was blocking for a few seconds after updating the DB).
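
To make the effect of the buffer concrete, here is a minimal standalone sketch of the comparison. The helper name `is_dag_stale`, the 50-second timeout, and the timestamps are all assumptions for illustration; this is not the code in the PR.

```python
from datetime import datetime, timedelta, timezone


def is_dag_stale(last_finish_time, last_parsed_time, buffer):
    """Hypothetical helper: True only if the file finished processing
    more than `buffer` after the DAG row was last written."""
    return last_finish_time > last_parsed_time + buffer


# Assumed values, chosen only to illustrate the two cases.
processor_timeout = timedelta(seconds=50)
last_parsed_time = datetime(2022, 2, 8, 17, 0, 0, tzinfo=timezone.utc)

# Same parse run: the processor finishes a few milliseconds after the DB write.
same_run_finish = last_parsed_time + timedelta(milliseconds=5)
# A later run of the same file that did not refresh this DAG's row.
later_run_finish = last_parsed_time + timedelta(seconds=120)

# A direct comparison would flag the DAG even though it was just parsed.
naive_same_run = same_run_finish > last_parsed_time

buffered_same_run = is_dag_stale(same_run_finish, last_parsed_time, processor_timeout)
buffered_later_run = is_dag_stale(later_run_finish, last_parsed_time, processor_timeout)
```

With these values the direct comparison flags the freshly parsed DAG, while the buffered check only flags the DAG whose file was processed again well after its row was last written.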
   



