Posted to commits@airflow.apache.org by ka...@apache.org on 2021/09/10 21:54:07 UTC

[airflow] branch v2-1-test updated (e7fc43f -> c81aa2b)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard e7fc43f  Add Changelog for 2.1.4
 discard eaca043  Bump version to 2.1.4
 discard a253b10  Update version added fields in airflow/config_templates/config.yml (#18128)
 discard 8b09602  Fix deprecation error message rather than silencing it (#18126)
 discard bf276ca  Limit the number of queued dagruns created by the Scheduler (#18065)
     new 247382f  Limit the number of queued dagruns created by the Scheduler (#18065)
     new f5f70e0  Fix deprecation error message rather than silencing it (#18126)
     new 0ee20ff  Update version added fields in airflow/config_templates/config.yml (#18128)
     new c1e9f80  Do not let create_dagrun overwrite explicit run_id (#17728)
     new 2ef6ab1  Bump version to 2.1.4
     new c81aa2b  Add Changelog for 2.1.4

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (e7fc43f)
            \
             N -- N -- N   refs/heads/v2-1-test (c81aa2b)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGELOG.txt                    |   1 +
 airflow/models/dag.py            |   9 +-
 tests/conftest.py                |  18 +-
 tests/jobs/test_scheduler_job.py | 392 ++++++++++-----------------------------
 4 files changed, 112 insertions(+), 308 deletions(-)

[airflow] 04/06: Do not let create_dagrun overwrite explicit run_id (#17728)

Posted by ka...@apache.org.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c1e9f8073193425194fe5d76c46c3c0f0f9af7ff
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Thu Aug 19 22:33:09 2021 +0800

    Do not let create_dagrun overwrite explicit run_id (#17728)
    
    Previously, DAG.create_dagrun() had a weird behavior: when *all* of
    run_id, execution_date, and run_type were provided, the function would
    ignore the run_id argument and overwrite it by auto-generating a run_id
    with DagRun.generate_run_id(). This fixes the logic to respect the
    explicit run_id value.
    
    I don't think any of the "Airflow proper" code would be affected by
    this, but the dag_maker fixture used in the test suite needs to be
    tweaked a bit to continue working.
    
    (cherry picked from commit 50771e0f66803d0a0a0b552ab77f4e6be7d1088b)
---
 airflow/models/dag.py |  9 +++++----
 tests/conftest.py     | 18 +++++++++++-------
 2 files changed, 16 insertions(+), 11 deletions(-)
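
The branching change in the dag.py hunk can be tried out in isolation.
What follows is only a minimal sketch, not the Airflow code: RunType,
infer_run_type, and generate_run_id are simplified, hypothetical stand-ins
for DagRunType, DagRunType.from_run_id, and DagRun.generate_run_id, and
ValueError stands in for AirflowException. It contrasts the old and new
branch order when run_id, run_type, and execution_date are all passed.

```python
from datetime import datetime
from enum import Enum
from typing import Optional, Tuple


class RunType(str, Enum):
    """Hypothetical, simplified stand-in for Airflow's DagRunType."""

    SCHEDULED = "scheduled"
    MANUAL = "manual"


def infer_run_type(run_id: str) -> RunType:
    """Stand-in for DagRunType.from_run_id: match on the run_id prefix."""
    for candidate in RunType:
        if run_id.startswith(f"{candidate.value}__"):
            return candidate
    return RunType.MANUAL


def generate_run_id(run_type: RunType, execution_date: datetime) -> str:
    """Stand-in for DagRun.generate_run_id."""
    return f"{run_type.value}__{execution_date.isoformat()}"


def resolve_old(
    run_id: Optional[str],
    run_type: Optional[RunType],
    execution_date: Optional[datetime],
) -> Tuple[str, RunType]:
    """Branch order before the fix: an explicit run_id can be overwritten."""
    if run_id and not run_type:
        run_type = infer_run_type(run_id)
    elif run_type and execution_date:
        # Reached even when run_id was given, so run_id is replaced here.
        run_id = generate_run_id(run_type, execution_date)
    elif not run_id:
        raise ValueError("need run_id, or both run_type and execution_date")
    return run_id, run_type


def resolve_new(
    run_id: Optional[str],
    run_type: Optional[RunType],
    execution_date: Optional[datetime],
) -> Tuple[str, RunType]:
    """Branch order after the fix: an explicit run_id always wins."""
    if run_id:
        if not isinstance(run_id, str):
            raise ValueError(f"run_id expected to be a str, got {type(run_id)}")
        if not run_type:
            run_type = infer_run_type(run_id)
    elif run_type and execution_date is not None:
        if not isinstance(run_type, RunType):
            raise ValueError(f"run_type expected to be a RunType, got {type(run_type)}")
        run_id = generate_run_id(run_type, execution_date)
    else:
        raise ValueError("need run_id, or both run_type and execution_date")
    return run_id, run_type


if __name__ == "__main__":
    when = datetime(2021, 1, 1)
    print(resolve_old("test", RunType.SCHEDULED, when))  # run_id is regenerated
    print(resolve_new("test", RunType.SCHEDULED, when))  # run_id stays "test"
```

This is also why the dag_maker fixture in tests/conftest.py now sets
run_id="test" only when the caller passes neither run_id nor run_type:
with the new logic a default run_id would otherwise always take precedence
over one generated from run_type.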

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 4ac2ace..a1419fe 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1767,15 +1767,16 @@ class DAG(LoggingMixin):
         :param dag_hash: Hash of Serialized DAG
         :type dag_hash: str
         """
-        if run_id and not run_type:
+        if run_id:  # Infer run_type from run_id if needed.
             if not isinstance(run_id, str):
                 raise ValueError(f"`run_id` expected to be a str is {type(run_id)}")
-            run_type: DagRunType = DagRunType.from_run_id(run_id)
-        elif run_type and execution_date:
+            if not run_type:
+                run_type = DagRunType.from_run_id(run_id)
+        elif run_type and execution_date is not None:  # Generate run_id from run_type and execution_date.
             if not isinstance(run_type, DagRunType):
                 raise ValueError(f"`run_type` expected to be a DagRunType is {type(run_type)}")
             run_id = DagRun.generate_run_id(run_type, execution_date)
-        elif not run_id:
+        else:
             raise AirflowException(
                 "Creating DagRun needs either `run_id` or both `run_type` and `execution_date`"
             )
diff --git a/tests/conftest.py b/tests/conftest.py
index 0873ac4..6bee400 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -459,13 +459,17 @@ def dag_maker(request):
 
         def create_dagrun(self, **kwargs):
             dag = self.dag
-            defaults = dict(
-                run_id='test',
-                state=State.RUNNING,
-                execution_date=self.start_date,
-                start_date=self.start_date,
-            )
-            kwargs = {**defaults, **kwargs}
+            kwargs = {
+                "state": State.RUNNING,
+                "execution_date": self.start_date,
+                "start_date": self.start_date,
+                "session": self.session,
+                **kwargs,
+            }
+            # Need to provide run_id if the user does not either provide one
+            # explicitly, or pass run_type for inference in dag.create_dagrun().
+            if "run_id" not in kwargs and "run_type" not in kwargs:
+                kwargs["run_id"] = "test"
             self.dag_run = dag.create_dagrun(**kwargs)
             return self.dag_run
 

[airflow] 06/06: Add Changelog for 2.1.4

Posted by ka...@apache.org.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c81aa2b72686b235d6ba636f8c892a064b9bf621
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Sep 10 15:11:27 2021 +0100

    Add Changelog for 2.1.4
---
 CHANGELOG.txt | 39 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 39 insertions(+)

diff --git a/CHANGELOG.txt b/CHANGELOG.txt
index ecbb6b6..560abc5 100644
--- a/CHANGELOG.txt
+++ b/CHANGELOG.txt
@@ -1,3 +1,42 @@
+Airflow 2.1.4, 2021-09-15
+-------------------------
+
+Bug Fixes
+"""""""""
+
+- Fix deprecation error message rather than silencing it (#18126)
+- Limit the number of queued dagruns created by the Scheduler (#18065)
+- Fix ``DagRun`` execution order from queued to running not being properly followed (#18061)
+- Fix ``max_active_runs`` not allowing moving of queued dagruns to running (#17945)
+- Avoid redirect loop for users with no permissions (#17838)
+- Avoid endless redirect loop when user has no roles (#17613)
+- Fix log links on graph TI modal (#17862)
+- Hide variable import form if user lacks permission (#18000)
+- Improve dag/task concurrency check (#17786)
+- Fix Clear task instances endpoint resets all DAG runs bug (#17961)
+- Fixes incorrect parameter passed to views (#18083) (#18085)
+- Fix Sentry handler from ``LocalTaskJob`` causing error (#18119)
+- Limit ``colorlog`` version (6.x is incompatible) (#18099)
+- Only show Pause/Unpause tooltip on hover (#17957)
+- Improve graph view load time for dags with open groups (#17821)
+- Increase width for Run column (#17817)
+- Fix wrong query on running tis (#17631)
+- Add root to tree refresh url (#17633)
+- Do not delete running DAG from the UI (#17630)
+- Improve discoverability of Provider packages' functionality
+- Do not let ``create_dagrun`` overwrite explicit ``run_id`` (#17728)
+
+Doc only changes
+""""""""""""""""
+
+- Update version added fields in airflow/config_templates/config.yml (#18128)
+- Improve the description of how to handle dynamic task generation (#17963)
+- Improve cross-links to operators and hooks references (#17622)
+- Doc: Fix replacing Airflow version for Docker stack (#17711)
+- Make the providers operators/hooks reference much more usable (#17768)
+- Update description about the new ``connection-types`` provider meta-data
+- Suggest to use secrets backend for variable when it contains sensitive data (#17319)
+
 Airflow 2.1.3, 2021-08-21
 -------------------------
 

[airflow] 01/06: Limit the number of queued dagruns created by the Scheduler (#18065)

Posted by ka...@apache.org.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 247382fd0240c371a62748c67bc7a93700af98f0
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Sep 9 14:24:24 2021 +0100

    Limit the number of queued dagruns created by the Scheduler (#18065)
    
    There is currently no limit on the number of queued dagruns the
    scheduler can create, which has become a concern, with issues raised
    against it. See #18023 and #17979.
    
    Co-authored-by: Sam Wheating <sa...@gmail.com>
    (cherry picked from commit 0eb41b5952c2ce1884594c82bbf05835912b9812)
---
 airflow/config_templates/config.yml                |   8 +
 airflow/config_templates/default_airflow.cfg       |   4 +
 airflow/jobs/scheduler_job.py                      |  21 +-
 ...26fe78_add_index_on_state_dag_id_for_queued_.py |  52 +++
 airflow/models/dagrun.py                           |  10 +
 docs/apache-airflow/migrations-ref.rst             |   4 +-
 tests/jobs/test_scheduler_job.py                   | 411 ++++++---------------
 7 files changed, 214 insertions(+), 296 deletions(-)
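
The capping logic added to scheduler_job.py can be illustrated without a
database. The following is a rough sketch under stated assumptions, not
the scheduler's implementation: plan_new_runs is a hypothetical helper,
the (dag_id, count) pairs stand in for the grouped COUNT query over
queued DagRun rows, and the check against already existing dagruns is
left out.

```python
from collections import defaultdict
from typing import Iterable, List, Tuple


def plan_new_runs(
    dag_ids: Iterable[str],
    queued_counts: Iterable[Tuple[str, int]],
    max_queued: int,
) -> List[str]:
    """Return the dag_ids that would get a new queued run in this pass.

    queued_counts plays the role of the (dag_id, count) rows returned by
    the grouped query; DAGs with no queued runs are simply absent and
    default to zero through defaultdict(int).
    """
    queued = defaultdict(int, queued_counts)
    created: List[str] = []
    for dag_id in dag_ids:
        # Skip DAGs that already hold the maximum number of queued runs.
        if queued[dag_id] >= max_queued:
            continue
        created.append(dag_id)
        # Keep the in-memory count current so the cap also holds when the
        # same DAG shows up more than once in a single pass.
        queued[dag_id] += 1
    return created


if __name__ == "__main__":
    counts = [("dag_a", 16), ("dag_b", 3)]
    print(plan_new_runs(["dag_a", "dag_b"], counts, max_queued=16))
```

In the real change the cap is read with conf.getint('core',
'max_queued_runs_per_dag') and defaults to 16, as shown in the
config.yml and default_airflow.cfg hunks.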

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 9945213..7abcb06 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -191,6 +191,14 @@
       type: string
       example: ~
       default: "16"
+    - name: max_queued_runs_per_dag
+      description: |
+        The maximum number of queued dagruns for a single DAG. The scheduler will not create more DAG runs
+        if it reaches the limit. This is not configurable at the DAG level.
+      version_added: 2.1.4
+      type: string
+      example: ~
+      default: "16"
     - name: load_examples
       description: |
         Whether to load the DAG examples that ship with Airflow. It's good to
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 03d5e1f..56a1d90 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -127,6 +127,10 @@ dags_are_paused_at_creation = True
 # which is defaulted as ``max_active_runs_per_dag``.
 max_active_runs_per_dag = 16
 
+# The maximum number of queued dagruns for a single DAG. The scheduler will not create more DAG runs
+# if it reaches the limit. This is not configurable at the DAG level.
+max_queued_runs_per_dag = 16
+
 # Whether to load the DAG examples that ship with Airflow. It's good to
 # get started, but you probably want to set this to ``False`` in a production
 # environment
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 8d5f888..45083a4 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -985,14 +985,31 @@ class SchedulerJob(BaseJob):
         existing_dagruns = (
             session.query(DagRun.dag_id, DagRun.execution_date).filter(existing_dagruns_filter).all()
         )
+        max_queued_dagruns = conf.getint('core', 'max_queued_runs_per_dag')
+
+        queued_runs_of_dags = defaultdict(
+            int,
+            session.query(DagRun.dag_id, func.count('*'))
+            .filter(  # We use `list` here because SQLA doesn't accept a set
+                # We use set to avoid duplicate dag_ids
+                DagRun.dag_id.in_(list({dm.dag_id for dm in dag_models})),
+                DagRun.state == State.QUEUED,
+            )
+            .group_by(DagRun.dag_id)
+            .all(),
+        )
 
         for dag_model in dag_models:
+            # Lets quickly check if we have exceeded the number of queued dagruns per dags
+            total_queued = queued_runs_of_dags[dag_model.dag_id]
+            if total_queued >= max_queued_dagruns:
+                continue
+
             try:
                 dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
             except SerializedDagNotFound:
                 self.log.exception("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
                 continue
-
             dag_hash = self.dagbag.dags_hash.get(dag.dag_id)
             # Explicitly check if the DagRun already exists. This is an edge case
             # where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
@@ -1003,6 +1020,7 @@ class SchedulerJob(BaseJob):
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
             if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+
                 dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
@@ -1012,6 +1030,7 @@ class SchedulerJob(BaseJob):
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
+                queued_runs_of_dags[dag_model.dag_id] += 1
             dag_model.calculate_dagrun_date_fields(dag, dag_model.next_dagrun)
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
diff --git a/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py b/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py
new file mode 100644
index 0000000..7326d73
--- /dev/null
+++ b/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add index on state, dag_id for queued dagrun
+
+Revision ID: ccde3e26fe78
+Revises: 092435bf5d12
+Create Date: 2021-09-08 16:35:34.867711
+
+"""
+
+from alembic import op
+from sqlalchemy import text
+
+# revision identifiers, used by Alembic.
+revision = 'ccde3e26fe78'
+down_revision = '092435bf5d12'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add index on state, dag_id for queued dagrun"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.create_index(
+            'idx_dag_run_queued_dags',
+            ["state", "dag_id"],
+            postgres_where=text("state='queued'"),
+            mssql_where=text("state='queued'"),
+            sqlite_where=text("state='queued'"),
+        )
+
+
+def downgrade():
+    """Unapply Add index on state, dag_id for queued dagrun"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.drop_index('idx_dag_run_queued_dags')
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index c27942b..1e5c2c1 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -107,6 +107,16 @@ class DagRun(Base, LoggingMixin):
             mssql_where=text("state='running'"),
             sqlite_where=text("state='running'"),
         ),
+        # since mysql lacks filtered/partial indices, this creates a
+        # duplicate index on mysql. Not the end of the world
+        Index(
+            'idx_dag_run_queued_dags',
+            'state',
+            'dag_id',
+            postgres_where=text("state='queued'"),
+            mssql_where=text("state='queued'"),
+            sqlite_where=text("state='queued'"),
+        ),
     )
 
     task_instances = relationship(
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index e049603..d43689c 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``092435bf5d12`` (head)        | ``97cdd93827b8`` | ``2.1.4``       | Add ``max_active_runs`` column to ``dag_model`` table                                 |
+| ``ccde3e26fe78`` (head)        | ``092435bf5d12`` | ``2.1.4``       | Add index on state, dag_id for queued ``dagrun``                                      |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``092435bf5d12``               | ``97cdd93827b8`` | ``2.1.4``       | Add ``max_active_runs`` column to ``dag_model`` table                                 |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``97cdd93827b8``               | ``a13f7613ad25`` | ``2.1.3``       | Add ``queued_at`` column in ``dag_run`` table                                         |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 926a1fe..2364c80 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -799,21 +799,33 @@ class TestSchedulerJob(unittest.TestCase):
         assert 0 == len(self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session))
         session.rollback()
 
-    def test_tis_for_queued_dagruns_are_not_run(self, dag_maker):
+    def test_tis_for_queued_dagruns_are_not_run(self):
         """
         This tests that tis from queued dagruns are not queued
         """
         dag_id = "test_tis_for_queued_dagruns_are_not_run"
         task_id_1 = 'dummy'
+        session = settings.Session()
 
-        with dag_maker(dag_id) as dag:
+        with DAG(dag_id=dag_id, start_date=DEFAULT_DATE) as dag:
             task1 = DummyOperator(task_id=task_id_1)
-        dr1 = dag_maker.create_dagrun(state=State.QUEUED)
-        dr2 = dag_maker.create_dagrun(
-            run_id='test2', execution_date=dag.following_schedule(dr1.execution_date)
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db(session=session)
+        dr1 = dag.create_dagrun(run_id='test', run_type=DagRunType.SCHEDULED, state=State.QUEUED)
+        dr2 = dag.create_dagrun(
+            run_id='test2',
+            execution_date=dag.following_schedule(dr1.execution_date),
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
         )
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        session = settings.Session()
+
         ti1 = TaskInstance(task1, dr1.execution_date)
         ti2 = TaskInstance(task1, dr2.execution_date)
         ti1.state = State.SCHEDULED
@@ -1579,6 +1591,41 @@ class TestSchedulerJob(unittest.TestCase):
         self.scheduler_job.executor.end.assert_called_once()
         mock_processor_agent.return_value.end.reset_mock(side_effect=True)
 
+    def test_theres_limit_to_queued_dagruns_in_a_dag(self):
+        """This tests that there's limit to the number of queued dagrun scheduler can create in a dag"""
+        with DAG(dag_id='test_theres_limit_to_queued_dagruns_in_a_dag', start_date=DEFAULT_DATE) as dag:
+            DummyOperator(task_id='mytask')
+
+        session = settings.Session()
+
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db(session=session)
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor()
+        self.scheduler_job.processor_agent = mock.MagicMock()
+
+        self.scheduler_job.dagbag = dagbag
+
+        session = settings.Session()
+        orm_dag = session.query(DagModel).get(dag.dag_id)
+        assert orm_dag is not None
+        for _ in range(20):
+            self.scheduler_job._create_dag_runs([orm_dag], session)
+        assert session.query(DagRun).count() == 16
+
+        with conf_vars({('core', 'max_queued_runs_per_dag'): '5'}):
+            clear_db_runs()
+            for i in range(20):
+                self.scheduler_job._create_dag_runs([orm_dag], session)
+        assert session.query(DagRun).count() == 5
+
     def test_dagrun_timeout_verify_max_active_runs(self):
         """
         Test if a a dagrun will not be scheduled if max_dag_runs
@@ -3729,19 +3776,38 @@ class TestSchedulerJob(unittest.TestCase):
         # Assert that the other one is queued
         assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 1
 
-    def test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags(self, dag_maker):
-        session = settings.Session()
-        with dag_maker('test_dag1', max_active_runs=1) as dag:
+    def test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags(self):
+
+        with DAG(
+            dag_id='test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags',
+            start_date=DEFAULT_DATE,
+            max_active_runs=1,
+        ) as dag:
             DummyOperator(task_id='mytask')
+
+        session = settings.Session()
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db(session=session)
         date = dag.following_schedule(DEFAULT_DATE)
         for _ in range(30):
             dr = dag.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
 
         date = timezone.datetime(2020, 1, 1)
-        with dag_maker('test_dag2', start_date=date) as dag2:
+        with DAG(
+            dag_id='test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags_2',
+            start_date=date,
+        ) as dag2:
             DummyOperator(task_id='mytask')
 
+        dagbag.bag_dag(dag=dag2, root_dag=dag2)
+        dagbag.sync_to_db(session=session)
         for _ in range(10):
             dr = dag2.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
@@ -3766,13 +3832,25 @@ class TestSchedulerJob(unittest.TestCase):
         )
         assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 11
 
-    def test_start_queued_dagruns_do_follow_execution_date_order(self, dag_maker):
+    def test_start_queued_dagruns_do_follow_execution_date_order(self):
         session = settings.Session()
-        with dag_maker('test_dag1', max_active_runs=1) as dag:
+        with DAG(
+            dag_id='test_start_queued_dagruns_do_follow_execution_date_order',
+            start_date=DEFAULT_DATE,
+            max_active_runs=1,
+        ) as dag:
             DummyOperator(task_id='mytask')
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db(session=session)
         date = dag.following_schedule(DEFAULT_DATE)
         for i in range(30):
-            dr = dag_maker.create_dagrun(
+            dr = dag.create_dagrun(
                 run_id=f'dagrun_{i}', run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date
             )
             date = dr.execution_date + timedelta(hours=1)
@@ -3782,7 +3860,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         self.scheduler_job._start_queued_dagruns(session)
         session.flush()
-        dr = DagRun.find(run_id='dagrun_0')
+        dr = DagRun.find(dag_id=dag.dag_id, run_id='dagrun_0')
         ti = dr[0].get_task_instance(task_id='mytask', session=session)
         ti.state = State.SUCCESS
         session.merge(ti)
@@ -3799,7 +3877,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         assert dr[0].state == State.RUNNING
 
-    def test_no_dagruns_would_stuck_in_running(self, dag_maker):
+    def test_no_dagruns_would_stuck_in_running(self):
         # Test that running dagruns are not stuck in running.
         # Create one dagrun in 'running' state and 1 in 'queued' state from one dag(max_active_runs=1)
         # Create 16 dagruns in 'running' state and 16 in 'queued' state from another dag
@@ -3810,39 +3888,54 @@ class TestSchedulerJob(unittest.TestCase):
         session = settings.Session()
         # first dag and dagruns
         date = timezone.datetime(2016, 1, 1)
-        with dag_maker('test_dagrun_states_are_correct_1', max_active_runs=1, start_date=date) as dag:
+
+        with DAG(
+            dag_id='test_dagrun_states_are_correct_1',
+            start_date=date,
+            max_active_runs=1,
+        ) as dag:
             task1 = DummyOperator(task_id='dummy_task')
 
-        dr1_running = dag_maker.create_dagrun(run_id='dr1_run_1', execution_date=date)
-        dag_maker.create_dagrun(
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db(session=session)
+
+        dr1_running = dag.create_dagrun(run_id='dr1_run_1', execution_date=date, state=State.RUNNING)
+        dag.create_dagrun(
             run_id='dr1_run_2',
             state=State.QUEUED,
             execution_date=dag.following_schedule(dr1_running.execution_date),
         )
         # second dag and dagruns
         date = timezone.datetime(2020, 1, 1)
-        with dag_maker('test_dagrun_states_are_correct_2', start_date=date) as dag:
+
+        with DAG(dag_id='test_dagrun_states_are_correct_2', start_date=date) as dag2:
             DummyOperator(task_id='dummy_task')
         for i in range(16):
-            dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.RUNNING, execution_date=date)
+            dr = dag2.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.RUNNING, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
         dr16 = DagRun.find(run_id='dr2_run_16')
         date = dr16[0].execution_date + timedelta(hours=1)
         for i in range(16, 32):
-            dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date)
+            dr = dag2.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
 
         # third dag and dagruns
         date = timezone.datetime(2021, 1, 1)
-        with dag_maker('test_dagrun_states_are_correct_3', start_date=date) as dag:
+        with DAG(dag_id='test_dagrun_states_are_correct_3', start_date=date) as dag3:
             DummyOperator(task_id='dummy_task')
         for i in range(16):
-            dr = dag_maker.create_dagrun(run_id=f'dr3_run_{i+1}', state=State.RUNNING, execution_date=date)
+            dr = dag3.create_dagrun(run_id=f'dr3_run_{i+1}', state=State.RUNNING, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
         dr16 = DagRun.find(run_id='dr3_run_16')
         date = dr16[0].execution_date + timedelta(hours=1)
         for i in range(16, 32):
-            dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date)
+            dr = dag3.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
@@ -3862,276 +3955,6 @@ class TestSchedulerJob(unittest.TestCase):
         assert DagRun.find(run_id='dr1_run_1')[0].state == State.SUCCESS
         assert DagRun.find(run_id='dr1_run_2')[0].state == State.RUNNING
 
-    @pytest.mark.parametrize(
-        "state, start_date, end_date",
-        [
-            [State.NONE, None, None],
-            [
-                State.UP_FOR_RETRY,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-            [
-                State.UP_FOR_RESCHEDULE,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-        ],
-    )
-    def test_dag_file_processor_process_task_instances(self, state, start_date, end_date, dag_maker):
-        """
-        Test if _process_task_instances puts the right task instances into the
-        mock_list.
-        """
-        with dag_maker(dag_id='test_scheduler_process_execute_task'):
-            BashOperator(task_id='dummy', bash_command='echo hi')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-
-        dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
-        assert dr is not None
-
-        with create_session() as session:
-            ti = dr.get_task_instances(session=session)[0]
-            ti.state = state
-            ti.start_date = start_date
-            ti.end_date = end_date
-
-            self.scheduler_job._schedule_dag_run(dr, session)
-            assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 1
-
-            session.refresh(ti)
-            assert ti.state == State.SCHEDULED
-
-    @pytest.mark.parametrize(
-        "state,start_date,end_date",
-        [
-            [State.NONE, None, None],
-            [
-                State.UP_FOR_RETRY,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-            [
-                State.UP_FOR_RESCHEDULE,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-        ],
-    )
-    def test_dag_file_processor_process_task_instances_with_max_active_tis_per_dag(
-        self, state, start_date, end_date, dag_maker
-    ):
-        """
-        Test if _process_task_instances puts the right task instances into the
-        mock_list.
-        """
-        with dag_maker(dag_id='test_scheduler_process_execute_task_with_max_active_tis_per_dag'):
-            BashOperator(task_id='dummy', max_active_tis_per_dag=2, bash_command='echo Hi')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-
-        dr = dag_maker.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-        )
-        assert dr is not None
-
-        with create_session() as session:
-            ti = dr.get_task_instances(session=session)[0]
-            ti.state = state
-            ti.start_date = start_date
-            ti.end_date = end_date
-
-            self.scheduler_job._schedule_dag_run(dr, session)
-            assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 1
-
-            session.refresh(ti)
-            assert ti.state == State.SCHEDULED
-
-    @pytest.mark.parametrize(
-        "state, start_date, end_date",
-        [
-            [State.NONE, None, None],
-            [
-                State.UP_FOR_RETRY,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-            [
-                State.UP_FOR_RESCHEDULE,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-        ],
-    )
-    def test_dag_file_processor_process_task_instances_depends_on_past(
-        self, state, start_date, end_date, dag_maker
-    ):
-        """
-        Test if _process_task_instances puts the right task instances into the
-        mock_list.
-        """
-        with dag_maker(
-            dag_id='test_scheduler_process_execute_task_depends_on_past',
-            default_args={
-                'depends_on_past': True,
-            },
-        ):
-            BashOperator(task_id='dummy1', bash_command='echo hi')
-            BashOperator(task_id='dummy2', bash_command='echo hi')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-        dr = dag_maker.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-        )
-        assert dr is not None
-
-        with create_session() as session:
-            tis = dr.get_task_instances(session=session)
-            for ti in tis:
-                ti.state = state
-                ti.start_date = start_date
-                ti.end_date = end_date
-
-            self.scheduler_job._schedule_dag_run(dr, session)
-            assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 2
-
-            session.refresh(tis[0])
-            session.refresh(tis[1])
-            assert tis[0].state == State.SCHEDULED
-            assert tis[1].state == State.SCHEDULED
-
-    def test_scheduler_job_add_new_task(self, dag_maker):
-        """
-        Test if a task instance will be added if the dag is updated
-        """
-        with dag_maker(dag_id='test_scheduler_add_new_task') as dag:
-            BashOperator(task_id='dummy', bash_command='echo test')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.dagbag = dag_maker.dagbag
-
-        session = settings.Session()
-        orm_dag = dag_maker.dag_model
-        assert orm_dag is not None
-
-        if self.scheduler_job.processor_agent:
-            self.scheduler_job.processor_agent.end()
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-        dag = self.scheduler_job.dagbag.get_dag('test_scheduler_add_new_task', session=session)
-        self.scheduler_job._create_dag_runs([orm_dag], session)
-
-        drs = DagRun.find(dag_id=dag.dag_id, session=session)
-        assert len(drs) == 1
-        dr = drs[0]
-
-        tis = dr.get_task_instances()
-        assert len(tis) == 1
-
-        BashOperator(task_id='dummy2', dag=dag, bash_command='echo test')
-        SerializedDagModel.write_dag(dag=dag)
-
-        self.scheduler_job._schedule_dag_run(dr, session)
-        assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 2
-        session.flush()
-
-        drs = DagRun.find(dag_id=dag.dag_id, session=session)
-        assert len(drs) == 1
-        dr = drs[0]
-
-        tis = dr.get_task_instances()
-        assert len(tis) == 2
-
-    def test_runs_respected_after_clear(self, dag_maker):
-        """
-        Test dag after dag.clear, max_active_runs is respected
-        """
-        with dag_maker(
-            dag_id='test_scheduler_max_active_runs_respected_after_clear', max_active_runs=1
-        ) as dag:
-            BashOperator(task_id='dummy', bash_command='echo Hi')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-
-        session = settings.Session()
-        date = DEFAULT_DATE
-        dag_maker.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            state=State.QUEUED,
-        )
-        date = dag.following_schedule(date)
-        dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            execution_date=date,
-            state=State.QUEUED,
-        )
-        date = dag.following_schedule(date)
-        dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            execution_date=date,
-            state=State.QUEUED,
-        )
-        dag.clear()
-
-        assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 3
-
-        session = settings.Session()
-        self.scheduler_job._start_queued_dagruns(session)
-        session.flush()
-        # Assert that only 1 dagrun is active
-        assert len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)) == 1
-        # Assert that the other two are queued
-        assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 2
-
-    def test_timeout_triggers(self, dag_maker):
-        """
-        Tests that tasks in the deferred state, but whose trigger timeout
-        has expired, are correctly failed.
-
-        """
-
-        session = settings.Session()
-        # Create the test DAG and task
-        with dag_maker(
-            dag_id='test_timeout_triggers',
-            start_date=DEFAULT_DATE,
-            schedule_interval='@once',
-            max_active_runs=1,
-            session=session,
-        ):
-            DummyOperator(task_id='dummy1')
-
-        # Create a Task Instance for the task that is allegedly deferred
-        # but past its timeout, and one that is still good.
-        # We don't actually need a linked trigger here; the code doesn't check.
-        dr1 = dag_maker.create_dagrun()
-        dr2 = dag_maker.create_dagrun(
-            run_id="test2", execution_date=DEFAULT_DATE + datetime.timedelta(seconds=1)
-        )
-        ti1 = dr1.get_task_instance('dummy1', session)
-        ti2 = dr2.get_task_instance('dummy1', session)
-        ti1.state = State.DEFERRED
-        ti1.trigger_timeout = timezone.utcnow() - datetime.timedelta(seconds=60)
-        ti2.state = State.DEFERRED
-        ti2.trigger_timeout = timezone.utcnow() + datetime.timedelta(seconds=60)
-        session.flush()
-
-        # Boot up the scheduler and make it check timeouts
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.check_trigger_timeouts(session=session)
-
-        # Make sure that TI1 is now scheduled to fail, and 2 wasn't touched
-        session.refresh(ti1)
-        session.refresh(ti2)
-        assert ti1.state == State.SCHEDULED
-        assert ti1.next_method == "__fail__"
-        assert ti2.state == State.DEFERRED
-
 
 @pytest.mark.xfail(reason="Work out where this goes")
 def test_task_with_upstream_skip_process_task_instances():

[airflow] 05/06: Bump version to 2.1.4

Posted by ka...@apache.org.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2ef6ab1dc3e31b605a6f8f4ba0699aa2aa1cfca4
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Sep 10 15:06:48 2021 +0100

    Bump version to 2.1.4
---
 README.md | 16 ++++++++--------
 setup.py  |  2 +-
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index 722fad2..8981e3a 100644
--- a/README.md
+++ b/README.md
@@ -82,7 +82,7 @@ Airflow is not a streaming solution, but it is often used to process real-time d
 
 Apache Airflow is tested with:
 
-|                      | Main version (dev)        | Stable version (2.1.3)   |
+|                      | Main version (dev)        | Stable version (2.1.4)   |
 | -------------------- | ------------------------- | ------------------------ |
 | Python               | 3.6, 3.7, 3.8, 3.9        | 3.6, 3.7, 3.8, 3.9       |
 | Kubernetes           | 1.20, 1.19, 1.18          | 1.20, 1.19, 1.18         |
@@ -142,15 +142,15 @@ them to appropriate format and workflow that your tool requires.
 
 
 ```bash
-pip install apache-airflow==2.1.3 \
- --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.3/constraints-3.7.txt"
+pip install apache-airflow==2.1.4 \
+ --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.4/constraints-3.7.txt"
 ```
 
 2. Installing with extras (for example postgres,google)
 
 ```bash
-pip install apache-airflow[postgres,google]==2.1.3 \
- --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.3/constraints-3.7.txt"
+pip install apache-airflow[postgres,google]==2.1.4 \
+ --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.4/constraints-3.7.txt"
 ```
 
 For information on installing provider packages check
@@ -231,7 +231,7 @@ packages:
 * **Airflow Providers**: SemVer rules apply to changes in the particular provider's code only.
   SemVer MAJOR and MINOR versions for the packages are independent from Airflow version.
   For example `google 4.1.0` and `amazon 3.0.3` providers can happily be installed
-  with `Airflow 2.1.3`. If there are limits of cross-dependencies between providers and Airflow packages,
+  with `Airflow 2.1.4`. If there are limits of cross-dependencies between providers and Airflow packages,
   they are present in providers as `install_requires` limitations. We aim to keep backwards
   compatibility of providers with all previously released Airflow 2 versions but
   there will be sometimes breaking changes that might make some, or all
@@ -254,7 +254,7 @@ Apache Airflow version life cycle:
 
 | Version | Current Patch/Minor | State     | First Release | Limited Support | EOL/Terminated |
 |---------|---------------------|-----------|---------------|-----------------|----------------|
-| 2       | 2.1.3               | Supported | Dec 17, 2020  | Dec 2021        | TBD            |
+| 2       | 2.1.4               | Supported | Dec 17, 2020  | Dec 2021        | TBD            |
 | 1.10    | 1.10.15             | EOL       | Aug 27, 2018  | Dec 17, 2020    | June 17, 2021  |
 | 1.9     | 1.9.0               | EOL       | Jan 03, 2018  | Aug 27, 2018    | Aug 27, 2018   |
 | 1.8     | 1.8.2               | EOL       | Mar 19, 2017  | Jan 03, 2018    | Jan 03, 2018   |
@@ -280,7 +280,7 @@ They are based on the official release schedule of Python and Kubernetes, nicely
 
 2. The "oldest" supported version of Python/Kubernetes is the default one. "Default" is only meaningful
    in terms of "smoke tests" in CI PRs which are run using this default version and default reference
-   image available. Currently ``apache/airflow:latest`` and ``apache/airflow:2.1.3` images
+   image available. Currently ``apache/airflow:latest`` and ``apache/airflow:2.1.4` images
    are both Python 3.6 images, however the first MINOR/MAJOR release of Airflow release after 23.12.2021 will
    become Python 3.7 images.
 
diff --git a/setup.py b/setup.py
index 33cb4f9..33e1c8d 100644
--- a/setup.py
+++ b/setup.py
@@ -41,7 +41,7 @@ PY39 = sys.version_info >= (3, 9)
 
 logger = logging.getLogger(__name__)
 
-version = '2.1.3'
+version = '2.1.4'
 
 my_dir = dirname(__file__)
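
The install commands in the README hunks above pair each Airflow release with a constraints file whose URL embeds both the Airflow version and the Python version. A minimal sketch of that pattern follows. The helper names `constraints_url` and `pip_install_command` are hypothetical and are not part of Airflow or pip; only the URL layout and the `--constraint` flag are taken from the diff.

```python
from typing import List, Sequence


def constraints_url(airflow_version: str, python_version: str) -> str:
    """Build the constraints URL in the layout used by the README commands."""
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


def pip_install_command(
    airflow_version: str,
    python_version: str,
    extras: Sequence[str] = (),
) -> List[str]:
    """Return the pip argument list for a pinned, constrained install.

    Hypothetical helper for illustration: it only assembles arguments
    and does not run pip.
    """
    # Extras are rendered as "[postgres,google]" directly after the package name.
    extras_part = f"[{','.join(extras)}]" if extras else ""
    return [
        "pip",
        "install",
        f"apache-airflow{extras_part}=={airflow_version}",
        "--constraint",
        constraints_url(airflow_version, python_version),
    ]


if __name__ == "__main__":
    print(" ".join(pip_install_command("2.1.4", "3.7", ["postgres", "google"])))
```

Because the version appears in both the package pin and the constraints URL, bumping a release means changing both together, which is why each README hunk touches two lines per command.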
 

[airflow] 02/06: Fix deprecation error message rather than silencing it (#18126)

Posted by ka...@apache.org.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f5f70e0ae79036e5a322fa1fc96430518c15e052
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Fri Sep 10 00:07:19 2021 +0100

    Fix deprecation error message rather than silencing it (#18126)
    
    (cherry picked from commit c9d29467f71060f14863ca3508cb1055572479b5)
---
 .../versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py  | 2 +-
 .../versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py    | 2 +-
 airflow/models/dagrun.py                                              | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py b/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py
index 82bb4c2..a9f612d 100644
--- a/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py
+++ b/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py
@@ -44,7 +44,7 @@ def upgrade():
         batch_op.create_index(
             'idx_dag_run_running_dags',
             ["state", "dag_id"],
-            postgres_where=text("state='running'"),
+            postgresql_where=text("state='running'"),
             mssql_where=text("state='running'"),
             sqlite_where=text("state='running'"),
         )
diff --git a/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py b/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py
index 7326d73..6a1cbe6 100644
--- a/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py
+++ b/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py
@@ -40,7 +40,7 @@ def upgrade():
         batch_op.create_index(
             'idx_dag_run_queued_dags',
             ["state", "dag_id"],
-            postgres_where=text("state='queued'"),
+            postgresql_where=text("state='queued'"),
             mssql_where=text("state='queued'"),
             sqlite_where=text("state='queued'"),
         )
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 1e5c2c1..ec4bdfb 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -103,7 +103,7 @@ class DagRun(Base, LoggingMixin):
             'idx_dag_run_running_dags',
             'state',
             'dag_id',
-            postgres_where=text("state='running'"),
+            postgresql_where=text("state='running'"),
             mssql_where=text("state='running'"),
             sqlite_where=text("state='running'"),
         ),
@@ -113,7 +113,7 @@ class DagRun(Base, LoggingMixin):
             'idx_dag_run_queued_dags',
             'state',
             'dag_id',
-            postgres_where=text("state='queued'"),
+            postgresql_where=text("state='queued'"),
             mssql_where=text("state='queued'"),
             sqlite_where=text("state='queued'"),
         ),
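
The `*_where` arguments changed above define partial indexes: only rows matching the predicate (for example `state='running'`) are indexed. SQLAlchemy prefixes such dialect-specific keywords with the dialect name, which is `postgresql`, so the rename affects only the PostgreSQL variant. The sketch below shows the resulting kind of index using the standard-library `sqlite3` module rather than SQLAlchemy. The three-column `dag_run` table is a simplified stand-in and not the real Airflow schema.

```python
import sqlite3
from typing import Tuple


def build_partial_index_demo() -> Tuple[str, int]:
    """Create a partial index like idx_dag_run_running_dags on a toy table.

    Returns the stored CREATE INDEX statement and the number of rows that
    match the index predicate.
    """
    conn = sqlite3.connect(":memory:")
    # Simplified stand-in for the dag_run table (assumption, not the real schema).
    conn.execute(
        "CREATE TABLE dag_run (id INTEGER PRIMARY KEY, dag_id TEXT, state TEXT)"
    )
    # Partial index: only rows with state='running' get an index entry.
    conn.execute(
        "CREATE INDEX idx_dag_run_running_dags "
        "ON dag_run (state, dag_id) WHERE state='running'"
    )
    conn.executemany(
        "INSERT INTO dag_run (dag_id, state) VALUES (?, ?)",
        [("a", "running"), ("a", "queued"), ("b", "success"), ("b", "running")],
    )
    index_sql = conn.execute(
        "SELECT sql FROM sqlite_master "
        "WHERE type = 'index' AND name = 'idx_dag_run_running_dags'"
    ).fetchone()[0]
    running = conn.execute(
        "SELECT COUNT(*) FROM dag_run WHERE state = 'running'"
    ).fetchone()[0]
    conn.close()
    return index_sql, running


if __name__ == "__main__":
    sql, count = build_partial_index_demo()
    print(sql)
    print(count)
```

A partial index stays small when most rows are in a terminal state, which is presumably why the migrations restrict these indexes to the `running` and `queued` states.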

[airflow] 03/06: Update version added fields in airflow/config_templates/config.yml (#18128)

Posted by ka...@apache.org.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0ee20fff9f32cdcfe07fc1ab545d9b491c4374ac
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Fri Sep 10 01:21:06 2021 +0200

    Update version added fields in airflow/config_templates/config.yml (#18128)
    
    (cherry picked from commit 2767781b880b0fb03d46950c06e1e44902c25a7c)
---
 airflow/config_templates/config.yml | 128 ++++++++++++++++++------------------
 1 file changed, 64 insertions(+), 64 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 7abcb06..38be813 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -231,7 +231,7 @@
         but means plugin changes picked up by tasks straight away)
       default: "False"
       example: ~
-      version_added: "2.0.0"
+      version_added: 2.0.0
       see_also: ":ref:`plugins:loading`"
       type: boolean
     - name: fernet_key
@@ -382,7 +382,7 @@
         All the template_fields for each of Task Instance are stored in the Database.
         Keeping this number small may cause an error when you try to view ``Rendered`` tab in
         TaskInstance view for older tasks.
-      version_added: 2.0.0
+      version_added: 1.10.10
       type: integer
       example: ~
       default: "30"
@@ -422,7 +422,7 @@
         Number of times the code should be retried in case of DB Operational Errors.
         Not all transactions will be retried as it can cause undesired state.
         Currently it is only used in ``DagFileProcessor.process_file`` to retry ``dagbag.sync_to_db``.
-      version_added: ~
+      version_added: 2.0.0
       type: integer
       example: ~
       default: "3"
@@ -431,7 +431,7 @@
         Hide sensitive Variables or Connection extra json keys from UI and task logs when set to True
 
         (Connection passwords are always hidden in logs)
-      version_added: ~
+      version_added: 2.1.0
       type: boolean
       example: ~
       default: "True"
@@ -439,7 +439,7 @@
       description: |
         A comma-separated list of extra sensitive keywords to look for in variables names or connection's
         extra JSON.
-      version_added: ~
+      version_added: 2.1.0
       type: string
       example: ~
       default: ""
@@ -451,7 +451,7 @@
       description: |
         The folder where airflow should store its log files
         This path must be absolute
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "{AIRFLOW_HOME}/logs"
@@ -459,7 +459,7 @@
       description: |
         Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
         Set this to True if you want to enable remote logging.
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "False"
@@ -467,7 +467,7 @@
       description: |
         Users must supply an Airflow connection id that provides access to the storage
         location.
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: ""
@@ -477,7 +477,7 @@
         Credentials
         <https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__ will
         be used.
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: ""
@@ -489,14 +489,14 @@
         GCS buckets should start with "gs://"
         WASB buckets should start with "wasb" just to help Airflow select correct handler
         Stackdriver logs should start with "stackdriver://"
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: ""
     - name: encrypt_s3_logs
       description: |
         Use server-side encryption for logs stored in S3
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "False"
@@ -505,7 +505,7 @@
         Logging level.
 
         Supported values: ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``.
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "INFO"
@@ -514,7 +514,7 @@
         Logging level for Flask-appbuilder UI.
 
         Supported values: ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``.
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "WARN"
@@ -523,7 +523,7 @@
         Logging class
         Specify the class that will specify the logging configuration
         This class has to be on the python classpath
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: "my.path.default_local_settings.LOGGING_CONFIG"
       default: ""
@@ -531,14 +531,14 @@
       description: |
         Flag to enable/disable Colored logs in Console
         Colour the logs when the controlling terminal is a TTY.
-      version_added: 1.10.4
+      version_added: 2.0.0
       type: string
       example: ~
       default: "True"
     - name: colored_log_format
       description: |
         Log format for when Colored logs is enabled
-      version_added: 1.10.4
+      version_added: 2.0.0
       type: string
       example: ~
       default: >-
@@ -546,48 +546,48 @@
         %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
     - name: colored_formatter_class
       description: ~
-      version_added: 1.10.4
+      version_added: 2.0.0
       type: string
       example: ~
       default: "airflow.utils.log.colored_log.CustomTTYColoredFormatter"
     - name: log_format
       description: |
         Format of Log line
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "[%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s"
     - name: simple_log_format
       description: ~
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "%%(asctime)s %%(levelname)s - %%(message)s"
     - name: task_log_prefix_template
       description: |
         Specify prefix pattern like mentioned below with stream handler TaskHandlerWithCustomFormatter
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: "{{ti.dag_id}}-{{ti.task_id}}-{{execution_date}}-{{try_number}}"
       default: ""
     - name: log_filename_template
       description: |
         Formatting for how airflow generates file names/paths for each task run.
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "{{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log"
     - name: log_processor_filename_template
       description: |
         Formatting for how airflow generates file names for log
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "{{{{ filename }}}}.log"
     - name: dag_processor_manager_log_location
       description: |
         full path of dag_processor_manager logfile
-      version_added: 1.10.2
+      version_added: 2.0.0
       type: string
       example: ~
       default: "{AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log"
@@ -595,7 +595,7 @@
       description: |
         Name of handler to read task instance logs.
         Defaults to use ``task`` handler.
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "task"
@@ -603,7 +603,7 @@
       description: |
         A comma\-separated list of third-party logger names that will be configured to print messages to
         consoles\.
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: "connexion,sqlalchemy"
       default: ""
@@ -614,25 +614,25 @@
     - name: statsd_on
       description: |
         Enables sending metrics to StatsD.
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "False"
     - name: statsd_host
       description: ~
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "localhost"
     - name: statsd_port
       description: ~
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "8125"
     - name: statsd_prefix
       description: ~
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "airflow"
@@ -641,7 +641,7 @@
         If you want to avoid sending all the available metrics to StatsD,
         you can configure an allow list of prefixes (comma separated) to send only the metrics that
         start with the elements of the list (e.g: "scheduler,executor,dagrun")
-      version_added: 1.10.6
+      version_added: 2.0.0
       type: string
       example: ~
       default: ""
@@ -652,21 +652,21 @@
 
         The function should have the following signature:
         def func_name(stat_name: str) -> str:
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: ""
     - name: statsd_datadog_enabled
       description: |
         To enable datadog integration to send airflow metrics.
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "False"
     - name: statsd_datadog_tags
       description: |
         List of datadog tags attached to all metrics(e.g: key1:value1,key2:value2)
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: ""
@@ -675,7 +675,7 @@
         If you want to utilise your own custom Statsd client set the relevant
         module path below.
         Note: The module path must exist on your PYTHONPATH for Airflow to pick it up
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: ~
@@ -762,7 +762,7 @@
     - name: maximum_page_limit
       description: |
         Used to set the maximum page limit for API requests
-      version_added: ~
+      version_added: 2.0.0
       type: integer
       example: ~
       default: "100"
@@ -774,14 +774,14 @@
         If no limit is supplied, the OpenApi spec default is used.
       type: integer
       example: ~
-      version_added: ~
+      version_added: 2.0.0
       default: "100"
     - name: google_oauth2_audience
       description: The intended audience for JWT token credentials used for authorization.
         This value must match on the client and server sides.
         If empty, audience will not be tested.
       type: string
-      version_added: ~
+      version_added: 2.0.0
       example: project-id-random-value.apps.googleusercontent.com
       default: ""
     - name: google_key_path
@@ -791,7 +791,7 @@
         <https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__ will
         be used.
       type: string
-      version_added: ~
+      version_added: 2.0.0
       example: /files/service-account-json
       default: ""
     - name: access_control_allow_headers
@@ -801,21 +801,21 @@
         the server side response to the browser's
         Access-Control-Request-Headers header.
       type: string
-      version_added: ~
+      version_added: 2.1.0
       example: ~
       default: ""
     - name: access_control_allow_methods
       description: |
         Specifies the method or methods allowed when accessing the resource.
       type: string
-      version_added: ~
+      version_added: 2.1.0
       example: ~
       default: ""
     - name: access_control_allow_origin
       description: |
         Indicates whether the response can be shared with requesting code from the given origin.
       type: string
-      version_added: ~
+      version_added: 2.2.0
       example: ~
       default: ""
 - name: lineage
@@ -900,7 +900,7 @@
     - name: default_queue
       description: |
         Default queue that tasks get assigned to and that worker listen on.
-      version_added: ~
+      version_added: 2.1.0
       type: string
       example: ~
       default: "default"
@@ -908,7 +908,7 @@
       description: |
         Is allowed to pass additional/unused arguments (args, kwargs) to the BaseOperator operator.
         If set to False, an exception will be thrown, otherwise only the console message will be displayed.
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "False"
@@ -926,7 +926,7 @@
       description: |
         Template for mapred_job_name in HiveOperator, supports the following named parameters
         hostname, dag_id, task_id, execution_date
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: ~
@@ -1064,7 +1064,7 @@
         Access log format for gunicorn webserver.
         default format is %%(h)s %%(l)s %%(u)s %%(t)s "%%(r)s" %%(s)s %%(b)s "%%(f)s" "%%(a)s"
         documentation - https://docs.gunicorn.org/en/stable/settings.html#access-log-format
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: ""
@@ -1251,7 +1251,7 @@
     - name: show_recent_stats_for_completed_runs
       description: |
         'Recent Tasks' stats will show for old DagRuns if set
-      version_added: ~
+      version_added: 2.0.0
       type: boolean
       example: ~
       default: "True"
@@ -1292,21 +1292,21 @@
       default: "airflow.utils.email.send_email_smtp"
     - name: email_conn_id
       description: Email connection to use
-      version_added: ~
+      version_added: 2.1.0
       type: string
       example: ~
       default: "smtp_default"
     - name: default_email_on_retry
       description: |
         Whether email alerts should be sent when a task is retried
-      version_added: ~
+      version_added: 2.0.0
       type: boolean
       example: ~
       default: "True"
     - name: default_email_on_failure
       description: |
         Whether email alerts should be sent when a task failed
-      version_added: ~
+      version_added: 2.0.0
       type: boolean
       example: ~
       default: "True"
@@ -1314,7 +1314,7 @@
       description: |
         File that will be used as the template for Email subject (which will be rendered using Jinja2).
         If not set, Airflow uses a base template.
-      version_added: ~
+      version_added: 2.0.1
       type: string
       example: "/path/to/my_subject_template_file"
       default: ~
@@ -1323,7 +1323,7 @@
       description: |
         File that will be used as the template for Email content (which will be rendered using Jinja2).
         If not set, Airflow uses a base template.
-      version_added: ~
+      version_added: 2.0.1
       type: string
       example: "/path/to/my_html_content_template_file"
       default: ~
@@ -1380,13 +1380,13 @@
       default: "airflow@example.com"
     - name: smtp_timeout
       description: ~
-      version_added: ~
+      version_added: 2.0.0
       type: integer
       example: ~
       default: "30"
     - name: smtp_retry_limit
       description: ~
-      version_added: ~
+      version_added: 2.0.0
       type: integer
       example: ~
       default: "5"
@@ -1400,7 +1400,7 @@
   options:
     - name: sentry_on
       description: Enable error reporting to Sentry
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "false"
@@ -1468,7 +1468,7 @@
         running tasks while another worker has unutilized processes that are unable to process the already
         claimed blocked tasks.
         https://docs.celeryproject.org/en/stable/userguide/optimizing.html#prefetch-limits
-      version_added: ~
+      version_added: 2.0.0
       type: integer
       example: "1"
       default: ~
@@ -1488,7 +1488,7 @@
         Umask that will be used when starting workers with the ``airflow celery worker``
         in daemon mode. This control the file-creation mode mask which determines the initial
         value of file permission bits for newly created files.
-      version_added: ~
+      version_added: 2.0.0
       type: string
       example: ~
       default: "0o077"
@@ -1631,7 +1631,7 @@
     - name: worker_precheck
       description: |
         Worker initialisation check to validate Metadata Database connection
-      version_added: 1.10.1
+      version_added: 2.0.0
       type: string
       example: ~
       default: "False"
@@ -1893,7 +1893,7 @@
       default: "False"
     - name: dependency_detector
       description: DAG dependency detector class to use
-      version_added: ~
+      version_added: 2.1.0
       type: string
       example: ~
       default: "airflow.serialization.serialized_objects.DependencyDetector"
@@ -2140,7 +2140,7 @@
       description: |
         Enables TCP keepalive mechanism. This prevents Kubernetes API requests to hang indefinitely
         when idle connection is time-outed on services like cloud load balancers or firewalls.
-      version_added: ~
+      version_added: 2.0.0
       type: boolean
       example: ~
       default: "True"
@@ -2148,7 +2148,7 @@
       description: |
         When the `enable_tcp_keepalive` option is enabled, TCP probes a connection that has
         been idle for `tcp_keep_idle` seconds.
-      version_added: ~
+      version_added: 2.0.0
       type: integer
       example: ~
       default: "120"
@@ -2156,7 +2156,7 @@
       description: |
         When the `enable_tcp_keepalive` option is enabled, if Kubernetes API does not respond
         to a keepalive probe, TCP retransmits the probe after `tcp_keep_intvl` seconds.
-      version_added: ~
+      version_added: 2.0.0
       type: integer
       example: ~
       default: "30"
@@ -2165,14 +2165,14 @@
         When the `enable_tcp_keepalive` option is enabled, if Kubernetes API does not respond
         to a keepalive probe, TCP retransmits the probe `tcp_keep_cnt number` of times before
         a connection is considered to be broken.
-      version_added: ~
+      version_added: 2.0.0
       type: integer
       example: ~
       default: "6"
     - name: verify_ssl
       description: |
         Set this to false to skip verifying SSL certificate of Kubernetes python client.
-      version_added: ~
+      version_added: 2.1.0
       type: boolean
       example: ~
       default: "True"
@@ -2220,7 +2220,7 @@
     - name: shards
       description: |
         The number of running smart sensor processes for each service.
-      version_added: ~
+      version_added: 2.0.0
       type: integer
       example: ~
       default: "5"