Posted to commits@airflow.apache.org by ka...@apache.org on 2021/09/10 13:35:20 UTC

[airflow] branch v2-1-test updated: Limit the number of queued dagruns created by the Scheduler (#18065)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-1-test by this push:
     new 203b455  Limit the number of queued dagruns created by the Scheduler (#18065)
203b455 is described below

commit 203b455eb64c88f231d2fab694344da888cae8e8
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Sep 9 14:24:24 2021 +0100

    Limit the number of queued dagruns created by the Scheduler (#18065)
    
    Currently there is no limit to the number of queued dagruns the scheduler can create,
    and this has become a concern, with issues raised against it. See #18023 and #17979
    
    Co-authored-by: Sam Wheating <sa...@gmail.com>
    (cherry picked from commit 0eb41b5952c2ce1884594c82bbf05835912b9812)
---
 airflow/config_templates/config.yml                |  8 ++++
 airflow/config_templates/default_airflow.cfg       |  4 ++
 airflow/jobs/scheduler_job.py                      | 21 ++++++++-
 ...26fe78_add_index_on_state_dag_id_for_queued_.py | 52 ++++++++++++++++++++++
 airflow/models/dagrun.py                           | 10 +++++
 docs/apache-airflow/migrations-ref.rst             |  4 +-
 tests/jobs/test_scheduler_job.py                   | 25 +++++++++++
 7 files changed, 122 insertions(+), 2 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 9945213..7abcb06 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -191,6 +191,14 @@
       type: string
       example: ~
       default: "16"
+    - name: max_queued_runs_per_dag
+      description: |
+        The maximum number of queued dagruns for a single DAG. The scheduler will not create more DAG runs
+        if it reaches the limit. This is not configurable at the DAG level.
+      version_added: 2.1.4
+      type: string
+      example: ~
+      default: "16"
     - name: load_examples
       description: |
         Whether to load the DAG examples that ship with Airflow. It's good to
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 03d5e1f..56a1d90 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -127,6 +127,10 @@ dags_are_paused_at_creation = True
 # which is defaulted as ``max_active_runs_per_dag``.
 max_active_runs_per_dag = 16
 
+# The maximum number of queued dagruns for a single DAG. The scheduler will not create more DAG runs
+# if it reaches the limit. This is not configurable at the DAG level.
+max_queued_runs_per_dag = 16
+
 # Whether to load the DAG examples that ship with Airflow. It's good to
 # get started, but you probably want to set this to ``False`` in a production
 # environment
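
The new option can be read the same way the scheduler does further down in this
patch, via conf.getint('core', 'max_queued_runs_per_dag'). The snippet below is a
minimal illustrative sketch only (it assumes a working Airflow installation; the
AIRFLOW__CORE__MAX_QUEUED_RUNS_PER_DAG variable follows Airflow's usual
AIRFLOW__{SECTION}__{KEY} environment-variable override convention):

    import os

    # Assumed override for illustration: Airflow's AIRFLOW__{SECTION}__{KEY}
    # environment variables take precedence over values in airflow.cfg.
    os.environ["AIRFLOW__CORE__MAX_QUEUED_RUNS_PER_DAG"] = "5"

    from airflow.configuration import conf

    # Same call the scheduler hunk below uses to pick up the limit.
    max_queued = conf.getint("core", "max_queued_runs_per_dag")
    print(max_queued)  # 5
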
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 8d5f888..45083a4 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -985,14 +985,31 @@ class SchedulerJob(BaseJob):
         existing_dagruns = (
             session.query(DagRun.dag_id, DagRun.execution_date).filter(existing_dagruns_filter).all()
         )
+        max_queued_dagruns = conf.getint('core', 'max_queued_runs_per_dag')
+
+        queued_runs_of_dags = defaultdict(
+            int,
+            session.query(DagRun.dag_id, func.count('*'))
+            .filter(  # We use `list` here because SQLA doesn't accept a set
+                # We use set to avoid duplicate dag_ids
+                DagRun.dag_id.in_(list({dm.dag_id for dm in dag_models})),
+                DagRun.state == State.QUEUED,
+            )
+            .group_by(DagRun.dag_id)
+            .all(),
+        )
 
         for dag_model in dag_models:
+            # Let's quickly check if we have exceeded the number of queued dagruns per dag
+            total_queued = queued_runs_of_dags[dag_model.dag_id]
+            if total_queued >= max_queued_dagruns:
+                continue
+
             try:
                 dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
             except SerializedDagNotFound:
                 self.log.exception("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
                 continue
-
             dag_hash = self.dagbag.dags_hash.get(dag.dag_id)
             # Explicitly check if the DagRun already exists. This is an edge case
             # where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
@@ -1003,6 +1020,7 @@ class SchedulerJob(BaseJob):
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
             if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+
                 dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
@@ -1012,6 +1030,7 @@ class SchedulerJob(BaseJob):
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
+                queued_runs_of_dags[dag_model.dag_id] += 1
             dag_model.calculate_dagrun_date_fields(dag, dag_model.next_dagrun)
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
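
The hunk above builds a per-DAG counter from one grouped query and wraps it in
defaultdict(int), so any DAG with no queued runs simply falls back to a count of
zero. A standalone sketch of that pattern, with made-up dag_ids and no database
involved:

    from collections import defaultdict

    # Shape of the GROUP BY result above: (dag_id, queued_count) pairs.
    query_result = [("dag_a", 3), ("dag_b", 16)]
    queued_runs_of_dags = defaultdict(int, query_result)

    max_queued_dagruns = 16
    for dag_id in ("dag_a", "dag_b", "dag_c"):
        # dag_c is absent from the query result, so defaultdict returns 0 for it.
        if queued_runs_of_dags[dag_id] >= max_queued_dagruns:
            continue  # dag_b already hit the limit, so it is skipped
        # Mirrors the increment after dag.create_dagrun() in the hunk above.
        queued_runs_of_dags[dag_id] += 1
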
diff --git a/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py b/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py
new file mode 100644
index 0000000..7326d73
--- /dev/null
+++ b/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add index on state, dag_id for queued dagrun
+
+Revision ID: ccde3e26fe78
+Revises: 092435bf5d12
+Create Date: 2021-09-08 16:35:34.867711
+
+"""
+
+from alembic import op
+from sqlalchemy import text
+
+# revision identifiers, used by Alembic.
+revision = 'ccde3e26fe78'
+down_revision = '092435bf5d12'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add index on state, dag_id for queued dagrun"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.create_index(
+            'idx_dag_run_queued_dags',
+            ["state", "dag_id"],
+            postgres_where=text("state='queued'"),
+            mssql_where=text("state='queued'"),
+            sqlite_where=text("state='queued'"),
+        )
+
+
+def downgrade():
+    """Unapply Add index on state, dag_id for queued dagrun"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.drop_index('idx_dag_run_queued_dags')
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index c27942b..1e5c2c1 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -107,6 +107,16 @@ class DagRun(Base, LoggingMixin):
             mssql_where=text("state='running'"),
             sqlite_where=text("state='running'"),
         ),
+        # since mysql lacks filtered/partial indices, this creates a
+        # duplicate index on mysql. Not the end of the world
+        Index(
+            'idx_dag_run_queued_dags',
+            'state',
+            'dag_id',
+            postgres_where=text("state='queued'"),
+            mssql_where=text("state='queued'"),
+            sqlite_where=text("state='queued'"),
+        ),
     )
 
     task_instances = relationship(
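
For illustration, a standalone SQLAlchemy sketch of a filtered ("partial") index
like the one the migration and the DagRun model change above add. This is not
Airflow code: the simplified table shape, the in-memory SQLite engine and the
SQLAlchemy 1.4+ API are all assumptions.

    from sqlalchemy import (
        Column,
        Index,
        Integer,
        MetaData,
        String,
        Table,
        create_engine,
        func,
        select,
        text,
    )

    metadata = MetaData()
    dag_run = Table(
        "dag_run",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("dag_id", String(250)),
        Column("state", String(50)),
        # Partial index: only rows with state='queued' are indexed. MySQL has no
        # partial indexes, so there it degrades to a plain (state, dag_id) index.
        Index("idx_dag_run_queued_dags", "state", "dag_id", sqlite_where=text("state='queued'")),
    )

    engine = create_engine("sqlite://")
    metadata.create_all(engine)

    with engine.begin() as conn:
        conn.execute(
            dag_run.insert(),
            [
                {"dag_id": "example_dag", "state": "queued"},
                {"dag_id": "example_dag", "state": "queued"},
                {"dag_id": "example_dag", "state": "running"},
            ],
        )
        # The per-DAG count of queued runs that the scheduler hunk above caps.
        rows = conn.execute(
            select(dag_run.c.dag_id, func.count())
            .where(dag_run.c.state == "queued")
            .group_by(dag_run.c.dag_id)
        ).fetchall()
        print(rows)  # [('example_dag', 2)]
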
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index e049603..4eb98d2 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``092435bf5d12`` (head)        | ``97cdd93827b8`` | ``2.1.4``       | Add ``max_active_runs`` column to ``dag_model`` table                                 |
+| ``ccde3e26fe78`` (head)        | ``092435bf5d12`` | ``2.1.4``       | Add index on state, dag_id for queued ``dagrun``                                      |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``092435bf5d12``               | ``7b2661a43ba3`` | ``2.1.4``       | Add ``max_active_runs`` column to ``dag_model`` table                                 |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``97cdd93827b8``               | ``a13f7613ad25`` | ``2.1.3``       | Add ``queued_at`` column in ``dag_run`` table                                         |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 926a1fe..7953394 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1579,6 +1579,31 @@ class TestSchedulerJob(unittest.TestCase):
         self.scheduler_job.executor.end.assert_called_once()
         mock_processor_agent.return_value.end.reset_mock(side_effect=True)
 
+    def test_theres_limit_to_queued_dagruns_in_a_dag(self, dag_maker):
+        """This tests that there's limit to the number of queued dagrun scheduler can create in a dag"""
+        with dag_maker() as dag:
+            DummyOperator(task_id='mytask')
+
+        session = settings.Session()
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor()
+        self.scheduler_job.processor_agent = mock.MagicMock()
+
+        self.scheduler_job.dagbag = dag_maker.dagbag
+
+        session = settings.Session()
+        orm_dag = session.query(DagModel).get(dag.dag_id)
+        assert orm_dag is not None
+        for _ in range(20):
+            self.scheduler_job._create_dag_runs([orm_dag], session)
+        assert session.query(DagRun).count() == 16
+
+        with conf_vars({('core', 'max_queued_runs_per_dag'): '5'}):
+            clear_db_runs()
+            for i in range(20):
+                self.scheduler_job._create_dag_runs([orm_dag], session)
+        assert session.query(DagRun).count() == 5
+
     def test_dagrun_timeout_verify_max_active_runs(self):
         """
         Test if a a dagrun will not be scheduled if max_dag_runs