You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/11/30 16:55:48 UTC

[airflow] branch v1-10-test updated: Add metric for scheduling delay between first run task & expected start time (#9544)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new b62d0f4  Add metric for scheduling delay between first run task & expected start time (#9544)
b62d0f4 is described below

commit b62d0f4e781767c06e473755c6839d644b32ad45
Author: Ace Haidrey <ah...@pandora.com>
AuthorDate: Fri Nov 13 14:03:42 2020 -0800

    Add metric for scheduling delay between first run task & expected start time (#9544)
    
    Co-authored-by: Ace Haidrey <ah...@pinterest.com>
    (cherry picked from commit aac3877ec374e5f376d8f95b50031c10625216a4)
---
 airflow/models/dagrun.py    | 36 +++++++++++++++++++++++++++++
 docs/metrics.rst            | 23 ++++++++++---------
 tests/models/test_dagrun.py | 55 ++++++++++++++++++++++++++++++++++++++++++---
 3 files changed, 100 insertions(+), 14 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 9775c9f..8ba7f4e 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -320,6 +320,7 @@ class DagRun(Base, LoggingMixin):
         else:
             self.set_state(State.RUNNING)
 
+        self._emit_true_scheduling_delay_stats_for_finished_state(finished_tasks)
         self._emit_duration_stats_for_finished_state()
 
         # todo: determine we want to use with_for_update to make sure to lock the run
@@ -356,6 +357,41 @@ class DagRun(Base, LoggingMixin):
                     session=session):
                 return True
 
+    def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis):
+        """
+        Emit the "true scheduling delay" stat, defined as the time the first task in the
+        DAG started minus the expected DagRun start datetime.
+        It is called from update_state and emits nothing while the DagRun is still RUNNING.
+        The first started task is the finished task instance with the earliest start_date;
+        the expected start is dag.following_schedule(self.execution_date). The stat is only
+        emitted when the delay is non-negative.
+        The emitted data may contain outliers (e.g. when the first task was cleared, so
+        the second task's start_date will be used), but these can be filtered out
+        on the stats side through the dashboards tooling built.
+        Note, the stat will only be emitted if the DagRun is a scheduler triggered one
+        (i.e. external_trigger is False). Any failure is logged as a warning, never raised.
+        """
+        try:
+            if self.state == State.RUNNING:
+                return
+            if self.external_trigger:
+                return
+            if not finished_tis:
+                return
+            dag = self.get_dag()
+            # min() over started tasks only; no started task means there is nothing to report
+            start_dates = [ti.start_date for ti in finished_tis if ti.start_date]
+            first_start_date = min(start_dates) if start_dates else None
+            if first_start_date:
+                # dag.following_schedule calculates the expected start datetime for a scheduled dagrun
+                # i.e. a daily flow for execution date 1/1/20 actually runs on 1/2/20 hh:mm:ss,
+                # and ti.start_date will be 1/2/20 hh:mm:ss, so we compare against the following schedule
+                true_delay = (first_start_date - dag.following_schedule(self.execution_date)).total_seconds()
+                if true_delay >= 0:
+                    Stats.timing('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id), true_delay)
+        except Exception as e:
+            self.log.warning('Failed to record first_task_scheduling_delay metric:\n%s', e)
+
     def _emit_duration_stats_for_finished_state(self):
         if self.state == State.RUNNING:
             return
diff --git a/docs/metrics.rst b/docs/metrics.rst
index afbd7c9..1e6e06b 100644
--- a/docs/metrics.rst
+++ b/docs/metrics.rst
@@ -90,14 +90,15 @@ Name                                                Description
 Timers
 ------
 
-=========================================== =================================================
-Name                                        Description
-=========================================== =================================================
-``dagrun.dependency-check.<dag_id>``        Milliseconds taken to check DAG dependencies
-``dag.<dag_id>.<task_id>.duration``         Milliseconds taken to finish a task
-``dag_processing.last_duration.<dag_file>`` Milliseconds taken to load the given DAG file
-``dagrun.duration.success.<dag_id>``        Milliseconds taken for a DagRun to reach success state
-``dagrun.duration.failed.<dag_id>``         Milliseconds taken for a DagRun to reach failed state
-``dagrun.schedule_delay.<dag_id>``          Milliseconds of delay between the scheduled DagRun
-                                            start date and the actual DagRun start date
-=========================================== =================================================
+================================================= =======================================================================
+Name                                              Description
+================================================= =======================================================================
+``dagrun.dependency-check.<dag_id>``              Milliseconds taken to check DAG dependencies
+``dag.<dag_id>.<task_id>.duration``               Milliseconds taken to finish a task
+``dag_processing.last_duration.<dag_file>``       Milliseconds taken to load the given DAG file
+``dagrun.duration.success.<dag_id>``              Milliseconds taken for a DagRun to reach success state
+``dagrun.duration.failed.<dag_id>``               Milliseconds taken for a DagRun to reach failed state
+``dagrun.schedule_delay.<dag_id>``                Milliseconds of delay between the scheduled DagRun
+                                                  start date and the actual DagRun start date
+``dagrun.<dag_id>.first_task_scheduling_delay``   Seconds elapsed between first task start_date and dagrun expected start
+================================================= =======================================================================
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 6dcf49e..e823a72 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -16,7 +16,6 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import unittest
 
@@ -24,14 +23,16 @@ from parameterized import parameterized
 
 from airflow import settings, models
 from airflow.jobs import BackfillJob
-from airflow.models import DAG, DagRun, clear_task_instances
+from airflow.models import DAG, DagRun, clear_task_instances, DagModel
 from airflow.models import TaskInstance as TI
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.python_operator import ShortCircuitOperator
+from airflow.settings import Stats
 from airflow.utils import timezone
+from airflow.utils.dates import days_ago
 from airflow.utils.state import State
 from airflow.utils.trigger_rule import TriggerRule
-from tests.compat import mock
+from tests.compat import mock, call
 from tests.models import DEFAULT_DATE
 
 
@@ -608,3 +609,51 @@ class DagRunTest(unittest.TestCase):
         dagrun.verify_integrity()
         task = dagrun.get_task_instances()[0]
         assert task.queue == 'queue1'
+
+    @mock.patch.object(Stats, 'timing')
+    def test_no_scheduling_delay_for_nonscheduled_runs(self, stats_mock):
+        """
+        Tests that dag scheduling delay stat is not emitted if the dagrun is not a scheduled run.
+        This case is a manual run. Metric names alone are compared, whatever the delay value.
+        """
+        dag = DAG(dag_id='test_dagrun_stats', start_date=days_ago(1))
+        dag_task = DummyOperator(task_id='dummy', dag=dag)
+
+        initial_task_states = {
+            dag_task.task_id: State.SUCCESS,
+        }
+
+        dag_run = self.create_dag_run(dag=dag, state=State.RUNNING, task_states=initial_task_states)
+        dag_run.update_state()
+        metric_name = 'dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id)
+        self.assertNotIn(metric_name, [c[0][0] for c in stats_mock.call_args_list if c[0]])
+
+    @mock.patch.object(Stats, 'timing')
+    def test_emit_scheduling_delay(self, stats_mock):
+        """
+        Tests that the first-task scheduling delay stat is emitted with the expected value, since
+        dag_run.update_state() invokes the _emit_true_scheduling_delay_stats_for_finished_state method.
+        """
+        dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1))
+        dag_task = DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id, is_active=True)
+        session.add(orm_dag)
+        session.flush()
+        dag_run = dag.create_dagrun(
+            run_id="test",
+            state=State.SUCCESS,
+            execution_date=dag.start_date,
+            start_date=dag.start_date,
+            session=session,
+        )
+        ti = dag_run.get_task_instance(dag_task.task_id)
+        ti.set_state(State.SUCCESS, session)  # NOTE(review): presumably sets ti.start_date - confirm
+        session.commit()
+        session.close()
+        dag_run.update_state()
+        true_delay = (ti.start_date - dag.following_schedule(dag_run.execution_date)).total_seconds()
+        stats_mock.assert_called()  # Stats.timing must have been invoked at least once
+        sched_delay_stat_call = call('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id), true_delay)
+        self.assertIn(sched_delay_stat_call, stats_mock.mock_calls)