You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/09/10 13:35:20 UTC
[airflow] branch v2-1-test updated: Limit the number of queued dagruns created by the Scheduler (#18065)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-1-test by this push:
new 203b455 Limit the number of queued dagruns created by the Scheduler (#18065)
203b455 is described below
commit 203b455eb64c88f231d2fab694344da888cae8e8
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Sep 9 14:24:24 2021 +0100
Limit the number of queued dagruns created by the Scheduler (#18065)
Currently there is no limit on the number of queued dagruns the scheduler can create,
and this has become a concern, with issues raised about it. See #18023 and #17979
Co-authored-by: Sam Wheating <sa...@gmail.com>
(cherry picked from commit 0eb41b5952c2ce1884594c82bbf05835912b9812)
---
airflow/config_templates/config.yml | 8 ++++
airflow/config_templates/default_airflow.cfg | 4 ++
airflow/jobs/scheduler_job.py | 21 ++++++++-
...26fe78_add_index_on_state_dag_id_for_queued_.py | 52 ++++++++++++++++++++++
airflow/models/dagrun.py | 10 +++++
docs/apache-airflow/migrations-ref.rst | 4 +-
tests/jobs/test_scheduler_job.py | 25 +++++++++++
7 files changed, 122 insertions(+), 2 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 9945213..7abcb06 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -191,6 +191,14 @@
type: string
example: ~
default: "16"
+ - name: max_queued_runs_per_dag
+ description: |
+ The maximum number of queued dagruns for a single DAG. The scheduler will not create more DAG runs
+ if it reaches the limit. This is not configurable at the DAG level.
+ version_added: 2.1.4
+ type: string
+ example: ~
+ default: "16"
- name: load_examples
description: |
Whether to load the DAG examples that ship with Airflow. It's good to
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 03d5e1f..56a1d90 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -127,6 +127,10 @@ dags_are_paused_at_creation = True
# which is defaulted as ``max_active_runs_per_dag``.
max_active_runs_per_dag = 16
+# The maximum number of queued dagruns for a single DAG. The scheduler will not create more DAG runs
+# if it reaches the limit. This is not configurable at the DAG level.
+max_queued_runs_per_dag = 16
+
# Whether to load the DAG examples that ship with Airflow. It's good to
# get started, but you probably want to set this to ``False`` in a production
# environment
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 8d5f888..45083a4 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -985,14 +985,31 @@ class SchedulerJob(BaseJob):
existing_dagruns = (
session.query(DagRun.dag_id, DagRun.execution_date).filter(existing_dagruns_filter).all()
)
+ max_queued_dagruns = conf.getint('core', 'max_queued_runs_per_dag')
+
+ queued_runs_of_dags = defaultdict(
+ int,
+ session.query(DagRun.dag_id, func.count('*'))
+ .filter( # We use `list` here because SQLA doesn't accept a set
+ # We use set to avoid duplicate dag_ids
+ DagRun.dag_id.in_(list({dm.dag_id for dm in dag_models})),
+ DagRun.state == State.QUEUED,
+ )
+ .group_by(DagRun.dag_id)
+ .all(),
+ )
for dag_model in dag_models:
+ # Let's quickly check whether this DAG has already reached its queued-dagrun limit
+ total_queued = queued_runs_of_dags[dag_model.dag_id]
+ if total_queued >= max_queued_dagruns:
+ continue
+
try:
dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
except SerializedDagNotFound:
self.log.exception("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
continue
-
dag_hash = self.dagbag.dags_hash.get(dag.dag_id)
# Explicitly check if the DagRun already exists. This is an edge case
# where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
@@ -1003,6 +1020,7 @@ class SchedulerJob(BaseJob):
# create a new one. This is so that in the next Scheduling loop we try to create new runs
# instead of falling in a loop of Integrity Error.
if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+
dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=dag_model.next_dagrun,
@@ -1012,6 +1030,7 @@ class SchedulerJob(BaseJob):
dag_hash=dag_hash,
creating_job_id=self.id,
)
+ queued_runs_of_dags[dag_model.dag_id] += 1
dag_model.calculate_dagrun_date_fields(dag, dag_model.next_dagrun)
# TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
diff --git a/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py b/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py
new file mode 100644
index 0000000..7326d73
--- /dev/null
+++ b/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add index on state, dag_id for queued dagrun
+
+Revision ID: ccde3e26fe78
+Revises: 092435bf5d12
+Create Date: 2021-09-08 16:35:34.867711
+
+"""
+
+from alembic import op
+from sqlalchemy import text
+
+# revision identifiers, used by Alembic.
+revision = 'ccde3e26fe78'
+down_revision = '092435bf5d12'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ """Apply Add index on state, dag_id for queued dagrun"""
+ with op.batch_alter_table('dag_run') as batch_op:
+ batch_op.create_index(
+ 'idx_dag_run_queued_dags',
+ ["state", "dag_id"],
+ postgres_where=text("state='queued'"),
+ mssql_where=text("state='queued'"),
+ sqlite_where=text("state='queued'"),
+ )
+
+
+def downgrade():
+ """Unapply Add index on state, dag_id for queued dagrun"""
+ with op.batch_alter_table('dag_run') as batch_op:
+ batch_op.drop_index('idx_dag_run_queued_dags')
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index c27942b..1e5c2c1 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -107,6 +107,16 @@ class DagRun(Base, LoggingMixin):
mssql_where=text("state='running'"),
sqlite_where=text("state='running'"),
),
+ # Since MySQL lacks filtered/partial indexes, on MySQL this creates an index
+ # that duplicates an existing one. Redundant, but not the end of the world.
+ Index(
+ 'idx_dag_run_queued_dags',
+ 'state',
+ 'dag_id',
+ postgres_where=text("state='queued'"),
+ mssql_where=text("state='queued'"),
+ sqlite_where=text("state='queued'"),
+ ),
)
task_instances = relationship(
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index e049603..4eb98d2 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``092435bf5d12`` (head) | ``97cdd93827b8`` | ``2.1.4`` | Add ``max_active_runs`` column to ``dag_model`` table |
+| ``ccde3e26fe78`` (head) | ``092435bf5d12`` | ``2.1.4`` | Add index on state, dag_id for queued ``dagrun`` |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``092435bf5d12`` | ``7b2661a43ba3`` | ``2.1.4`` | Add ``max_active_runs`` column to ``dag_model`` table |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``97cdd93827b8`` | ``a13f7613ad25`` | ``2.1.3`` | Add ``queued_at`` column in ``dag_run`` table |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 926a1fe..7953394 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1579,6 +1579,31 @@ class TestSchedulerJob(unittest.TestCase):
self.scheduler_job.executor.end.assert_called_once()
mock_processor_agent.return_value.end.reset_mock(side_effect=True)
+ def test_theres_limit_to_queued_dagruns_in_a_dag(self, dag_maker):
+ """Test that there is a limit to the number of queued dagruns the scheduler can create for a DAG"""
+ with dag_maker() as dag:
+ DummyOperator(task_id='mytask')
+
+ session = settings.Session()
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ self.scheduler_job.executor = MockExecutor()
+ self.scheduler_job.processor_agent = mock.MagicMock()
+
+ self.scheduler_job.dagbag = dag_maker.dagbag
+
+ session = settings.Session()
+ orm_dag = session.query(DagModel).get(dag.dag_id)
+ assert orm_dag is not None
+ for _ in range(20):
+ self.scheduler_job._create_dag_runs([orm_dag], session)
+ assert session.query(DagRun).count() == 16
+
+ with conf_vars({('core', 'max_queued_runs_per_dag'): '5'}):
+ clear_db_runs()
+ for i in range(20):
+ self.scheduler_job._create_dag_runs([orm_dag], session)
+ assert session.query(DagRun).count() == 5
+
def test_dagrun_timeout_verify_max_active_runs(self):
"""
Test if a a dagrun will not be scheduled if max_dag_runs