Posted to commits@airflow.apache.org by ep...@apache.org on 2022/03/22 19:16:40 UTC

[airflow] 29/31: Reduce DB load incurred by Stale DAG deactivation (#21399)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 533dad73aabfdf06f12e74e18328ed6a5491f559
Author: Sam Wheating <sa...@shopify.com>
AuthorDate: Sun Mar 20 00:17:42 2022 -0700

    Reduce DB load incurred by Stale DAG deactivation (#21399)
    
    Deactivating stale DAGs periodically in bulk
    
    By moving this logic into the DagFileProcessorManager and running it periodically across all processed files, we avoid the un-indexed per-file queries used previously.
    
    The basic logic is that we can look at the last processed time of a file (for a given processor) and compare it to the last_parsed_time of an entry in the dag table. If the file has been processed significantly more recently than the DAG row has been updated, it's safe to assume that the DAG is missing from the file and can be marked inactive.
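    
    As a minimal, hypothetical sketch of that comparison (the names below are
    illustrative only and do not appear in the patch):
    
        from datetime import datetime, timedelta, timezone
    
        def looks_stale(dag_last_parsed: datetime,
                        file_last_finished: datetime,
                        processor_timeout: timedelta) -> bool:
            # The largest valid gap between a file's last finish time and the
            # DAG row's last_parsed_time is processor_timeout; anything larger
            # suggests the DAG no longer appears in that file.
            return (dag_last_parsed + processor_timeout) < file_last_finished
    
        now = datetime.now(timezone.utc)
        # A DAG last parsed an hour ago, with a 10-minute processor timeout -> stale
        print(looks_stale(now - timedelta(hours=1), now, timedelta(minutes=10)))  # True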
    
    (cherry picked from commit f309ea78f7d8b62383bc41eac217681a0916382b)
---
 airflow/config_templates/config.yml          |  8 +++++
 airflow/config_templates/default_airflow.cfg |  4 +++
 airflow/dag_processing/manager.py            | 43 ++++++++++++++++++++++++-
 airflow/dag_processing/processor.py          | 11 -------
 tests/dag_processing/test_manager.py         | 48 +++++++++++++++++++++++++++-
 tests/dag_processing/test_processor.py       | 25 ---------------
 6 files changed, 101 insertions(+), 38 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 1e77041..cc5817a 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1767,6 +1767,14 @@
       type: string
       example: ~
       default: "30"
+    - name: deactivate_stale_dags_interval
+      description: |
+        How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
+        the expected files) which should be deactivated.
+      version_added: 2.3.0
+      type: integer
+      example: ~
+      default: "60"
     - name: dag_dir_list_interval
       description: |
        How often (in seconds) to scan the DAGs directory for new files. Defaults to 5 minutes.
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 826eaf4..8bd20ed 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -886,6 +886,10 @@ scheduler_idle_sleep_time = 1
 # this interval. Keeping this number low will increase CPU usage.
 min_file_process_interval = 30
 
+# How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
+# the expected files) which should be deactivated.
+deactivate_stale_dags_interval = 60
+
 # How often (in seconds) to scan the DAGs directory for new files. Defaults to 5 minutes.
 dag_dir_list_interval = 300
 
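(As an illustration: like any option in airflow.cfg, this setting could also be supplied
through Airflow's standard AIRFLOW__{SECTION}__{KEY} environment-variable convention; the
value 120 below is purely an example.)

    export AIRFLOW__SCHEDULER__DEACTIVATE_STALE_DAGS_INTERVAL=120
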
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 6c78aa6..315a3a5 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -468,9 +468,12 @@ class DagFileProcessorManager(LoggingMixin):
         self.last_stat_print_time = 0
         # TODO: Remove magic number
         self._zombie_query_interval = 10
+        # Last time we cleaned up DAGs which are no longer in files
+        self.last_deactivate_stale_dags_time = timezone.make_aware(datetime.fromtimestamp(0))
+        # How often to check for DAGs which are no longer in files
+        self.deactivate_stale_dags_interval = conf.getint('scheduler', 'deactivate_stale_dags_interval')
         # How long to wait before timing out a process to parse a DAG file
         self._processor_timeout = processor_timeout
-
         # How often to scan the DAGs directory for new files. Default to 5 minutes.
         self.dag_dir_list_interval = conf.getint('scheduler', 'dag_dir_list_interval')
 
@@ -519,6 +522,43 @@ class DagFileProcessorManager(LoggingMixin):
 
         return self._run_parsing_loop()
 
+    @provide_session
+    def _deactivate_stale_dags(self, session=None):
+        """Detects DAGs which are no longer present in files and deactivate them."""
+        now = timezone.utcnow()
+        elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
+        if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
+            last_parsed = {
+                fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp)
+            }
+            to_deactivate = set()
+            dags_parsed = (
+                session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time)
+                .filter(DagModel.is_active)
+                .all()
+            )
+            for dag in dags_parsed:
+                # The largest valid difference between a DagFileStat's last_finished_time and a DAG's
+                # last_parsed_time is _processor_timeout. Longer than that indicates that the DAG is
+                # no longer present in the file.
+                if (
+                    dag.fileloc in last_parsed
+                    and (dag.last_parsed_time + self._processor_timeout) < last_parsed[dag.fileloc]
+                ):
+                    self.log.info(f"DAG {dag.dag_id} is missing and will be deactivated.")
+                    to_deactivate.add(dag.dag_id)
+
+            if to_deactivate:
+                deactivated = (
+                    session.query(DagModel)
+                    .filter(DagModel.dag_id.in_(to_deactivate))
+                    .update({DagModel.is_active: False}, synchronize_session="fetch")
+                )
+                if deactivated:
+                    self.log.info("Deactivated %i DAGs which are no longer present in file.", deactivated)
+
+            self.last_deactivate_stale_dags_time = timezone.utcnow()
+
     def _run_parsing_loop(self):
 
         # In sync mode we want timeout=None -- wait forever until a message is received
@@ -581,6 +621,7 @@ class DagFileProcessorManager(LoggingMixin):
                 self.waitables.pop(sentinel)
                 self._processors.pop(processor.file_path)
 
+            self._deactivate_stale_dags()
             self._refresh_dag_dir()
             self._find_zombies()
 
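(The call added above runs on every pass of the manager's parsing loop, but the method
throttles itself via deactivate_stale_dags_interval. A self-contained sketch of that
gating pattern follows; PeriodicGate is a hypothetical name, not a class in Airflow.)

    from datetime import datetime, timedelta, timezone

    class PeriodicGate:
        """Run a body at most once per interval, however often it is polled."""

        def __init__(self, interval_seconds: int):
            self.interval_seconds = interval_seconds
            # Epoch start, mirroring last_deactivate_stale_dags_time's initial value
            self.last_run = datetime.fromtimestamp(0, tz=timezone.utc)

        def should_run(self, now: datetime) -> bool:
            if (now - self.last_run).total_seconds() > self.interval_seconds:
                self.last_run = now
                return True
            return False

    gate = PeriodicGate(interval_seconds=60)
    print(gate.should_run(datetime.now(timezone.utc)))  # True on the first poll
    print(gate.should_run(datetime.now(timezone.utc)))  # False until 60s have elapsed
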
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index e59c818..a6cf372 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -648,8 +648,6 @@ class DagFileProcessor(LoggingMixin):
             Stats.incr('dag_file_refresh_error', 1, 1)
             return 0, 0
 
-        self._deactivate_missing_dags(session, dagbag, file_path)
-
         if len(dagbag.dags) > 0:
             self.log.info("DAG(s) %s retrieved from %s", dagbag.dags.keys(), file_path)
         else:
@@ -679,12 +677,3 @@ class DagFileProcessor(LoggingMixin):
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
-
-    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
-        deactivated = (
-            session.query(DagModel)
-            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
-            .update({DagModel.is_active: False}, synchronize_session="fetch")
-        )
-        if deactivated:
-            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index b549b2b..abba415 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -465,7 +465,6 @@ class TestDagFileProcessorManager:
             pickle_dags=False,
             async_mode=True,
         )
-
         dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
         with create_session() as session:
             session.query(LJ).delete()
@@ -596,6 +595,53 @@ class TestDagFileProcessorManager:
             child_pipe.close()
             parent_pipe.close()
 
+    def test_deactivate_stale_dags(self):
+        """
+        Ensure that DAGs are marked inactive when the file is parsed but the
+        DagModel.last_parsed_time is not updated.
+        """
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=1,
+            processor_timeout=timedelta(minutes=10),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+        test_dag_path = str(TEST_DAG_FOLDER / 'test_example_bash_operator.py')
+        dagbag = DagBag(test_dag_path, read_dags_from_db=False)
+
+        with create_session() as session:
+            # Add stale DAG to the DB
+            dag = dagbag.get_dag('test_example_bash_operator')
+            dag.last_parsed_time = timezone.utcnow()
+            dag.sync_to_db()
+
+            # Add DAG to the file_parsing_stats
+            stat = DagFileStat(
+                num_dags=1,
+                import_errors=0,
+                last_finish_time=timezone.utcnow() + timedelta(hours=1),
+                last_duration=1,
+                run_count=1,
+            )
+            manager._file_paths = [test_dag_path]
+            manager._file_stats[test_dag_path] = stat
+
+            active_dags = (
+                session.query(DagModel).filter(DagModel.is_active, DagModel.fileloc == test_dag_path).all()
+            )
+            assert len(active_dags) == 1
+
+            manager._file_stats[test_dag_path] = stat
+            manager._deactivate_stale_dags()
+            active_dags = (
+                session.query(DagModel).filter(DagModel.is_active, DagModel.fileloc == test_dag_path).all()
+            )
+
+            assert len(active_dags) == 0
+
     @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", new_callable=PropertyMock)
     @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.kill")
     def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid):
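
(Assuming a standard Airflow development environment, the new test can be run in
isolation with pytest:

    pytest tests/dag_processing/test_manager.py::TestDagFileProcessorManager::test_deactivate_stale_dags
)
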
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index c9ecfb0..c0d2267 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -700,31 +700,6 @@ class TestDagFileProcessor:
             assert import_error.stacktrace == expected_stacktrace.format(invalid_dag_filename)
             session.rollback()
 
-    def test_process_file_should_deactivate_missing_dags(self):
-
-        dag_file = os.path.join(
-            os.path.dirname(os.path.realpath(__file__)), '../dags/test_only_dummy_tasks.py'
-        )
-
-        # write a DAG into the DB which is not present in its specified file
-        with create_session() as session:
-            orm_dag = DagModel(dag_id='missing_dag', is_active=True, fileloc=dag_file)
-            session.merge(orm_dag)
-            session.commit()
-
-        session = settings.Session()
-
-        dags = session.query(DagModel).all()
-        assert [dag.dag_id for dag in dags if dag.is_active] == ['missing_dag']
-
-        # re-parse the file and see that the DAG is no longer there
-        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
-        dag_file_processor.process_file(dag_file, [])
-
-        dags = session.query(DagModel).all()
-        assert [dag.dag_id for dag in dags if dag.is_active] == ['test_only_dummy_tasks']
-        assert [dag.dag_id for dag in dags if not dag.is_active] == ['missing_dag']
-
 
 class TestProcessorAgent:
     @pytest.fixture(autouse=True)