You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/07/05 14:47:20 UTC

[airflow] 01/16: Fix StatsD timing metric units (#21106)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 40f16612f9f7a8bbac609397abf236895dba58c2
Author: viktorvia <86...@users.noreply.github.com>
AuthorDate: Wed Jun 1 07:40:36 2022 +0300

    Fix StatsD timing metric units (#21106)
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    Co-authored-by: Tzu-ping Chung <tp...@astronomer.io>
    (cherry picked from commit 1507ca48d7c211799129ce7956c11f4c45fee5bc)
---
 airflow/dag_processing/manager.py           |  6 +++---
 airflow/sensors/smart_sensor.py             |  9 +++++----
 tests/dag_processing/test_manager.py        |  6 ++++--
 tests/sensors/test_smart_sensor_operator.py | 23 +++++++++++++++++++++++
 4 files changed, 35 insertions(+), 9 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 0f900c63f0..cbbc2bfdaf 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -74,7 +74,7 @@ class DagFileStat(NamedTuple):
     num_dags: int
     import_errors: int
     last_finish_time: Optional[datetime]
-    last_duration: Optional[float]
+    last_duration: Optional[timedelta]
     run_count: int
 
 
@@ -834,7 +834,7 @@ class DagFileProcessorManager(LoggingMixin):
         :rtype: float
         """
         stat = self._file_stats.get(file_path)
-        return stat.last_duration if stat else None
+        return stat.last_duration.total_seconds() if stat and stat.last_duration else None
 
     def get_last_dag_count(self, file_path):
         """
@@ -927,7 +927,7 @@ class DagFileProcessorManager(LoggingMixin):
             count_import_errors = -1
             num_dags = 0
 
-        last_duration = (last_finish_time - processor.start_time).total_seconds()
+        last_duration = last_finish_time - processor.start_time
         stat = DagFileStat(
             num_dags=num_dags,
             import_errors=count_import_errors,
diff --git a/airflow/sensors/smart_sensor.py b/airflow/sensors/smart_sensor.py
index a45eb10d1c..bc22ab9c54 100644
--- a/airflow/sensors/smart_sensor.py
+++ b/airflow/sensors/smart_sensor.py
@@ -738,16 +738,17 @@ class SmartSensorOperator(BaseOperator, SkipMixin):
             for sensor_work in self.sensor_works:
                 self._execute_sensor_work(sensor_work)
 
-            duration = (timezone.utcnow() - poke_start_time).total_seconds()
+            duration = timezone.utcnow() - poke_start_time
+            duration_seconds = duration.total_seconds()
 
-            self.log.info("Taking %s to execute %s tasks.", duration, len(self.sensor_works))
+            self.log.info("Taking %s seconds to execute %s tasks.", duration_seconds, len(self.sensor_works))
 
             Stats.timing("smart_sensor_operator.loop_duration", duration)
             Stats.gauge("smart_sensor_operator.executed_tasks", len(self.sensor_works))
             self._emit_loop_stats()
 
-            if duration < self.poke_interval:
-                sleep(self.poke_interval - duration)
+            if duration_seconds < self.poke_interval:
+                sleep(self.poke_interval - duration_seconds)
             if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
                 self.log.info("Time is out for smart sensor.")
                 return
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 6a65116d51..ed1b194a7b 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -426,7 +426,7 @@ class TestDagFileProcessorManager:
         # let's say the DAG was just parsed 2 seconds before the Freezed time
         last_finish_time = freezed_base_time - timedelta(seconds=10)
         manager._file_stats = {
-            "file_1.py": DagFileStat(1, 0, last_finish_time, 1.0, 1),
+            "file_1.py": DagFileStat(1, 0, last_finish_time, timedelta(seconds=1.0), 1),
         }
         with freeze_time(freezed_base_time):
             manager.set_file_paths(dag_files)
@@ -695,7 +695,9 @@ class TestDagFileProcessorManager:
         child_pipe.close()
         parent_pipe.close()
 
-        statsd_timing_mock.assert_called_with('dag_processing.last_duration.temp_dag', last_runtime)
+        statsd_timing_mock.assert_called_with(
+            'dag_processing.last_duration.temp_dag', timedelta(seconds=last_runtime)
+        )
 
     def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmpdir):
         """Test DagFileProcessorManager._refresh_dag_dir method"""
diff --git a/tests/sensors/test_smart_sensor_operator.py b/tests/sensors/test_smart_sensor_operator.py
index 7c875a06f4..22c03918ca 100644
--- a/tests/sensors/test_smart_sensor_operator.py
+++ b/tests/sensors/test_smart_sensor_operator.py
@@ -20,6 +20,7 @@ import logging
 import os
 import time
 import unittest
+from unittest import mock
 from unittest.mock import Mock
 
 from freezegun import freeze_time
@@ -310,3 +311,25 @@ class SmartSensorTest(unittest.TestCase):
         assert sensor_instance is not None
         assert sensor_instance.state == State.SENSING
         assert sensor_instance.operator == "DummySensor"
+
+    @mock.patch('airflow.sensors.smart_sensor.Stats.timing')
+    @mock.patch('airflow.sensors.smart_sensor.timezone.utcnow')
+    def test_send_sensor_timing(self, timezone_utcnow_mock, statsd_timing_mock):
+        initial_time = timezone.datetime(2022, 1, 5, 0, 0, 0)
+        timezone_utcnow_mock.return_value = initial_time
+        self._make_sensor_dag_run()
+        smart = self._make_smart_operator(0)
+        smart.timeout = 0
+        duration = datetime.timedelta(seconds=3)
+        timezone_utcnow_mock.side_effect = [
+            # started_at
+            initial_time,
+            # poke_start_time
+            initial_time,
+            # duration
+            initial_time + duration,
+            # timeout check
+            initial_time + duration,
+        ]
+        smart.execute(None)
+        statsd_timing_mock.assert_called_with('smart_sensor_operator.loop_duration', duration)