You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/11/30 16:55:48 UTC
[airflow] branch v1-10-test updated: Add metric for scheduling
delay between first run task & expected start time (#9544)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new b62d0f4 Add metric for scheduling delay between first run task & expected start time (#9544)
b62d0f4 is described below
commit b62d0f4e781767c06e473755c6839d644b32ad45
Author: Ace Haidrey <ah...@pandora.com>
AuthorDate: Fri Nov 13 14:03:42 2020 -0800
Add metric for scheduling delay between first run task & expected start time (#9544)
Co-authored-by: Ace Haidrey <ah...@pinterest.com>
(cherry picked from commit aac3877ec374e5f376d8f95b50031c10625216a4)
---
airflow/models/dagrun.py | 36 +++++++++++++++++++++++++++++
docs/metrics.rst | 23 ++++++++++---------
tests/models/test_dagrun.py | 55 ++++++++++++++++++++++++++++++++++++++++++---
3 files changed, 100 insertions(+), 14 deletions(-)
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 9775c9f..8ba7f4e 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -320,6 +320,7 @@ class DagRun(Base, LoggingMixin):
else:
self.set_state(State.RUNNING)
+ self._emit_true_scheduling_delay_stats_for_finished_state(finished_tasks)
self._emit_duration_stats_for_finished_state()
# todo: determine we want to use with_for_update to make sure to lock the run
@@ -356,6 +357,41 @@ class DagRun(Base, LoggingMixin):
session=session):
return True
+    def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis):
+        """
+        Emit the "true" scheduling delay of this DagRun: the start_date of the
+        earliest-started finished task minus the expected DagRun start time,
+        i.e. ``dag.following_schedule(self.execution_date)``. Used by update_state.
+        The value may contain outliers (e.g. when the first task was cleared, the
+        next task's start_date is used); filter them out on the dashboard side.
+        Nothing is emitted while the run is RUNNING, for externally triggered runs,
+        when no finished task has a start_date, when the DAG has no following
+        schedule, or when the delay is negative. Failures are logged, not raised.
+
+        :param finished_tis: task instances of this DagRun in a finished state
+        """
+        try:
+            if self.state == State.RUNNING or self.external_trigger:
+                return
+            # Only TIs that actually started can tell us when the run began.
+            started_tis = [ti for ti in (finished_tis or []) if ti.start_date]
+            if not started_tis:
+                return
+            dag = self.get_dag()
+            first_start_date = min(ti.start_date for ti in started_tis)
+            # A daily DAG with execution_date 1/1/20 is expected to start on 1/2/20.
+            expected_start = dag.following_schedule(self.execution_date)
+            if expected_start is None:
+                return
+            true_delay = (first_start_date - expected_start).total_seconds()
+            if true_delay >= 0:
+                # NOTE(review): value is in seconds (docs/metrics.rst) while the other
+                # timers there are milliseconds -- confirm what the StatsD backend expects.
+                Stats.timing('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id), true_delay)
+        except Exception as e:
+            # Best effort: never break update_state; %s makes the log show the error.
+            self.log.warning('Failed to record first_task_scheduling_delay metric:\n%s', e)
+
def _emit_duration_stats_for_finished_state(self):
if self.state == State.RUNNING:
return
diff --git a/docs/metrics.rst b/docs/metrics.rst
index afbd7c9..1e6e06b 100644
--- a/docs/metrics.rst
+++ b/docs/metrics.rst
@@ -90,14 +90,15 @@ Name Description
Timers
------
-=========================================== =================================================
-Name Description
-=========================================== =================================================
-``dagrun.dependency-check.<dag_id>`` Milliseconds taken to check DAG dependencies
-``dag.<dag_id>.<task_id>.duration`` Milliseconds taken to finish a task
-``dag_processing.last_duration.<dag_file>`` Milliseconds taken to load the given DAG file
-``dagrun.duration.success.<dag_id>`` Milliseconds taken for a DagRun to reach success state
-``dagrun.duration.failed.<dag_id>`` Milliseconds taken for a DagRun to reach failed state
-``dagrun.schedule_delay.<dag_id>`` Milliseconds of delay between the scheduled DagRun
- start date and the actual DagRun start date
-=========================================== =================================================
+================================================= =======================================================================
+Name Description
+================================================= =======================================================================
+``dagrun.dependency-check.<dag_id>`` Milliseconds taken to check DAG dependencies
+``dag.<dag_id>.<task_id>.duration`` Milliseconds taken to finish a task
+``dag_processing.last_duration.<dag_file>`` Milliseconds taken to load the given DAG file
+``dagrun.duration.success.<dag_id>`` Milliseconds taken for a DagRun to reach success state
+``dagrun.duration.failed.<dag_id>`` Milliseconds taken for a DagRun to reach failed state
+``dagrun.schedule_delay.<dag_id>`` Milliseconds of delay between the scheduled DagRun
+ start date and the actual DagRun start date
+``dagrun.<dag_id>.first_task_scheduling_delay`` Seconds elapsed between first task start_date and dagrun expected start
+================================================= =======================================================================
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 6dcf49e..e823a72 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -16,7 +16,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
import datetime
import unittest
@@ -24,14 +23,16 @@ from parameterized import parameterized
from airflow import settings, models
from airflow.jobs import BackfillJob
-from airflow.models import DAG, DagRun, clear_task_instances
+from airflow.models import DAG, DagRun, clear_task_instances, DagModel
from airflow.models import TaskInstance as TI
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
+from airflow.settings import Stats
from airflow.utils import timezone
+from airflow.utils.dates import days_ago
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule
-from tests.compat import mock
+from tests.compat import mock, call
from tests.models import DEFAULT_DATE
@@ -608,3 +609,51 @@ class DagRunTest(unittest.TestCase):
dagrun.verify_integrity()
task = dagrun.get_task_instances()[0]
assert task.queue == 'queue1'
+
+    @mock.patch.object(Stats, 'timing')
+    def test_no_scheduling_delay_for_nonscheduled_runs(self, stats_mock):
+        """
+        Tests that the first_task_scheduling_delay stat is not emitted for a non-scheduled run.
+        NOTE(review): confirm create_dag_run builds a manual / externally triggered run.
+        """
+        dag = DAG(dag_id='test_dagrun_stats', start_date=days_ago(1))
+        dag_task = DummyOperator(task_id='dummy', dag=dag)
+        initial_task_states = {dag_task.task_id: State.SUCCESS}
+
+        dag_run = self.create_dag_run(dag=dag, state=State.RUNNING, task_states=initial_task_states)
+        dag_run.update_state()
+        # The metric is emitted as Stats.timing(name, value), so a check against
+        # call(name) alone would never match; compare the name of every call instead.
+        metric_name = 'dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id)
+        emitted = [c for c in stats_mock.call_args_list if c[0] and c[0][0] == metric_name]
+        self.assertEqual(emitted, [])
+
+    @mock.patch.object(Stats, 'timing')
+    def test_emit_scheduling_delay(self, stats_mock):
+        """
+        Check that a finished DagRun created via dag.create_dagrun emits first_task_scheduling_delay;
+        update_state() is what calls _emit_true_scheduling_delay_stats_for_finished_state.
+        """
+        dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1))
+        dag_task = DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+        metric_name = 'dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id)
+
+        session = settings.Session()
+        session.add(DagModel(dag_id=dag.dag_id, is_active=True))
+        session.flush()
+        dag_run = dag.create_dagrun(run_id="test", state=State.SUCCESS,
+                                    execution_date=dag.start_date, start_date=dag.start_date,
+                                    session=session)
+        finished_ti = dag_run.get_task_instance(dag_task.task_id)
+        # NOTE(review): relies on set_state() populating finished_ti.start_date.
+        finished_ti.set_state(State.SUCCESS, session)
+        session.commit()
+        session.close()
+
+        dag_run.update_state()
+
+        expected_start = dag.following_schedule(dag_run.execution_date)
+        expected_delay = (finished_ti.start_date - expected_start).total_seconds()
+        stats_mock.assert_called()
+        self.assertIn(call(metric_name, expected_delay),
+                      stats_mock.mock_calls)