You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/22 22:14:53 UTC

[airflow] branch v1-10-stable updated: Ensure all statsd timers use millisecond values. (#10633)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-stable
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-stable by this push:
     new 48daabc  Ensure all statsd timers use millisecond values. (#10633)
48daabc is described below

commit 48daabce99476c218b6d0d1a5ed5c6941074497c
Author: Martijn Pieters <mj...@zopatista.com>
AuthorDate: Fri Jan 22 22:14:40 2021 +0000

    Ensure all statsd timers use millisecond values. (#10633)
    
    * Backport pull request #6682 to v1-10
    * Backport pull request #10629 to v1-10
---
 airflow/models/dagbag.py        | 4 +---
 airflow/models/dagrun.py        | 2 +-
 airflow/models/taskinstance.py  | 2 +-
 airflow/utils/dag_processing.py | 4 ++--
 4 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 88be05d..f68c420 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -434,8 +434,6 @@ class DagBag(BaseDagBag, LoggingMixin):
                 dag_id_names = str(dag_ids)
 
                 td = timezone.utcnow() - ts
-                td = td.total_seconds() + (
-                    float(td.microseconds) / 1000000)
                 stats.append(FileLoadStat(
                     filepath.replace(settings.DAGS_FOLDER, ''),
                     td,
@@ -487,7 +485,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         stats = self.dagbag_stats
         return report.format(
             dag_folder=self.dag_folder,
-            duration=sum([o.duration for o in stats]),
+            duration=sum([o.duration for o in stats], timedelta()).total_seconds(),
             dag_num=sum([o.dag_num for o in stats]),
             task_num=sum([o.task_num for o in stats]),
             table=pprinttable(stats),
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 02e9c89..94bafc7 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -285,7 +285,7 @@ class DagRun(Base, LoggingMixin):
                 are_runnable_tasks = ready_tis or self._are_premature_tis(
                     unfinished_tasks, finished_tasks, session) or changed_tis
 
-        duration = (timezone.utcnow() - start_dttm).total_seconds() * 1000
+        duration = (timezone.utcnow() - start_dttm)
         Stats.timing("dagrun.dependency-check.{}".format(self.dag_id), duration)
 
         leaf_task_ids = {t.task_id for t in dag.leaves}
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 7c1caef..a9459da 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -990,7 +990,7 @@ class TaskInstance(Base, LoggingMixin):
                 task_copy.post_execute(context=context, result=result)
 
                 end_time = time.time()
-                duration = end_time - start_time
+                duration = timedelta(seconds=end_time - start_time)
                 Stats.timing(
                     'dag.{dag_id}.{task_id}.duration'.format(
                         dag_id=task_copy.dag_id,
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 881a8ce..58e6af0 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -1022,7 +1022,7 @@ class DagFileProcessorManager(LoggingMixin):
 
             processor_pid = self.get_pid(file_path)
             processor_start_time = self.get_start_time(file_path)
-            runtime = ((now - processor_start_time).total_seconds() if processor_start_time else None)
+            runtime = ((now - processor_start_time) if processor_start_time else None)
             last_run = self.get_last_finish_time(file_path)
             if last_run:
                 seconds_ago = (now - last_run).total_seconds()
@@ -1047,7 +1047,7 @@ class DagFileProcessorManager(LoggingMixin):
         for file_path, pid, runtime, num_dags, num_errors, last_runtime, last_run in rows:
             formatted_rows.append((file_path,
                                    pid,
-                                   "{:.2f}s".format(runtime) if runtime else None,
+                                   "{:.2f}s".format(runtime.total_seconds()) if runtime else None,
                                    num_dags,
                                    num_errors,
                                    "{:.2f}s".format(last_runtime) if last_runtime else None,