You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2021/09/09 11:03:48 UTC
[airflow] branch main updated: Fix DagRun execution order from
queued to running not being properly followed (#18061)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ebbe2b4 Fix DagRun execution order from queued to running not being properly followed (#18061)
ebbe2b4 is described below
commit ebbe2b4cafebe2b523ca08abd40145c3c7eec046
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Sep 9 12:03:33 2021 +0100
Fix DagRun execution order from queued to running not being properly followed (#18061)
We made a fix that resolved max_active_runs not allowing other dagruns to move to
running state (see #17945), but that fix introduced a bug: dagruns were not following
the execution_date order when moving to running state.
This PR fixes it by adding a `max_active_runs` column in dagmodel. Also, an extra test
not connected with this change was added because I was able to trigger the bug while
working on this.
---
airflow/jobs/scheduler_job.py | 3 +-
...5d12_add_max_active_runs_column_to_dagmodel_.py | 59 +++++++++++++
airflow/models/dag.py | 4 +
airflow/models/dagrun.py | 41 ++++++++-
docs/apache-airflow/migrations-ref.rst | 4 +-
tests/jobs/test_scheduler_job.py | 96 ++++++++++++++++++++++
tests/models/test_dag.py | 20 +++++
7 files changed, 221 insertions(+), 6 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index d7239a8..4d8a704 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -887,14 +887,15 @@ class SchedulerJob(BaseJob):
)
for dag_run in dag_runs:
+
try:
dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
except SerializedDagNotFound:
self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue
active_runs = active_runs_of_dags[dag_run.dag_id]
+
if dag.max_active_runs and active_runs >= dag.max_active_runs:
- dag_run.last_scheduling_decision = timezone.utcnow()
self.log.debug(
"DAG %s already has %d active runs, not moving any more runs to RUNNING state %s",
dag.dag_id,
diff --git a/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py b/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py
new file mode 100644
index 0000000..c1a6e19
--- /dev/null
+++ b/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add max_active_runs column to dagmodel table
+
+Revision ID: 092435bf5d12
+Revises: 7b2661a43ba3
+Create Date: 2021-09-06 21:29:24.728923
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy import text
+
+# revision identifiers, used by Alembic.
+revision = '092435bf5d12'
+down_revision = '7b2661a43ba3'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ """Apply Add max_active_runs column to dagmodel table"""
+ op.add_column('dag', sa.Column('max_active_runs', sa.Integer(), nullable=True))
+ with op.batch_alter_table('dag_run', schema=None) as batch_op:
+ # Add an index on dag_run.dag_id and an index on (state, dag_id) that is filtered to state='running' via the dialect-specific where options below
+ batch_op.create_index('idx_dag_run_dag_id', ['dag_id'])
+ batch_op.create_index(
+ 'idx_dag_run_running_dags',
+ ["state", "dag_id"],
+ postgres_where=text("state='running'"),
+ mssql_where=text("state='running'"),
+ sqlite_where=text("state='running'"),
+ )
+
+
+def downgrade():
+ """Unapply Add max_active_runs column to dagmodel table"""
+ op.drop_column('dag', 'max_active_runs')
+ with op.batch_alter_table('dag_run', schema=None) as batch_op:
+ # Drop the index on dag_run.dag_id and the (state, dag_id) index created for running dag runs
+ batch_op.drop_index('idx_dag_run_dag_id')
+ batch_op.drop_index('idx_dag_run_running_dags')
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 7743399..dc15d21 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2263,6 +2263,7 @@ class DAG(LoggingMixin):
orm_dag.description = dag.description
orm_dag.schedule_interval = dag.schedule_interval
orm_dag.max_active_tasks = dag.max_active_tasks
+ orm_dag.max_active_runs = dag.max_active_runs
orm_dag.has_task_concurrency_limits = any(t.max_active_tis_per_dag is not None for t in dag.tasks)
orm_dag.calculate_dagrun_date_fields(
@@ -2493,6 +2494,7 @@ class DagModel(Base):
tags = relationship('DagTag', cascade='all,delete-orphan', backref=backref('dag'))
max_active_tasks = Column(Integer, nullable=False)
+ max_active_runs = Column(Integer, nullable=True)
has_task_concurrency_limits = Column(Boolean, nullable=False)
@@ -2529,6 +2531,8 @@ class DagModel(Base):
self.max_active_tasks = concurrency
else:
self.max_active_tasks = conf.getint('core', 'max_active_tasks_per_dag')
+ if self.max_active_runs is None:
+ self.max_active_runs = conf.getint('core', 'max_active_runs_per_dag')
if self.has_task_concurrency_limits is None:
# Be safe -- this will be updated later once the DAG is parsed
self.has_task_concurrency_limits = True
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index e8f5f98..aa32fdc 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -19,7 +19,19 @@ import warnings
from datetime import datetime
from typing import TYPE_CHECKING, Any, Iterable, List, NamedTuple, Optional, Tuple, Union
-from sqlalchemy import Boolean, Column, Index, Integer, PickleType, String, UniqueConstraint, and_, func, or_
+from sqlalchemy import (
+ Boolean,
+ Column,
+ Index,
+ Integer,
+ PickleType,
+ String,
+ UniqueConstraint,
+ and_,
+ func,
+ or_,
+ text,
+)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import joinedload, relationship, synonym
@@ -91,6 +103,15 @@ class DagRun(Base, LoggingMixin):
UniqueConstraint('dag_id', 'execution_date', name='dag_run_dag_id_execution_date_key'),
UniqueConstraint('dag_id', 'run_id', name='dag_run_dag_id_run_id_key'),
Index('idx_last_scheduling_decision', last_scheduling_decision),
+ Index('idx_dag_run_dag_id', dag_id),
+ Index(
+ 'idx_dag_run_running_dags',
+ 'state',
+ 'dag_id',
+ postgres_where=text("state='running'"),
+ mssql_where=text("state='running'"),
+ sqlite_where=text("state='running'"),
+ ),
)
task_instances = relationship(TI, back_populates="dag_run")
@@ -207,10 +228,22 @@ class DagRun(Base, LoggingMixin):
DagModel.is_paused == expression.false(),
DagModel.is_active == expression.true(),
)
- .order_by(
- nulls_first(cls.last_scheduling_decision, session=session),
- cls.execution_date,
+ )
+ if state == State.QUEUED:
+ # For dag runs in the queued state, we check if they have reached the max_active_runs limit
+ # and if so we drop them
+ running_drs = (
+ session.query(DagRun.dag_id, func.count(DagRun.state).label('num_running'))
+ .filter(DagRun.state == DagRunState.RUNNING)
+ .group_by(DagRun.dag_id)
+ .subquery()
+ )
+ query = query.outerjoin(running_drs, running_drs.c.dag_id == DagRun.dag_id).filter(
+ func.coalesce(running_drs.c.num_running, 0) < DagModel.max_active_runs
)
+ query = query.order_by(
+ nulls_first(cls.last_scheduling_decision, session=session),
+ cls.execution_date,
)
if not settings.ALLOW_FUTURE_EXEC_DATES:
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 052d61a..a5446b2 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``7b2661a43ba3`` (head) | ``142555e44c17`` | | Change TaskInstance and TaskReschedule tables from execution_date to run_id. |
+| ``092435bf5d12`` (head) | ``7b2661a43ba3`` | | Add ``max_active_runs`` column to ``dag`` table |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``7b2661a43ba3`` | ``142555e44c17`` | | Change TaskInstance and TaskReschedule tables from execution_date to run_id. |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``142555e44c17`` | ``54bebd308c5f`` | | Add ``data_interval_start`` and ``data_interval_end`` to ``DagRun`` |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index dfcb67e..ddeaf5f 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2689,6 +2689,102 @@ class TestSchedulerJob:
)
assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 11
+ def test_start_queued_dagruns_do_follow_execution_date_order(self, dag_maker):
+ session = settings.Session()
+ with dag_maker('test_dag1', max_active_runs=1) as dag:
+ DummyOperator(task_id='mytask')
+ date = dag.following_schedule(DEFAULT_DATE)
+ for i in range(30):
+ dr = dag_maker.create_dagrun(
+ run_id=f'dagrun_{i}', run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date
+ )
+ date = dr.execution_date + timedelta(hours=1)
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ self.scheduler_job.executor = MockExecutor(do_update=False)
+ self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+ self.scheduler_job._start_queued_dagruns(session)
+ session.flush()
+ dr = DagRun.find(run_id='dagrun_0')
+ ti = dr[0].get_task_instance(task_id='mytask', session=session)
+ ti.state = State.SUCCESS
+ session.merge(ti)
+ session.commit()
+ assert dr[0].state == State.RUNNING
+ dr[0].state = State.SUCCESS
+ session.merge(dr[0])
+ session.flush()
+ assert dr[0].state == State.SUCCESS
+ self.scheduler_job._start_queued_dagruns(session)
+ session.flush()
+ dr = DagRun.find(run_id='dagrun_1')
+ assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 1
+
+ assert dr[0].state == State.RUNNING
+
+ def test_no_dagruns_would_stuck_in_running(self, dag_maker):
+ # Test that running dagruns are not stuck in running.
+ # Create one dagrun in 'running' state and 1 in 'queued' state from one dag(max_active_runs=1)
+ # Create 16 dagruns in 'running' state and 16 in 'queued' state from another dag
+ # Create 16 dagruns in 'running' state and 16 in 'queued' state from yet another dag
+ # Finish the task of the first dag, and check that another dagrun starts running
+ # from the first dag.
+
+ session = settings.Session()
+ # first dag and dagruns
+ date = timezone.datetime(2016, 1, 1)
+ with dag_maker('test_dagrun_states_are_correct_1', max_active_runs=1, start_date=date) as dag:
+ task1 = DummyOperator(task_id='dummy_task')
+
+ dr1_running = dag_maker.create_dagrun(run_id='dr1_run_1', execution_date=date)
+ dag_maker.create_dagrun(
+ run_id='dr1_run_2',
+ state=State.QUEUED,
+ execution_date=dag.following_schedule(dr1_running.execution_date),
+ )
+ # second dag and dagruns
+ date = timezone.datetime(2020, 1, 1)
+ with dag_maker('test_dagrun_states_are_correct_2', start_date=date) as dag:
+ DummyOperator(task_id='dummy_task')
+ for i in range(16):
+ dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.RUNNING, execution_date=date)
+ date = dr.execution_date + timedelta(hours=1)
+ dr16 = DagRun.find(run_id='dr2_run_16')
+ date = dr16[0].execution_date + timedelta(hours=1)
+ for i in range(16, 32):
+ dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date)
+ date = dr.execution_date + timedelta(hours=1)
+
+ # third dag and dagruns
+ date = timezone.datetime(2021, 1, 1)
+ with dag_maker('test_dagrun_states_are_correct_3', start_date=date) as dag:
+ DummyOperator(task_id='dummy_task')
+ for i in range(16):
+ dr = dag_maker.create_dagrun(run_id=f'dr3_run_{i+1}', state=State.RUNNING, execution_date=date)
+ date = dr.execution_date + timedelta(hours=1)
+ dr16 = DagRun.find(run_id='dr3_run_16')
+ date = dr16[0].execution_date + timedelta(hours=1)
+ for i in range(16, 32):
+ dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date)
+ date = dr.execution_date + timedelta(hours=1)
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ self.scheduler_job.executor = MockExecutor(do_update=False)
+ self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+ ti = TaskInstance(task=task1, execution_date=DEFAULT_DATE)
+ ti.refresh_from_db()
+ ti.state = State.SUCCESS
+ session.merge(ti)
+ session.flush()
+ # Run the scheduler loop
+ with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
+ self.scheduler_job._do_scheduling(session)
+ self.scheduler_job._do_scheduling(session)
+
+ assert DagRun.find(run_id='dr1_run_1')[0].state == State.SUCCESS
+ assert DagRun.find(run_id='dr1_run_2')[0].state == State.RUNNING
+
@pytest.mark.parametrize(
"state, start_date, end_date",
[
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index c6d54ed..61cbcee 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1759,6 +1759,26 @@ class TestDagModel:
session.rollback()
session.close()
+ def test_max_active_runs_not_none(self):
+ dag = DAG(dag_id='test_max_active_runs_not_none', start_date=timezone.datetime(2038, 1, 1))
+ DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+ session = settings.Session()
+ orm_dag = DagModel(
+ dag_id=dag.dag_id,
+ has_task_concurrency_limits=False,
+ next_dagrun=None,
+ next_dagrun_create_after=None,
+ is_active=True,
+ )
+ session.add(orm_dag)
+ session.flush()
+
+ assert orm_dag.max_active_runs is not None
+
+ session.rollback()
+ session.close()
+
def test_dags_needing_dagruns_only_unpaused(self):
"""
We should never create dagruns for unpaused DAGs