You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bi...@apache.org on 2021/08/23 16:18:44 UTC
[airflow] branch main updated: Fix dag_processing.last_duration metric random holes (#17769)
This is an automated email from the ASF dual-hosted git repository.
binh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1a929f7 Fix dag_processing.last_duration metric random holes (#17769)
1a929f7 is described below
commit 1a929f73398bdd201c8c92a08b398b41f9c2f591
Author: Vadim <va...@gmail.com>
AuthorDate: Mon Aug 23 19:17:25 2021 +0300
Fix dag_processing.last_duration metric random holes (#17769)
* Fix dag_processing.last_duration metric random holes
* Fix test
* Fix mssql+sqlite test
* move dag_processing.last_duration timing to _collect_results_from_processor
---
airflow/dag_processing/manager.py | 5 +++--
tests/dag_processing/test_manager.py | 35 +++++++++++++++++++++++++++++++++++
2 files changed, 38 insertions(+), 2 deletions(-)
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index a38b68f..b235250 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -724,8 +724,6 @@ class DagFileProcessorManager(LoggingMixin):
if last_run:
seconds_ago = (now - last_run).total_seconds()
Stats.gauge(f'dag_processing.last_run.seconds_ago.{file_name}', seconds_ago)
- if runtime:
- Stats.timing(f'dag_processing.last_duration.{file_name}', runtime)
rows.append((file_path, processor_pid, runtime, num_dags, num_errors, last_runtime, last_run))
@@ -893,6 +891,9 @@ class DagFileProcessorManager(LoggingMixin):
)
self._file_stats[processor.file_path] = stat
+ file_name = os.path.splitext(os.path.basename(processor.file_path))[0].replace(os.sep, '.')
+ Stats.timing(f'dag_processing.last_duration.{file_name}', stat.last_duration)
+
def collect_results(self) -> None:
"""Collect the result from any finished DAG processors"""
ready = multiprocessing.connection.wait(self.waitables.keys() - [self._signal_conn], timeout=0)
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 8d1c4f3..ac2d4dc 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -27,6 +27,7 @@ import threading
import unittest
from datetime import datetime, timedelta
from tempfile import TemporaryDirectory
+from textwrap import dedent
from unittest import mock
from unittest.mock import MagicMock, PropertyMock
@@ -694,6 +695,40 @@ class TestDagFileProcessorManager:
child_pipe.close()
thread.join(timeout=1.0)
+ @conf_vars({('core', 'load_examples'): 'False'})
+ @mock.patch('airflow.dag_processing.manager.Stats.timing')
+ def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir):
+ filename_to_parse = tmpdir / 'temp_dag.py'
+ dag_code = dedent(
+ """
+ from airflow import DAG
+ dag = DAG(dag_id='temp_dag', schedule_interval='0 0 * * *')
+ """
+ )
+ with open(filename_to_parse, 'w') as file_to_parse:
+ file_to_parse.writelines(dag_code)
+
+ child_pipe, parent_pipe = multiprocessing.Pipe()
+
+ async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+ manager = DagFileProcessorManager(
+ dag_directory=tmpdir,
+ max_runs=1,
+ processor_timeout=timedelta.max,
+ signal_conn=child_pipe,
+ dag_ids=[],
+ pickle_dags=False,
+ async_mode=async_mode,
+ )
+
+ self.run_processor_manager_one_loop(manager, parent_pipe)
+ last_runtime = manager.get_last_runtime(manager.file_paths[0])
+
+ child_pipe.close()
+ parent_pipe.close()
+
+ statsd_timing_mock.assert_called_with('dag_processing.last_duration.temp_dag', last_runtime)
+
class TestDagFileProcessorAgent(unittest.TestCase):
def setUp(self):