You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/22 22:14:53 UTC
[airflow] branch v1-10-stable updated: Ensure all statsd timers use millisecond values. (#10633)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-stable
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-stable by this push:
new 48daabc Ensure all statsd timers use millisecond values. (#10633)
48daabc is described below
commit 48daabce99476c218b6d0d1a5ed5c6941074497c
Author: Martijn Pieters <mj...@zopatista.com>
AuthorDate: Fri Jan 22 22:14:40 2021 +0000
Ensure all statsd timers use millisecond values. (#10633)
* Backport pull request #6682 to v1-10
* Backport pull request #10629 to v1-10
---
airflow/models/dagbag.py | 4 +---
airflow/models/dagrun.py | 2 +-
airflow/models/taskinstance.py | 2 +-
airflow/utils/dag_processing.py | 4 ++--
4 files changed, 5 insertions(+), 7 deletions(-)
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 88be05d..f68c420 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -434,8 +434,6 @@ class DagBag(BaseDagBag, LoggingMixin):
dag_id_names = str(dag_ids)
td = timezone.utcnow() - ts
- td = td.total_seconds() + (
- float(td.microseconds) / 1000000)
stats.append(FileLoadStat(
filepath.replace(settings.DAGS_FOLDER, ''),
td,
@@ -487,7 +485,7 @@ class DagBag(BaseDagBag, LoggingMixin):
stats = self.dagbag_stats
return report.format(
dag_folder=self.dag_folder,
- duration=sum([o.duration for o in stats]),
+ duration=sum([o.duration for o in stats], timedelta()).total_seconds(),
dag_num=sum([o.dag_num for o in stats]),
task_num=sum([o.task_num for o in stats]),
table=pprinttable(stats),
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 02e9c89..94bafc7 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -285,7 +285,7 @@ class DagRun(Base, LoggingMixin):
are_runnable_tasks = ready_tis or self._are_premature_tis(
unfinished_tasks, finished_tasks, session) or changed_tis
- duration = (timezone.utcnow() - start_dttm).total_seconds() * 1000
+ duration = (timezone.utcnow() - start_dttm)
Stats.timing("dagrun.dependency-check.{}".format(self.dag_id), duration)
leaf_task_ids = {t.task_id for t in dag.leaves}
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 7c1caef..a9459da 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -990,7 +990,7 @@ class TaskInstance(Base, LoggingMixin):
task_copy.post_execute(context=context, result=result)
end_time = time.time()
- duration = end_time - start_time
+ duration = timedelta(seconds=end_time - start_time)
Stats.timing(
'dag.{dag_id}.{task_id}.duration'.format(
dag_id=task_copy.dag_id,
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 881a8ce..58e6af0 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -1022,7 +1022,7 @@ class DagFileProcessorManager(LoggingMixin):
processor_pid = self.get_pid(file_path)
processor_start_time = self.get_start_time(file_path)
- runtime = ((now - processor_start_time).total_seconds() if processor_start_time else None)
+ runtime = ((now - processor_start_time) if processor_start_time else None)
last_run = self.get_last_finish_time(file_path)
if last_run:
seconds_ago = (now - last_run).total_seconds()
@@ -1047,7 +1047,7 @@ class DagFileProcessorManager(LoggingMixin):
for file_path, pid, runtime, num_dags, num_errors, last_runtime, last_run in rows:
formatted_rows.append((file_path,
pid,
- "{:.2f}s".format(runtime) if runtime else None,
+ "{:.2f}s".format(runtime.total_seconds()) if runtime else None,
num_dags,
num_errors,
"{:.2f}s".format(last_runtime) if last_runtime else None,