You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/11/11 19:26:20 UTC

[airflow] 01/03: Fix missing dagruns when ``catchup=True`` (#19528)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2aed7c1e994f0efed75f7c52d00d0c3a0946246d
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Nov 11 10:44:11 2021 +0100

    Fix missing dagruns when ``catchup=True`` (#19528)
    
    There's a bug that when the max_active_runs is reached, run dates could skip.
    This PR fixes it
    
    Closes: #19461
    (cherry picked from commit 2bd4b55c53b593f2747a88f4c018d7e420460d9a)
---
 airflow/jobs/scheduler_job.py    | 25 ++++++++--------
 airflow/models/dag.py            |  1 +
 tests/jobs/test_scheduler_job.py | 63 ++++++++++++++++++++++++++++++++++++++--
 3 files changed, 74 insertions(+), 15 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2a230a7..44c4df7 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -914,26 +914,23 @@ class SchedulerJob(BaseJob):
                     creating_job_id=self.id,
                 )
                 active_runs_of_dags[dag.dag_id] += 1
-            self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
+            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id]):
+                dag_model.calculate_dagrun_date_fields(dag, data_interval)
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> None:
-        """
-        Update the next_dagrun, next_dagrun_data_interval_start/end
-        and next_dagrun_create_after for this dag.
-        """
-        if total_active_runs >= dag_model.max_active_runs:
+    def _should_update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> bool:
+        """Check if the dag's next_dagruns_create_after should be updated."""
+        if total_active_runs >= dag.max_active_runs:
             self.log.info(
                 "DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
                 dag_model.dag_id,
                 total_active_runs,
-                dag_model.max_active_runs,
+                dag.max_active_runs,
             )
             dag_model.next_dagrun_create_after = None
-        else:
-            data_interval = dag.get_next_data_interval(dag_model)
-            dag_model.calculate_dagrun_date_fields(dag, data_interval)
+            return False
+        return True
 
     def _start_queued_dagruns(
         self,
@@ -1016,7 +1013,8 @@ class SchedulerJob(BaseJob):
             self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
             active_runs = dag.get_num_active_runs(only_running=False, session=session)
             # Work out if we should allow creating a new DagRun now?
-            self._update_dag_next_dagruns(dag, dag_model, active_runs)
+            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
+                dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
 
             callback_to_execute = DagCallbackRequest(
                 full_filepath=dag.fileloc,
@@ -1041,7 +1039,8 @@ class SchedulerJob(BaseJob):
         if dag_run.state in State.finished:
             active_runs = dag.get_num_active_runs(only_running=False, session=session)
             # Work out if we should allow creating a new DagRun now?
-            self._update_dag_next_dagruns(dag, dag_model, active_runs)
+            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
+                dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
 
         # This will do one query per dag run. We "could" build up a complex
         # query to update all the TIs across all the execution dates and dag
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index d1c947f..4e6f6cd 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -629,6 +629,7 @@ class DAG(LoggingMixin):
         data_interval = dag_model.next_dagrun_data_interval
         if data_interval is not None:
             return data_interval
+
         # Compatibility: A run was scheduled without an explicit data interval.
         # This means the run was scheduled before AIP-39 implementation. Try to
         # infer from the logical date.
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 5380960..db05e7d 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1274,8 +1274,8 @@ class TestSchedulerJob:
         session.flush()
         session.refresh(dr)
         assert dr.state == State.FAILED
-        # check that next_dagrun has been updated by Schedulerjob._update_dag_next_dagruns
-        assert dag_maker.dag_model.next_dagrun == dr.execution_date + timedelta(days=1)
+        # check that next_dagrun_create_after has been updated by calculate_dagrun_date_fields
+        assert dag_maker.dag_model.next_dagrun_create_after == dr.execution_date + timedelta(days=1)
         # check that no running/queued runs yet
         assert (
             session.query(DagRun).filter(DagRun.state.in_([DagRunState.RUNNING, DagRunState.QUEUED])).count()
@@ -2871,6 +2871,65 @@ class TestSchedulerJob:
             == 0
         )
 
+    def test_max_active_runs_creation_phasing(self, dag_maker, session):
+        """
+        Test that when creating runs once max_active_runs is reached that the runs come in the right order
+        without gaps
+        """
+
+        def complete_one_dagrun():
+            ti = (
+                session.query(TaskInstance)
+                .join(TaskInstance.dag_run)
+                .filter(TaskInstance.state != State.SUCCESS)
+                .order_by(DagRun.execution_date)
+                .first()
+            )
+            if ti:
+                ti.state = State.SUCCESS
+                session.flush()
+
+        with dag_maker(max_active_runs=3, session=session) as dag:
+            # Need to use something that doesn't immediately get marked as success by the scheduler
+            BashOperator(task_id='task', bash_command='true')
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=True)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        DagModel.dags_needing_dagruns(session).all()
+        for _ in range(3):
+            self.scheduler_job._do_scheduling(session)
+
+        model: DagModel = session.query(DagModel).get(dag.dag_id)
+
+        # Pre-condition
+        assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}
+
+        assert model.next_dagrun == timezone.convert_to_utc(
+            timezone.DateTime(
+                2016,
+                1,
+                3,
+            )
+        )
+        assert model.next_dagrun_create_after is None
+
+        complete_one_dagrun()
+
+        assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}
+
+        for _ in range(5):
+            self.scheduler_job._do_scheduling(session)
+            complete_one_dagrun()
+            model: DagModel = session.query(DagModel).get(dag.dag_id)
+
+        expected_execution_dates = [datetime.datetime(2016, 1, d, tzinfo=timezone.utc) for d in range(1, 6)]
+        dagrun_execution_dates = [
+            dr.execution_date for dr in session.query(DagRun).order_by(DagRun.execution_date).all()
+        ]
+        assert dagrun_execution_dates == expected_execution_dates
+
     def test_do_schedule_max_active_runs_and_manual_trigger(self, dag_maker):
         """
         Make sure that when a DAG is already at max_active_runs, that manually triggered