You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/11/11 19:26:20 UTC
[airflow] 01/03: Fix missing dagruns when ``catchup=True`` (#19528)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 2aed7c1e994f0efed75f7c52d00d0c3a0946246d
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Nov 11 10:44:11 2021 +0100
Fix missing dagruns when ``catchup=True`` (#19528)
There's a bug where, once max_active_runs is reached, run dates could be skipped.
This PR fixes it.
Closes: #19461
(cherry picked from commit 2bd4b55c53b593f2747a88f4c018d7e420460d9a)
---
airflow/jobs/scheduler_job.py | 25 ++++++++--------
airflow/models/dag.py | 1 +
tests/jobs/test_scheduler_job.py | 63 ++++++++++++++++++++++++++++++++++++++--
3 files changed, 74 insertions(+), 15 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2a230a7..44c4df7 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -914,26 +914,23 @@ class SchedulerJob(BaseJob):
creating_job_id=self.id,
)
active_runs_of_dags[dag.dag_id] += 1
- self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
+ if self._should_update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id]):
+ dag_model.calculate_dagrun_date_fields(dag, data_interval)
# TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
# memory for larger dags? or expunge_all()
- def _update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> None:
- """
- Update the next_dagrun, next_dagrun_data_interval_start/end
- and next_dagrun_create_after for this dag.
- """
- if total_active_runs >= dag_model.max_active_runs:
+ def _should_update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> bool:
+ """Check if the dag's next_dagruns_create_after should be updated."""
+ if total_active_runs >= dag.max_active_runs:
self.log.info(
"DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
dag_model.dag_id,
total_active_runs,
- dag_model.max_active_runs,
+ dag.max_active_runs,
)
dag_model.next_dagrun_create_after = None
- else:
- data_interval = dag.get_next_data_interval(dag_model)
- dag_model.calculate_dagrun_date_fields(dag, data_interval)
+ return False
+ return True
def _start_queued_dagruns(
self,
@@ -1016,7 +1013,8 @@ class SchedulerJob(BaseJob):
self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
active_runs = dag.get_num_active_runs(only_running=False, session=session)
# Work out if we should allow creating a new DagRun now?
- self._update_dag_next_dagruns(dag, dag_model, active_runs)
+ if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
+ dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
callback_to_execute = DagCallbackRequest(
full_filepath=dag.fileloc,
@@ -1041,7 +1039,8 @@ class SchedulerJob(BaseJob):
if dag_run.state in State.finished:
active_runs = dag.get_num_active_runs(only_running=False, session=session)
# Work out if we should allow creating a new DagRun now?
- self._update_dag_next_dagruns(dag, dag_model, active_runs)
+ if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
+ dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
# This will do one query per dag run. We "could" build up a complex
# query to update all the TIs across all the execution dates and dag
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index d1c947f..4e6f6cd 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -629,6 +629,7 @@ class DAG(LoggingMixin):
data_interval = dag_model.next_dagrun_data_interval
if data_interval is not None:
return data_interval
+
# Compatibility: A run was scheduled without an explicit data interval.
# This means the run was scheduled before AIP-39 implementation. Try to
# infer from the logical date.
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 5380960..db05e7d 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1274,8 +1274,8 @@ class TestSchedulerJob:
session.flush()
session.refresh(dr)
assert dr.state == State.FAILED
- # check that next_dagrun has been updated by Schedulerjob._update_dag_next_dagruns
- assert dag_maker.dag_model.next_dagrun == dr.execution_date + timedelta(days=1)
+ # check that next_dagrun_create_after has been updated by calculate_dagrun_date_fields
+ assert dag_maker.dag_model.next_dagrun_create_after == dr.execution_date + timedelta(days=1)
# check that no running/queued runs yet
assert (
session.query(DagRun).filter(DagRun.state.in_([DagRunState.RUNNING, DagRunState.QUEUED])).count()
@@ -2871,6 +2871,65 @@ class TestSchedulerJob:
== 0
)
+ def test_max_active_runs_creation_phasing(self, dag_maker, session):
+ """
+ Test that when creating runs once max_active_runs is reached that the runs come in the right order
+ without gaps
+ """
+
+ def complete_one_dagrun():
+ ti = (
+ session.query(TaskInstance)
+ .join(TaskInstance.dag_run)
+ .filter(TaskInstance.state != State.SUCCESS)
+ .order_by(DagRun.execution_date)
+ .first()
+ )
+ if ti:
+ ti.state = State.SUCCESS
+ session.flush()
+
+ with dag_maker(max_active_runs=3, session=session) as dag:
+ # Need to use something that doesn't immediately get marked as success by the scheduler
+ BashOperator(task_id='task', bash_command='true')
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ self.scheduler_job.executor = MockExecutor(do_update=True)
+ self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+ DagModel.dags_needing_dagruns(session).all()
+ for _ in range(3):
+ self.scheduler_job._do_scheduling(session)
+
+ model: DagModel = session.query(DagModel).get(dag.dag_id)
+
+ # Pre-condition
+ assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}
+
+ assert model.next_dagrun == timezone.convert_to_utc(
+ timezone.DateTime(
+ 2016,
+ 1,
+ 3,
+ )
+ )
+ assert model.next_dagrun_create_after is None
+
+ complete_one_dagrun()
+
+ assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}
+
+ for _ in range(5):
+ self.scheduler_job._do_scheduling(session)
+ complete_one_dagrun()
+ model: DagModel = session.query(DagModel).get(dag.dag_id)
+
+ expected_execution_dates = [datetime.datetime(2016, 1, d, tzinfo=timezone.utc) for d in range(1, 6)]
+ dagrun_execution_dates = [
+ dr.execution_date for dr in session.query(DagRun).order_by(DagRun.execution_date).all()
+ ]
+ assert dagrun_execution_dates == expected_execution_dates
+
def test_do_schedule_max_active_runs_and_manual_trigger(self, dag_maker):
"""
Make sure that when a DAG is already at max_active_runs, that manually triggered