You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/07/05 14:47:20 UTC
[airflow] 01/16: Fix StatD timing metric units (#21106)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 40f16612f9f7a8bbac609397abf236895dba58c2
Author: viktorvia <86...@users.noreply.github.com>
AuthorDate: Wed Jun 1 07:40:36 2022 +0300
Fix StatD timing metric units (#21106)
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
Co-authored-by: Tzu-ping Chung <tp...@astronomer.io>
(cherry picked from commit 1507ca48d7c211799129ce7956c11f4c45fee5bc)
---
airflow/dag_processing/manager.py | 6 +++---
airflow/sensors/smart_sensor.py | 9 +++++----
tests/dag_processing/test_manager.py | 6 ++++--
tests/sensors/test_smart_sensor_operator.py | 23 +++++++++++++++++++++++
4 files changed, 35 insertions(+), 9 deletions(-)
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 0f900c63f0..cbbc2bfdaf 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -74,7 +74,7 @@ class DagFileStat(NamedTuple):
num_dags: int
import_errors: int
last_finish_time: Optional[datetime]
- last_duration: Optional[float]
+ last_duration: Optional[timedelta]
run_count: int
@@ -834,7 +834,7 @@ class DagFileProcessorManager(LoggingMixin):
:rtype: float
"""
stat = self._file_stats.get(file_path)
- return stat.last_duration if stat else None
+ return stat.last_duration.total_seconds() if stat and stat.last_duration else None
def get_last_dag_count(self, file_path):
"""
@@ -927,7 +927,7 @@ class DagFileProcessorManager(LoggingMixin):
count_import_errors = -1
num_dags = 0
- last_duration = (last_finish_time - processor.start_time).total_seconds()
+ last_duration = last_finish_time - processor.start_time
stat = DagFileStat(
num_dags=num_dags,
import_errors=count_import_errors,
diff --git a/airflow/sensors/smart_sensor.py b/airflow/sensors/smart_sensor.py
index a45eb10d1c..bc22ab9c54 100644
--- a/airflow/sensors/smart_sensor.py
+++ b/airflow/sensors/smart_sensor.py
@@ -738,16 +738,17 @@ class SmartSensorOperator(BaseOperator, SkipMixin):
for sensor_work in self.sensor_works:
self._execute_sensor_work(sensor_work)
- duration = (timezone.utcnow() - poke_start_time).total_seconds()
+ duration = timezone.utcnow() - poke_start_time
+ duration_seconds = duration.total_seconds()
- self.log.info("Taking %s to execute %s tasks.", duration, len(self.sensor_works))
+ self.log.info("Taking %s seconds to execute %s tasks.", duration_seconds, len(self.sensor_works))
Stats.timing("smart_sensor_operator.loop_duration", duration)
Stats.gauge("smart_sensor_operator.executed_tasks", len(self.sensor_works))
self._emit_loop_stats()
- if duration < self.poke_interval:
- sleep(self.poke_interval - duration)
+ if duration_seconds < self.poke_interval:
+ sleep(self.poke_interval - duration_seconds)
if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
self.log.info("Time is out for smart sensor.")
return
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 6a65116d51..ed1b194a7b 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -426,7 +426,7 @@ class TestDagFileProcessorManager:
# let's say the DAG was just parsed 2 seconds before the Freezed time
last_finish_time = freezed_base_time - timedelta(seconds=10)
manager._file_stats = {
- "file_1.py": DagFileStat(1, 0, last_finish_time, 1.0, 1),
+ "file_1.py": DagFileStat(1, 0, last_finish_time, timedelta(seconds=1.0), 1),
}
with freeze_time(freezed_base_time):
manager.set_file_paths(dag_files)
@@ -695,7 +695,9 @@ class TestDagFileProcessorManager:
child_pipe.close()
parent_pipe.close()
- statsd_timing_mock.assert_called_with('dag_processing.last_duration.temp_dag', last_runtime)
+ statsd_timing_mock.assert_called_with(
+ 'dag_processing.last_duration.temp_dag', timedelta(seconds=last_runtime)
+ )
def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmpdir):
"""Test DagFileProcessorManager._refresh_dag_dir method"""
diff --git a/tests/sensors/test_smart_sensor_operator.py b/tests/sensors/test_smart_sensor_operator.py
index 7c875a06f4..22c03918ca 100644
--- a/tests/sensors/test_smart_sensor_operator.py
+++ b/tests/sensors/test_smart_sensor_operator.py
@@ -20,6 +20,7 @@ import logging
import os
import time
import unittest
+from unittest import mock
from unittest.mock import Mock
from freezegun import freeze_time
@@ -310,3 +311,25 @@ class SmartSensorTest(unittest.TestCase):
assert sensor_instance is not None
assert sensor_instance.state == State.SENSING
assert sensor_instance.operator == "DummySensor"
+
+ @mock.patch('airflow.sensors.smart_sensor.Stats.timing')
+ @mock.patch('airflow.sensors.smart_sensor.timezone.utcnow')
+ def test_send_sensor_timing(self, timezone_utcnow_mock, statsd_timing_mock):
+ initial_time = timezone.datetime(2022, 1, 5, 0, 0, 0)
+ timezone_utcnow_mock.return_value = initial_time
+ self._make_sensor_dag_run()
+ smart = self._make_smart_operator(0)
+ smart.timeout = 0
+ duration = datetime.timedelta(seconds=3)
+ timezone_utcnow_mock.side_effect = [
+ # started_at
+ initial_time,
+ # poke_start_time
+ initial_time,
+ # duration
+ initial_time + duration,
+ # timeout check
+ initial_time + duration,
+ ]
+ smart.execute(None)
+ statsd_timing_mock.assert_called_with('smart_sensor_operator.loop_duration', duration)