You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/07 13:55:00 UTC
[airflow] branch v1-10-test updated: Don't emit first_task_scheduling_delay metric for only-once dags (#12835)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new 2c92b02 Don't emit first_task_scheduling_delay metric for only-once dags (#12835)
2c92b02 is described below
commit 2c92b02a3d89009fa43c9143ab5392d17919f859
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Sat Dec 5 21:56:51 2020 +0000
Don't emit first_task_scheduling_delay metric for only-once dags (#12835)
Dags with a schedule interval of None or `@once` don't have a following
schedule, so we can't realistically calculate this metric.
Additionally, this changes the emitted metric from seconds to
milliseconds: all timers sent to StatsD should be in milliseconds, which
is what StatsD and the apps that consume its data expect. See #10629
for more details.
This is a "breaking" change relative to 1.10.14, to which the metric was
back-ported but where it (incorrectly) emitted seconds.
(cherry picked from commit 4a02e0a287f880eab98979de565c061747b35f27)
---
airflow/models/dagrun.py | 22 +++++++++------
docs/metrics.rst | 2 +-
tests/models/test_dagrun.py | 67 +++++++++++++++++++++++++++++++--------------
3 files changed, 61 insertions(+), 30 deletions(-)
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 8ba7f4e..02e9c89 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -371,14 +371,20 @@ class DagRun(Base, LoggingMixin):
Note, the stat will only be emitted if the DagRun is a scheduler triggered one
(i.e. external_trigger is False).
"""
+ if self.state == State.RUNNING:
+ return
+ if self.external_trigger:
+ return
+ if not finished_tis:
+ return
+
try:
- if self.state == State.RUNNING:
- return
- if self.external_trigger:
- return
- if not finished_tis:
- return
dag = self.get_dag()
+
+ if not self.dag.schedule_interval or self.dag.schedule_interval == "@once":
+ # We can't emit this metric if there is no following schedule to cacluate from!
+ return
+
ordered_tis_by_start_date = [ti for ti in finished_tis if ti.start_date]
ordered_tis_by_start_date.sort(key=lambda ti: ti.start_date, reverse=False)
first_start_date = ordered_tis_by_start_date[0].start_date
@@ -386,8 +392,8 @@ class DagRun(Base, LoggingMixin):
# dag.following_schedule calculates the expected start datetime for a scheduled dagrun
# i.e. a daily flow for execution date 1/1/20 actually runs on 1/2/20 hh:mm:ss,
# and ti.start_date will be 1/2/20 hh:mm:ss so the following schedule is comparison
- true_delay = (first_start_date - dag.following_schedule(self.execution_date)).total_seconds()
- if true_delay >= 0:
+ true_delay = first_start_date - dag.following_schedule(self.execution_date)
+ if true_delay.total_seconds() > 0:
Stats.timing('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id), true_delay)
except Exception as e:
self.log.warning('Failed to record first_task_scheduling_delay metric:\n', e)
diff --git a/docs/metrics.rst b/docs/metrics.rst
index 82e62b0..ee42525 100644
--- a/docs/metrics.rst
+++ b/docs/metrics.rst
@@ -108,5 +108,5 @@ Name Description
``dagrun.duration.failed.<dag_id>`` Milliseconds taken for a DagRun to reach failed state
``dagrun.schedule_delay.<dag_id>`` Milliseconds of delay between the scheduled DagRun
start date and the actual DagRun start date
-``dagrun.<dag_id>.first_task_scheduling_delay`` Seconds elapsed between first task start_date and dagrun expected start
+``dagrun.<dag_id>.first_task_scheduling_delay`` Milliseconds elapsed between first task start_date and dagrun expected start
================================================= =======================================================================
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 1f627c5..98980a1 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -34,6 +34,7 @@ from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule
from tests.compat import mock, call
from tests.models import DEFAULT_DATE
+from tests.test_utils import db
class DagRunTest(unittest.TestCase):
@@ -628,31 +629,55 @@ class DagRunTest(unittest.TestCase):
self.assertNotIn(call('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id)),
stats_mock.mock_calls)
- @mock.patch.object(Stats, 'timing')
- def test_emit_scheduling_delay(self, stats_mock):
+ @parameterized.expand(
+ [
+ ("*/5 * * * *", True),
+ (None, False),
+ ("@once", False),
+ ]
+ )
+ def test_emit_scheduling_delay(self, schedule_interval, expected):
"""
Tests that dag scheduling delay stat is set properly once running scheduled dag.
dag_run.update_state() invokes the _emit_true_scheduling_delay_stats_for_finished_state method.
"""
- dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1))
+ # Cleanup
+ db.clear_db_runs()
+ db.clear_db_dags()
+
+ dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1), schedule_interval=schedule_interval)
dag_task = DummyOperator(task_id='dummy', dag=dag, owner='airflow')
session = settings.Session()
- orm_dag = DagModel(dag_id=dag.dag_id, is_active=True)
- session.add(orm_dag)
- session.flush()
- dag_run = dag.create_dagrun(
- run_id="test",
- state=State.SUCCESS,
- execution_date=dag.start_date,
- start_date=dag.start_date,
- session=session,
- )
- ti = dag_run.get_task_instance(dag_task.task_id)
- ti.set_state(State.SUCCESS, session)
- session.commit()
- session.close()
- dag_run.update_state()
- true_delay = (ti.start_date - dag.following_schedule(dag_run.execution_date)).total_seconds()
- sched_delay_stat_call = call('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id), true_delay)
- self.assertIn(sched_delay_stat_call, stats_mock.mock_calls)
+ try:
+ orm_dag = DagModel(dag_id=dag.dag_id, is_active=True)
+ session.add(orm_dag)
+ session.flush()
+ dag_run = dag.create_dagrun(
+ run_id="test",
+ state=State.SUCCESS,
+ execution_date=dag.start_date,
+ start_date=dag.start_date,
+ session=session,
+ )
+ ti = dag_run.get_task_instance(dag_task.task_id, session)
+ ti.set_state(State.SUCCESS, session)
+ session.flush()
+
+ with mock.patch.object(Stats, 'timing') as stats_mock:
+ dag_run.update_state(session)
+
+ metric_name = 'dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id)
+
+ if expected:
+ true_delay = (ti.start_date - dag.following_schedule(dag_run.execution_date))
+ sched_delay_stat_call = call(metric_name, true_delay)
+ assert sched_delay_stat_call in stats_mock.mock_calls
+ else:
+ # Assert that we never passed the metric
+ sched_delay_stat_call = call(metric_name, mock.ANY)
+ assert sched_delay_stat_call not in stats_mock.mock_calls
+ finally:
+ # Don't write anything to the DB
+ session.rollback()
+ session.close()