Posted to commits@airflow.apache.org by bi...@apache.org on 2021/08/23 16:18:44 UTC

[airflow] branch main updated: Fix dag_processing.last_duration metric random holes (#17769)

This is an automated email from the ASF dual-hosted git repository.

binh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a929f7  Fix dag_processing.last_duration metric random holes (#17769)
1a929f7 is described below

commit 1a929f73398bdd201c8c92a08b398b41f9c2f591
Author: Vadim <va...@gmail.com>
AuthorDate: Mon Aug 23 19:17:25 2021 +0300

    Fix dag_processing.last_duration metric random holes (#17769)
    
    * Fix dag_processing.last_duration metric random holes
    
    * Fix test
    
    * Fix mssql+sqlite test
    
    * move dag_processing.last_duration timing to _collect_results_from_processor
---
 airflow/dag_processing/manager.py    |  5 +++--
 tests/dag_processing/test_manager.py | 35 +++++++++++++++++++++++++++++++++++
 2 files changed, 38 insertions(+), 2 deletions(-)

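[Editor's note] In short: before this change, the last_duration timing was only emitted from the periodic stats-logging loop, and only when a current `runtime` value happened to be present (the removed `if runtime:` block in the first hunk below). Files whose processor had already finished by the time the stats were logged therefore reported no duration, producing the "random holes" in the metric. The commit instead emits the timing in `_collect_results_from_processor`, once for every processor that completes. The snippet below is an illustrative sketch only, not the actual Airflow code; the names `emit_last_duration` and `stats_client` are hypothetical.

    # Illustrative sketch (hypothetical names, not Airflow's API):
    # report the duration of the last parse of a DAG file as a StatsD timing,
    # called once per completed processor rather than from a periodic log loop.
    import os

    def emit_last_duration(stats_client, file_path: str, last_duration: float) -> None:
        """Send dag_processing.last_duration.<file_name> for one finished parse."""
        # Metric name is the DAG file's base name without its extension,
        # mirroring how the commit builds `file_name` from `processor.file_path`.
        file_name = os.path.splitext(os.path.basename(file_path))[0].replace(os.sep, '.')
        stats_client.timing(f'dag_processing.last_duration.{file_name}', last_duration)

    # Usage (hypothetical): emit_last_duration(Stats, '/dags/temp_dag.py', stat.last_duration)

The actual diff follows.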
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index a38b68f..b235250 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -724,8 +724,6 @@ class DagFileProcessorManager(LoggingMixin):
             if last_run:
                 seconds_ago = (now - last_run).total_seconds()
                 Stats.gauge(f'dag_processing.last_run.seconds_ago.{file_name}', seconds_ago)
-            if runtime:
-                Stats.timing(f'dag_processing.last_duration.{file_name}', runtime)
 
             rows.append((file_path, processor_pid, runtime, num_dags, num_errors, last_runtime, last_run))
 
@@ -893,6 +891,9 @@ class DagFileProcessorManager(LoggingMixin):
         )
         self._file_stats[processor.file_path] = stat
 
+        file_name = os.path.splitext(os.path.basename(processor.file_path))[0].replace(os.sep, '.')
+        Stats.timing(f'dag_processing.last_duration.{file_name}', stat.last_duration)
+
     def collect_results(self) -> None:
         """Collect the result from any finished DAG processors"""
         ready = multiprocessing.connection.wait(self.waitables.keys() - [self._signal_conn], timeout=0)
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 8d1c4f3..ac2d4dc 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -27,6 +27,7 @@ import threading
 import unittest
 from datetime import datetime, timedelta
 from tempfile import TemporaryDirectory
+from textwrap import dedent
 from unittest import mock
 from unittest.mock import MagicMock, PropertyMock
 
@@ -694,6 +695,40 @@ class TestDagFileProcessorManager:
             child_pipe.close()
             thread.join(timeout=1.0)
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @mock.patch('airflow.dag_processing.manager.Stats.timing')
+    def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir):
+        filename_to_parse = tmpdir / 'temp_dag.py'
+        dag_code = dedent(
+            """
+        from airflow import DAG
+        dag = DAG(dag_id='temp_dag', schedule_interval='0 0 * * *')
+        """
+        )
+        with open(filename_to_parse, 'w') as file_to_parse:
+            file_to_parse.writelines(dag_code)
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+        manager = DagFileProcessorManager(
+            dag_directory=tmpdir,
+            max_runs=1,
+            processor_timeout=timedelta.max,
+            signal_conn=child_pipe,
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=async_mode,
+        )
+
+        self.run_processor_manager_one_loop(manager, parent_pipe)
+        last_runtime = manager.get_last_runtime(manager.file_paths[0])
+
+        child_pipe.close()
+        parent_pipe.close()
+
+        statsd_timing_mock.assert_called_with('dag_processing.last_duration.temp_dag', last_runtime)
+
 
 class TestDagFileProcessorAgent(unittest.TestCase):
     def setUp(self):