Posted to commits@airflow.apache.org by as...@apache.org on 2021/04/01 13:47:23 UTC

[airflow] 08/11: Re-introduce dagrun.schedule_delay metric (#15105)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f649b8a22b9bf109fe0355af6937324d69191f34
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Wed Mar 31 13:38:14 2021 +0100

    Re-introduce dagrun.schedule_delay metric (#15105)
    
    This was mistakenly removed in the HA scheduler refactor work.
    
    It is now added back, and has tests this time so we will notice if it
    breaks in the future.
    
    By using freezegun we can assert the _exact_ value of the metric emitted
    to make sure it also has the correct value, without introducing
    timing-based flakiness (a short sketch of this pattern follows the
    commit message below).
    
    (cherry picked from commit ca4c4f3d343dea0a034546a896072b9c87244e71)
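
Because the clock is frozen, the measured delay is fully deterministic, so the
test can assert the exact metric value rather than an approximate bound. A
minimal standalone sketch of that pattern, using illustrative timestamps that
are not taken from the patch:

    # Minimal sketch: freezegun pins "now", so the measured delay is exact.
    # The timestamps below are illustrative only.
    import datetime

    from freezegun import freeze_time

    with freeze_time("2021-01-02 00:00:09"):
        expected_start = datetime.datetime(2021, 1, 2)   # when the run *should* start
        actual_start = datetime.datetime.utcnow()        # frozen at 00:00:09
        delay = actual_start - expected_start
        assert delay == datetime.timedelta(seconds=9)    # exact, never flaky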
---
 airflow/jobs/scheduler_job.py    | 10 +++++++++-
 tests/jobs/test_scheduler_job.py | 19 ++++++++++++++++---
 2 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index e380512..1076cb6 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1637,7 +1637,7 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
             if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
-                dag.create_dagrun(
+                run = dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
                     start_date=timezone.utcnow(),
@@ -1648,6 +1648,14 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
                     creating_job_id=self.id,
                 )
 
+                expected_start_date = dag.following_schedule(run.execution_date)
+                if expected_start_date:
+                    schedule_delay = run.start_date - expected_start_date
+                    Stats.timing(
+                        f'dagrun.schedule_delay.{dag.dag_id}',
+                        schedule_delay,
+                    )
+
         self._update_dag_next_dagruns(dag_models, session)
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
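
The hunk above is the substance of the fix: the expected start of a scheduled
run is dag.following_schedule(run.execution_date), because a run whose interval
starts at execution_date only becomes due once that interval ends, and
schedule_delay is how much later the scheduler actually created the run. A
rough sketch of the arithmetic, assuming an @daily schedule and illustrative
dates (neither is taken from the patch):

    # Rough sketch of the schedule_delay computation, assuming an @daily DAG;
    # following_schedule(execution_date) is approximated here as +1 day.
    import datetime

    execution_date = datetime.datetime(2021, 1, 1)                  # logical date of the run
    expected_start = execution_date + datetime.timedelta(days=1)    # end of the daily interval
    actual_start = expected_start + datetime.timedelta(seconds=9)   # when the scheduler created it
    schedule_delay = actual_start - expected_start                  # -> 9 seconds
    assert schedule_delay == datetime.timedelta(seconds=9)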
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 9347fa4..7a9e273 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -30,6 +30,7 @@ from zipfile import ZipFile
 
 import psutil
 import pytest
+from freezegun import freeze_time
 from parameterized import parameterized
 from sqlalchemy import func
 
@@ -3643,8 +3644,16 @@ class TestSchedulerJob(unittest.TestCase):
                 full_filepath=dag.fileloc, dag_id=dag_id
             )
 
-    def test_scheduler_sets_job_id_on_dag_run(self):
-        dag = DAG(dag_id='test_scheduler_sets_job_id_on_dag_run', start_date=DEFAULT_DATE)
+    @freeze_time(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9))
+    @mock.patch('airflow.jobs.scheduler_job.Stats.timing')
+    def test_create_dag_runs(self, stats_timing):
+        """
+        Test various invariants of _create_dag_runs.
+
+        - That the run created has the creating_job_id set
+        - That we emit the right DagRun metrics
+        """
+        dag = DAG(dag_id='test_create_dag_runs', start_date=DEFAULT_DATE)
 
         DummyOperator(
             task_id='dummy',
@@ -3652,7 +3661,7 @@ class TestSchedulerJob(unittest.TestCase):
         )
 
         dagbag = DagBag(
-            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            dag_folder=os.devnull,
             include_examples=False,
             read_dags_from_db=True,
         )
@@ -3666,6 +3675,10 @@ class TestSchedulerJob(unittest.TestCase):
         with create_session() as session:
             self.scheduler_job._create_dag_runs([dag_model], session)
 
+        stats_timing.assert_called_once_with(
+            "dagrun.schedule_delay.test_create_dag_runs", datetime.timedelta(seconds=9)
+        )
+
         assert dag.get_last_dagrun().creating_job_id == self.scheduler_job.id
 
     def test_extra_operator_links_not_loaded_in_scheduler_loop(self):
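
For reference, the frozen time in the test above is DEFAULT_DATE + 1 day + 9
seconds while the run's expected start is DEFAULT_DATE + 1 day, which is why
the asserted delay is exactly 9 seconds. Below is a self-contained sketch of
the same mock-plus-frozen-clock assertion pattern; Stats and create_run here
are hypothetical stand-ins, not Airflow's API:

    # Self-contained sketch of the test pattern above; Stats and create_run
    # are hypothetical stand-ins, not Airflow code.
    import datetime
    from unittest import mock

    from freezegun import freeze_time


    class Stats:
        """Stand-in for a statsd-style metrics client."""

        @staticmethod
        def timing(metric, delta):
            pass


    def create_run(dag_id, expected_start):
        """Pretend scheduler: report how late the run was created."""
        delay = datetime.datetime.utcnow() - expected_start
        Stats.timing(f"dagrun.schedule_delay.{dag_id}", delay)


    @freeze_time("2021-01-02 00:00:09")
    @mock.patch.object(Stats, "timing")
    def test_schedule_delay_metric(stats_timing):
        create_run("example_dag", expected_start=datetime.datetime(2021, 1, 2))
        stats_timing.assert_called_once_with(
            "dagrun.schedule_delay.example_dag", datetime.timedelta(seconds=9)
        )


    test_schedule_delay_metric()  # passes when executed directly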