You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2021/09/09 11:03:48 UTC

[airflow] branch main updated: Fix DagRun execution order from queued to running not being properly followed (#18061)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ebbe2b4  Fix DagRun execution order from queued to running not being properly followed (#18061)
ebbe2b4 is described below

commit ebbe2b4cafebe2b523ca08abd40145c3c7eec046
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Sep 9 12:03:33 2021 +0100

    Fix DagRun execution order from queued to running not being properly followed (#18061)
    
    We made a fix that resolved max_active_runs not allowing other dagruns to move to
    the running state (see #17945), but it introduced a bug where dagruns no longer
    followed execution_date order when moving to the running state.
    
    This PR fixes it by adding a `max_active_runs` column to DagModel. An extra test,
    unrelated to this change, was also added because I was able to trigger the bug
    while working on this.
---
 airflow/jobs/scheduler_job.py                      |  3 +-
 ...5d12_add_max_active_runs_column_to_dagmodel_.py | 59 +++++++++++++
 airflow/models/dag.py                              |  4 +
 airflow/models/dagrun.py                           | 41 ++++++++-
 docs/apache-airflow/migrations-ref.rst             |  4 +-
 tests/jobs/test_scheduler_job.py                   | 96 ++++++++++++++++++++++
 tests/models/test_dag.py                           | 20 +++++
 7 files changed, 221 insertions(+), 6 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index d7239a8..4d8a704 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -887,14 +887,15 @@ class SchedulerJob(BaseJob):
                 )
 
         for dag_run in dag_runs:
+
             try:
                 dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
             except SerializedDagNotFound:
                 self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
                 continue
             active_runs = active_runs_of_dags[dag_run.dag_id]
+
             if dag.max_active_runs and active_runs >= dag.max_active_runs:
-                dag_run.last_scheduling_decision = timezone.utcnow()
                 self.log.debug(
                     "DAG %s already has %d active runs, not moving any more runs to RUNNING state %s",
                     dag.dag_id,
diff --git a/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py b/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py
new file mode 100644
index 0000000..c1a6e19
--- /dev/null
+++ b/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add max_active_runs column to dagmodel table
+
+Revision ID: 092435bf5d12
+Revises: 7b2661a43ba3
+Create Date: 2021-09-06 21:29:24.728923
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy import text
+
+# revision identifiers, used by Alembic.
+revision = '092435bf5d12'
+down_revision = '7b2661a43ba3'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add max_active_runs column to dagmodel table"""
+    op.add_column('dag', sa.Column('max_active_runs', sa.Integer(), nullable=True))
+    with op.batch_alter_table('dag_run', schema=None) as batch_op:
+        # Add index to dag_run.dag_id and also add index to dag_run.state where state==running
+        batch_op.create_index('idx_dag_run_dag_id', ['dag_id'])
+        batch_op.create_index(
+            'idx_dag_run_running_dags',
+            ["state", "dag_id"],
+            postgres_where=text("state='running'"),
+            mssql_where=text("state='running'"),
+            sqlite_where=text("state='running'"),
+        )
+
+
+def downgrade():
+    """Unapply Add max_active_runs column to dagmodel table"""
+    op.drop_column('dag', 'max_active_runs')
+    with op.batch_alter_table('dag_run', schema=None) as batch_op:
+        # Drop index to dag_run.dag_id and also drop index to dag_run.state where state==running
+        batch_op.drop_index('idx_dag_run_dag_id')
+        batch_op.drop_index('idx_dag_run_running_dags')
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 7743399..dc15d21 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2263,6 +2263,7 @@ class DAG(LoggingMixin):
             orm_dag.description = dag.description
             orm_dag.schedule_interval = dag.schedule_interval
             orm_dag.max_active_tasks = dag.max_active_tasks
+            orm_dag.max_active_runs = dag.max_active_runs
             orm_dag.has_task_concurrency_limits = any(t.max_active_tis_per_dag is not None for t in dag.tasks)
 
             orm_dag.calculate_dagrun_date_fields(
@@ -2493,6 +2494,7 @@ class DagModel(Base):
     tags = relationship('DagTag', cascade='all,delete-orphan', backref=backref('dag'))
 
     max_active_tasks = Column(Integer, nullable=False)
+    max_active_runs = Column(Integer, nullable=True)
 
     has_task_concurrency_limits = Column(Boolean, nullable=False)
 
@@ -2529,6 +2531,8 @@ class DagModel(Base):
                 self.max_active_tasks = concurrency
             else:
                 self.max_active_tasks = conf.getint('core', 'max_active_tasks_per_dag')
+        if self.max_active_runs is None:
+            self.max_active_runs = conf.getint('core', 'max_active_runs_per_dag')
         if self.has_task_concurrency_limits is None:
             # Be safe -- this will be updated later once the DAG is parsed
             self.has_task_concurrency_limits = True
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index e8f5f98..aa32fdc 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -19,7 +19,19 @@ import warnings
 from datetime import datetime
 from typing import TYPE_CHECKING, Any, Iterable, List, NamedTuple, Optional, Tuple, Union
 
-from sqlalchemy import Boolean, Column, Index, Integer, PickleType, String, UniqueConstraint, and_, func, or_
+from sqlalchemy import (
+    Boolean,
+    Column,
+    Index,
+    Integer,
+    PickleType,
+    String,
+    UniqueConstraint,
+    and_,
+    func,
+    or_,
+    text,
+)
 from sqlalchemy.exc import IntegrityError
 from sqlalchemy.ext.declarative import declared_attr
 from sqlalchemy.orm import joinedload, relationship, synonym
@@ -91,6 +103,15 @@ class DagRun(Base, LoggingMixin):
         UniqueConstraint('dag_id', 'execution_date', name='dag_run_dag_id_execution_date_key'),
         UniqueConstraint('dag_id', 'run_id', name='dag_run_dag_id_run_id_key'),
         Index('idx_last_scheduling_decision', last_scheduling_decision),
+        Index('idx_dag_run_dag_id', dag_id),
+        Index(
+            'idx_dag_run_running_dags',
+            'state',
+            'dag_id',
+            postgres_where=text("state='running'"),
+            mssql_where=text("state='running'"),
+            sqlite_where=text("state='running'"),
+        ),
     )
 
     task_instances = relationship(TI, back_populates="dag_run")
@@ -207,10 +228,22 @@ class DagRun(Base, LoggingMixin):
                 DagModel.is_paused == expression.false(),
                 DagModel.is_active == expression.true(),
             )
-            .order_by(
-                nulls_first(cls.last_scheduling_decision, session=session),
-                cls.execution_date,
+        )
+        if state == State.QUEUED:
+            # For dag runs in the queued state, we check if they have reached the max_active_runs limit
+            # and if so we drop them
+            running_drs = (
+                session.query(DagRun.dag_id, func.count(DagRun.state).label('num_running'))
+                .filter(DagRun.state == DagRunState.RUNNING)
+                .group_by(DagRun.dag_id)
+                .subquery()
+            )
+            query = query.outerjoin(running_drs, running_drs.c.dag_id == DagRun.dag_id).filter(
+                func.coalesce(running_drs.c.num_running, 0) < DagModel.max_active_runs
             )
+        query = query.order_by(
+            nulls_first(cls.last_scheduling_decision, session=session),
+            cls.execution_date,
         )
 
         if not settings.ALLOW_FUTURE_EXEC_DATES:
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 052d61a..a5446b2 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``7b2661a43ba3`` (head)        | ``142555e44c17`` |                 | Change TaskInstance and TaskReschedule tables from execution_date to run_id.          |
+| ``092435bf5d12`` (head)        | ``7b2661a43ba3`` |                 | Add ``max_active_runs`` column to ``dag_model`` table                                 |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``7b2661a43ba3``               | ``142555e44c17`` |                 | Change TaskInstance and TaskReschedule tables from execution_date to run_id.          |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``142555e44c17``               | ``54bebd308c5f`` |                 | Add ``data_interval_start`` and ``data_interval_end`` to ``DagRun``                   |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index dfcb67e..ddeaf5f 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2689,6 +2689,102 @@ class TestSchedulerJob:
         )
         assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 11
 
+    def test_start_queued_dagruns_do_follow_execution_date_order(self, dag_maker):
+        session = settings.Session()
+        with dag_maker('test_dag1', max_active_runs=1) as dag:
+            DummyOperator(task_id='mytask')
+        date = dag.following_schedule(DEFAULT_DATE)
+        for i in range(30):
+            dr = dag_maker.create_dagrun(
+                run_id=f'dagrun_{i}', run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date
+            )
+            date = dr.execution_date + timedelta(hours=1)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=False)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        self.scheduler_job._start_queued_dagruns(session)
+        session.flush()
+        dr = DagRun.find(run_id='dagrun_0')
+        ti = dr[0].get_task_instance(task_id='mytask', session=session)
+        ti.state = State.SUCCESS
+        session.merge(ti)
+        session.commit()
+        assert dr[0].state == State.RUNNING
+        dr[0].state = State.SUCCESS
+        session.merge(dr[0])
+        session.flush()
+        assert dr[0].state == State.SUCCESS
+        self.scheduler_job._start_queued_dagruns(session)
+        session.flush()
+        dr = DagRun.find(run_id='dagrun_1')
+        assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 1
+
+        assert dr[0].state == State.RUNNING
+
+    def test_no_dagruns_would_stuck_in_running(self, dag_maker):
+        # Test that running dagruns are not stuck in running.
+        # Create one dagrun in 'running' state and 1 in 'queued' state from one dag(max_active_runs=1)
+        # Create 16 dagruns in 'running' state and 16 in 'queued' state from another dag
+        # Create 16 dagruns in 'running' state and 16 in 'queued' state from yet another dag
+        # Finish the task of the first dag, and check that another dagrun starts running
+        # from the first dag.
+
+        session = settings.Session()
+        # first dag and dagruns
+        date = timezone.datetime(2016, 1, 1)
+        with dag_maker('test_dagrun_states_are_correct_1', max_active_runs=1, start_date=date) as dag:
+            task1 = DummyOperator(task_id='dummy_task')
+
+        dr1_running = dag_maker.create_dagrun(run_id='dr1_run_1', execution_date=date)
+        dag_maker.create_dagrun(
+            run_id='dr1_run_2',
+            state=State.QUEUED,
+            execution_date=dag.following_schedule(dr1_running.execution_date),
+        )
+        # second dag and dagruns
+        date = timezone.datetime(2020, 1, 1)
+        with dag_maker('test_dagrun_states_are_correct_2', start_date=date) as dag:
+            DummyOperator(task_id='dummy_task')
+        for i in range(16):
+            dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.RUNNING, execution_date=date)
+            date = dr.execution_date + timedelta(hours=1)
+        dr16 = DagRun.find(run_id='dr2_run_16')
+        date = dr16[0].execution_date + timedelta(hours=1)
+        for i in range(16, 32):
+            dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date)
+            date = dr.execution_date + timedelta(hours=1)
+
+        # third dag and dagruns
+        date = timezone.datetime(2021, 1, 1)
+        with dag_maker('test_dagrun_states_are_correct_3', start_date=date) as dag:
+            DummyOperator(task_id='dummy_task')
+        for i in range(16):
+            dr = dag_maker.create_dagrun(run_id=f'dr3_run_{i+1}', state=State.RUNNING, execution_date=date)
+            date = dr.execution_date + timedelta(hours=1)
+        dr16 = DagRun.find(run_id='dr3_run_16')
+        date = dr16[0].execution_date + timedelta(hours=1)
+        for i in range(16, 32):
+            dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date)
+            date = dr.execution_date + timedelta(hours=1)
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=False)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        ti = TaskInstance(task=task1, execution_date=DEFAULT_DATE)
+        ti.refresh_from_db()
+        ti.state = State.SUCCESS
+        session.merge(ti)
+        session.flush()
+        # Run the scheduler loop
+        with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
+            self.scheduler_job._do_scheduling(session)
+            self.scheduler_job._do_scheduling(session)
+
+        assert DagRun.find(run_id='dr1_run_1')[0].state == State.SUCCESS
+        assert DagRun.find(run_id='dr1_run_2')[0].state == State.RUNNING
+
     @pytest.mark.parametrize(
         "state, start_date, end_date",
         [
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index c6d54ed..61cbcee 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1759,6 +1759,26 @@ class TestDagModel:
         session.rollback()
         session.close()
 
+    def test_max_active_runs_not_none(self):
+        dag = DAG(dag_id='test_max_active_runs_not_none', start_date=timezone.datetime(2038, 1, 1))
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session = settings.Session()
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            has_task_concurrency_limits=False,
+            next_dagrun=None,
+            next_dagrun_create_after=None,
+            is_active=True,
+        )
+        session.add(orm_dag)
+        session.flush()
+
+        assert orm_dag.max_active_runs is not None
+
+        session.rollback()
+        session.close()
+
     def test_dags_needing_dagruns_only_unpaused(self):
         """
         We should never create dagruns for unpaused DAGs