You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/07 13:55:00 UTC

[airflow] branch v1-10-test updated: Don't emit first_task_scheduling_delay metric for only-once dags (#12835)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 2c92b02  Don't emit first_task_scheduling_delay metric for only-once dags (#12835)
2c92b02 is described below

commit 2c92b02a3d89009fa43c9143ab5392d17919f859
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Sat Dec 5 21:56:51 2020 +0000

    Don't emit first_task_scheduling_delay metric for only-once dags (#12835)
    
    Dags with a schedule interval of None, or `@once` don't have a following
    schedule, so we can't realistically calculate this metric.
    
    Additionally, this changes the emitted metric from seconds to
    milliseconds -- all timers to statsd should be in milliseconds -- this
    is what Statsd and apps that consume data from there expect. See #10629
    for more details.
    
    This will be a "breaking" change from 1.10.14, where the metric was
    back-ported to, but was (incorrectly) emitting seconds.
    
    (cherry picked from commit 4a02e0a287f880eab98979de565c061747b35f27)
---
 airflow/models/dagrun.py    | 22 +++++++++------
 docs/metrics.rst            |  2 +-
 tests/models/test_dagrun.py | 67 +++++++++++++++++++++++++++++++--------------
 3 files changed, 61 insertions(+), 30 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 8ba7f4e..02e9c89 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -371,14 +371,20 @@ class DagRun(Base, LoggingMixin):
         Note, the stat will only be emitted if the DagRun is a scheduler triggered one
         (i.e. external_trigger is False).
         """
+        if self.state == State.RUNNING:
+            return
+        if self.external_trigger:
+            return
+        if not finished_tis:
+            return
+
         try:
-            if self.state == State.RUNNING:
-                return
-            if self.external_trigger:
-                return
-            if not finished_tis:
-                return
             dag = self.get_dag()
+
+            if not self.dag.schedule_interval or self.dag.schedule_interval == "@once":
+                # We can't emit this metric if there is no following schedule to cacluate from!
+                return
+
             ordered_tis_by_start_date = [ti for ti in finished_tis if ti.start_date]
             ordered_tis_by_start_date.sort(key=lambda ti: ti.start_date, reverse=False)
             first_start_date = ordered_tis_by_start_date[0].start_date
@@ -386,8 +392,8 @@ class DagRun(Base, LoggingMixin):
                 # dag.following_schedule calculates the expected start datetime for a scheduled dagrun
                 # i.e. a daily flow for execution date 1/1/20 actually runs on 1/2/20 hh:mm:ss,
                 # and ti.start_date will be 1/2/20 hh:mm:ss so the following schedule is comparison
-                true_delay = (first_start_date - dag.following_schedule(self.execution_date)).total_seconds()
-                if true_delay >= 0:
+                true_delay = first_start_date - dag.following_schedule(self.execution_date)
+                if true_delay.total_seconds() > 0:
                     Stats.timing('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id), true_delay)
         except Exception as e:
             self.log.warning('Failed to record first_task_scheduling_delay metric:\n', e)
diff --git a/docs/metrics.rst b/docs/metrics.rst
index 82e62b0..ee42525 100644
--- a/docs/metrics.rst
+++ b/docs/metrics.rst
@@ -108,5 +108,5 @@ Name                                              Description
 ``dagrun.duration.failed.<dag_id>``               Milliseconds taken for a DagRun to reach failed state
 ``dagrun.schedule_delay.<dag_id>``                Milliseconds of delay between the scheduled DagRun
                                                   start date and the actual DagRun start date
-``dagrun.<dag_id>.first_task_scheduling_delay``   Seconds elapsed between first task start_date and dagrun expected start
+``dagrun.<dag_id>.first_task_scheduling_delay``   Milliseconds elapsed between first task start_date and dagrun expected start
 ================================================= =======================================================================
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 1f627c5..98980a1 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -34,6 +34,7 @@ from airflow.utils.state import State
 from airflow.utils.trigger_rule import TriggerRule
 from tests.compat import mock, call
 from tests.models import DEFAULT_DATE
+from tests.test_utils import db
 
 
 class DagRunTest(unittest.TestCase):
@@ -628,31 +629,55 @@ class DagRunTest(unittest.TestCase):
         self.assertNotIn(call('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id)),
                          stats_mock.mock_calls)
 
-    @mock.patch.object(Stats, 'timing')
-    def test_emit_scheduling_delay(self, stats_mock):
+    @parameterized.expand(
+        [
+            ("*/5 * * * *", True),
+            (None, False),
+            ("@once", False),
+        ]
+    )
+    def test_emit_scheduling_delay(self, schedule_interval, expected):
         """
         Tests that dag scheduling delay stat is set properly once running scheduled dag.
         dag_run.update_state() invokes the _emit_true_scheduling_delay_stats_for_finished_state method.
         """
-        dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1))
+        # Cleanup
+        db.clear_db_runs()
+        db.clear_db_dags()
+
+        dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1), schedule_interval=schedule_interval)
         dag_task = DummyOperator(task_id='dummy', dag=dag, owner='airflow')
 
         session = settings.Session()
-        orm_dag = DagModel(dag_id=dag.dag_id, is_active=True)
-        session.add(orm_dag)
-        session.flush()
-        dag_run = dag.create_dagrun(
-            run_id="test",
-            state=State.SUCCESS,
-            execution_date=dag.start_date,
-            start_date=dag.start_date,
-            session=session,
-        )
-        ti = dag_run.get_task_instance(dag_task.task_id)
-        ti.set_state(State.SUCCESS, session)
-        session.commit()
-        session.close()
-        dag_run.update_state()
-        true_delay = (ti.start_date - dag.following_schedule(dag_run.execution_date)).total_seconds()
-        sched_delay_stat_call = call('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id), true_delay)
-        self.assertIn(sched_delay_stat_call, stats_mock.mock_calls)
+        try:
+            orm_dag = DagModel(dag_id=dag.dag_id, is_active=True)
+            session.add(orm_dag)
+            session.flush()
+            dag_run = dag.create_dagrun(
+                run_id="test",
+                state=State.SUCCESS,
+                execution_date=dag.start_date,
+                start_date=dag.start_date,
+                session=session,
+            )
+            ti = dag_run.get_task_instance(dag_task.task_id, session)
+            ti.set_state(State.SUCCESS, session)
+            session.flush()
+
+            with mock.patch.object(Stats, 'timing') as stats_mock:
+                dag_run.update_state(session)
+
+            metric_name = 'dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id)
+
+            if expected:
+                true_delay = (ti.start_date - dag.following_schedule(dag_run.execution_date))
+                sched_delay_stat_call = call(metric_name, true_delay)
+                assert sched_delay_stat_call in stats_mock.mock_calls
+            else:
+                # Assert that we never passed the metric
+                sched_delay_stat_call = call(metric_name, mock.ANY)
+                assert sched_delay_stat_call not in stats_mock.mock_calls
+        finally:
+            # Don't write anything to the DB
+            session.rollback()
+            session.close()