Posted to commits@airflow.apache.org by "ephraimbuddy (via GitHub)" <gi...@apache.org> on 2023/02/09 19:21:38 UTC

[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #29446: Scheduler, make stale DAG deactivation threshold configurable instead of using dag processing timeout

ephraimbuddy commented on code in PR #29446:
URL: https://github.com/apache/airflow/pull/29446#discussion_r1101909468


##########
airflow/config_templates/default_airflow.cfg:
##########
@@ -1045,8 +1045,16 @@ min_file_process_interval = 30
 # referenced and should be marked as orphaned.
 parsing_cleanup_interval = 60
 
+# How long (in seconds) to wait after we've reparsed a DAG file before deactivating stale
+# DAGs (DAGs which are no longer present in the expected files). The reason why we need
+# this threshold is to account for the time between when the file is parsed and when the
+# DAG is loaded. The absolute maximum that this could take is `dag_file_processor_timeout`,
+# but when you have a long timeout configured, it results in a significant delay in the
+# deactivation of stale dags.
+stale_dag_threshold = 30
+
 # How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
-dag_dir_list_interval = 300
+dag_dir_list_interval = 10

Review Comment:
   Why change `dag_dir_list_interval` from 300 to 10? The comment above it still says the default is 5 minutes.



##########
airflow/dag_processing/manager.py:
##########
@@ -433,6 +433,8 @@ def __init__(
         self.last_deactivate_stale_dags_time = timezone.make_aware(datetime.fromtimestamp(0))
         # How often to check for DAGs which are no longer in files
         self.parsing_cleanup_interval = conf.getint("scheduler", "parsing_cleanup_interval")
+        # How long to wait for a DAG to be reparsed after it's file has been parsed before disabling

Review Comment:
   ```suggestion
           # How long to wait for a DAG to be reparsed after its file has been parsed before disabling
   ```



##########
airflow/config_templates/config.yml:
##########
@@ -2049,6 +2049,18 @@ scheduler:
       type: integer
       example: ~
       default: "60"
+    stale_dag_threshold:
+      description: |
+        How long (in seconds) to wait after we've reparsed a DAG file before deactivating stale
+        DAGs (DAGs which are no longer present in the expected files). The reason why we need
+        this threshold is to account for the time between when the file is parsed and when the
+        DAG is loaded. The absolute maximum that this could take is `dag_file_processor_timeout`,
+        but when you have a long timeout configured, it results in a significant delay in the
+        deactivation of stale dags.
+      version_added: 2.6.0
+      type: integer
+      example: ~
+      default: "30"

Review Comment:
   Should we keep the default the same as `dag_file_processor_timeout`? That way, there will be no change in behaviour.
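
To make the behaviour question concrete, here is a minimal sketch of the kind of comparison the new option would feed into. It is not the code from the PR: the helper `is_stale` and its parameter names are hypothetical, and the 50-second value stands in for `dag_file_processor_timeout` purely as an assumption for illustration.

```python
from datetime import datetime, timedelta, timezone


def is_stale(dag_last_parsed, file_last_finished, threshold_seconds):
    """Hypothetical helper: a DAG counts as stale when its last parse time,
    plus the threshold, is still earlier than the time its file last
    finished parsing."""
    return dag_last_parsed + timedelta(seconds=threshold_seconds) < file_last_finished


# The file finished parsing at noon; the DAG was last seen 45 seconds earlier.
file_finished = datetime(2023, 2, 9, 12, 0, 0, tzinfo=timezone.utc)
dag_seen = file_finished - timedelta(seconds=45)

# With the proposed default of 30 seconds, the 45-second gap exceeds the threshold.
stale_with_30 = is_stale(dag_seen, file_finished, 30)

# With an assumed 50-second threshold, the same gap is still tolerated.
stale_with_50 = is_stale(dag_seen, file_finished, 50)

print(stale_with_30, stale_with_50)
```

Under these assumptions the same DAG is deactivated with the shorter threshold but kept with the longer one, which is the behaviour change a lower default could introduce.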



##########
airflow/dag_processing/manager.py:
##########
@@ -523,13 +525,16 @@ def deactivate_stale_dags(
             query = query.filter(DagModel.processor_subdir == dag_directory)
         dags_parsed = query.all()
 
+
+
         for dag in dags_parsed:
             # The largest valid difference between a DagFileStat's last_finished_time and a DAG's
-            # last_parsed_time is _processor_timeout. Longer than that indicates that the DAG is
-            # no longer present in the file.
+            # last_parsed_time is thg processor_timeout. Longer than that indicates that the DAG is

Review Comment:
   ```suggestion
               # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is
   ```


