Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/11 21:57:31 UTC

[airflow] branch v2-1-test updated (0bd9558 -> 205d6d4)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a change to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from 0bd9558  Proper warning message when recorded PID is different from current PID (#17411)
     new f7bece3  Add 'queued' state to DagRun (#16401)
     new 38d4bab  Fix race condition with dagrun callbacks (#16741)
     new 205d6d4  Run mini scheduler in LocalTaskJob during task exit (#16289)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/api_connexion/openapi/v1.yaml              |   4 +-
 airflow/jobs/local_task_job.py                     |  68 +-
 airflow/jobs/scheduler_job.py                      | 165 ++---
 ...3827b8_add_queued_at_column_to_dagrun_table.py} |  25 +-
 airflow/models/dag.py                              |   4 +-
 airflow/models/dagrun.py                           |  17 +-
 airflow/models/taskinstance.py                     |  66 +-
 airflow/www/static/js/tree.js                      |   4 +-
 airflow/www/views.py                               |   5 +-
 docs/apache-airflow/migrations-ref.rst             |   4 +-
 tests/api/common/experimental/test_mark_tasks.py   |   4 +-
 .../endpoints/test_dag_run_endpoint.py             |  14 +-
 tests/api_connexion/schemas/test_dag_run_schema.py |   3 +
 tests/cli/commands/test_task_command.py            |   4 +-
 tests/dag_processing/test_processor.py             | 750 +++++++++++++++++++++
 tests/jobs/test_local_task_job.py                  | 130 +++-
 tests/jobs/test_scheduler_job.py                   | 278 ++++----
 tests/models/test_cleartasks.py                    |  37 +
 tests/models/test_dagrun.py                        |  25 +-
 tests/models/test_taskinstance.py                  | 103 ---
 tests/sensors/test_external_task_sensor.py         |   8 +-
 tests/utils/test_dag_processing.py                 |  11 +-
 22 files changed, 1264 insertions(+), 465 deletions(-)
 copy airflow/migrations/versions/{4446e08588_dagrun_start_end.py => 97cdd93827b8_add_queued_at_column_to_dagrun_table.py} (60%)
 create mode 100644 tests/dag_processing/test_processor.py

[airflow] 01/03: Add 'queued' state to DagRun (#16401)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f7bece3da14879712a895e897dad339944170ce3
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Jul 6 15:03:27 2021 +0100

    Add 'queued' state to DagRun (#16401)
    
    This change adds a queued state to DagRun. Newly created DagRuns
    start in the queued state and are moved to the running state only
    while the DAG is below its max_active_runs limit. If the DAG doesn't
    set max_active_runs, DagRuns are moved to the running state
    immediately.
    
    Clearing a DagRun resets its state to queued.
    
    Closes: #9975, #16366
    (cherry picked from commit 6611ffd399dce0474d8329720de7e83f568df598)
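    
    For illustration, a minimal sketch of the gating policy described
    above (hypothetical names and structures, not the scheduler's actual
    code):
    
        from dataclasses import dataclass
        
        @dataclass
        class Run:                         # stand-in for DagRun
            dag_id: str
            state: str = 'queued'
        
        def start_queued_runs(queued, running_counts, max_active_runs):
            """Move queued runs to running while each DAG stays under its limit."""
            for run in queued:
                limit = max_active_runs.get(run.dag_id)        # None -> no limit
                if limit and running_counts.get(run.dag_id, 0) >= limit:
                    continue                                   # stays queued
                run.state = 'running'
                running_counts[run.dag_id] = running_counts.get(run.dag_id, 0) + 1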
---
 airflow/api_connexion/openapi/v1.yaml              |   4 +-
 airflow/jobs/scheduler_job.py                      | 149 ++--
 ...93827b8_add_queued_at_column_to_dagrun_table.py |  49 ++
 airflow/models/dag.py                              |   4 +-
 airflow/models/dagrun.py                           |  17 +-
 airflow/models/taskinstance.py                     |   6 +-
 airflow/www/static/js/tree.js                      |   4 +-
 airflow/www/views.py                               |   5 +-
 docs/apache-airflow/migrations-ref.rst             |   4 +-
 tests/api/common/experimental/test_mark_tasks.py   |   4 +-
 .../endpoints/test_dag_run_endpoint.py             |  14 +-
 tests/api_connexion/schemas/test_dag_run_schema.py |   3 +
 tests/dag_processing/test_processor.py             | 746 +++++++++++++++++++++
 tests/jobs/test_scheduler_job.py                   | 206 +++---
 tests/models/test_cleartasks.py                    |  37 +
 tests/models/test_dagrun.py                        |  25 +-
 tests/sensors/test_external_task_sensor.py         |   8 +-
 tests/utils/test_dag_processing.py                 |  11 +-
 18 files changed, 1037 insertions(+), 259 deletions(-)

diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 182f356..47fa3dc 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -1853,6 +1853,7 @@ components:
           description: |
             The start time. The time when the DAG run was actually created.
           readOnly: true
+          nullable: true
         end_date:
           type: string
           format: date-time
@@ -3025,8 +3026,9 @@ components:
       description: DAG State.
       type: string
       enum:
-        - success
+        - queued
         - running
+        - success
         - failed
 
     TriggerRule:
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index fe8e0b0..b7506b5 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -804,7 +804,7 @@ class SchedulerJob(BaseJob):
         """
         For all DAG IDs in the DagBag, look for task instances in the
         old_states and set them to new_state if the corresponding DagRun
-        does not exist or exists but is not in the running state. This
+        does not exist or exists but is not in the running or queued state. This
         normally should not happen, but it can if the state of a DagRun is
         changed manually.
 
@@ -821,7 +821,7 @@ class SchedulerJob(BaseJob):
             .filter(models.TaskInstance.state.in_(old_states))
             .filter(
                 or_(
-                    models.DagRun.state != State.RUNNING,
+                    models.DagRun.state.notin_([State.RUNNING, State.QUEUED]),
                     models.DagRun.state.is_(None),
                 )
             )
@@ -1489,39 +1489,12 @@ class SchedulerJob(BaseJob):
             if settings.USE_JOB_SCHEDULE:
                 self._create_dagruns_for_dags(guard, session)
 
-            dag_runs = self._get_next_dagruns_to_examine(session)
+            self._start_queued_dagruns(session)
+            guard.commit()
+            dag_runs = self._get_next_dagruns_to_examine(State.RUNNING, session)
             # Bulk fetch the currently active dag runs for the dags we are
             # examining, rather than making one query per DagRun
 
-            # TODO: This query is probably horribly inefficient (though there is an
-            # index on (dag_id,state)). It is to deal with the case when a user
-            # clears more than max_active_runs older tasks -- we don't want the
-            # scheduler to suddenly go and start running tasks from all of the
-            # runs. (AIRFLOW-137/GH #1442)
-            #
-            # The longer term fix would be to have `clear` do this, and put DagRuns
-            # in to the queued state, then take DRs out of queued before creating
-            # any new ones
-
-            # Build up a set of execution_dates that are "active" for a given
-            # dag_id -- only tasks from those runs will be scheduled.
-            active_runs_by_dag_id = defaultdict(set)
-
-            query = (
-                session.query(
-                    TI.dag_id,
-                    TI.execution_date,
-                )
-                .filter(
-                    TI.dag_id.in_(list({dag_run.dag_id for dag_run in dag_runs})),
-                    TI.state.notin_(list(State.finished) + [State.REMOVED]),
-                )
-                .group_by(TI.dag_id, TI.execution_date)
-            )
-
-            for dag_id, execution_date in query:
-                active_runs_by_dag_id[dag_id].add(execution_date)
-
             for dag_run in dag_runs:
                 # Use try_except to not stop the Scheduler when a Serialized DAG is not found
                 # This takes care of Dynamic DAGs especially
@@ -1530,7 +1503,7 @@ class SchedulerJob(BaseJob):
                 # But this would take care of the scenario when the Scheduler is restarted after DagRun is
                 # created and the DAG is deleted / renamed
                 try:
-                    self._schedule_dag_run(dag_run, active_runs_by_dag_id.get(dag_run.dag_id, set()), session)
+                    self._schedule_dag_run(dag_run, session)
                 except SerializedDagNotFound:
                     self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
                     continue
@@ -1570,9 +1543,9 @@ class SchedulerJob(BaseJob):
             return num_queued_tis
 
     @retry_db_transaction
-    def _get_next_dagruns_to_examine(self, session):
+    def _get_next_dagruns_to_examine(self, state, session):
         """Get Next DagRuns to Examine with retries"""
-        return DagRun.next_dagruns_to_examine(session)
+        return DagRun.next_dagruns_to_examine(state, session)
 
     @retry_db_transaction
     def _create_dagruns_for_dags(self, guard, session):
@@ -1593,7 +1566,7 @@ class SchedulerJob(BaseJob):
         # as DagModel.dag_id and DagModel.next_dagrun
         # This list is used to verify if the DagRun already exist so that we don't attempt to create
         # duplicate dag runs
-        active_dagruns = (
+        existing_dagruns = (
             session.query(DagRun.dag_id, DagRun.execution_date)
             .filter(
                 tuple_(DagRun.dag_id, DagRun.execution_date).in_(
@@ -1616,89 +1589,83 @@ class SchedulerJob(BaseJob):
             # are not updated.
             # We opted to check DagRun existence instead
             # of catching an IntegrityError and rolling back the session, i.e.
-            # we need to run self._update_dag_next_dagruns if the Dag Run already exists or if we
+            # we need to set dag.next_dagrun_info if the Dag Run already exists or if we
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling into a loop of IntegrityErrors.
-            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
-                run = dag.create_dagrun(
+            if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+                dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
-                    start_date=timezone.utcnow(),
-                    state=State.RUNNING,
+                    state=State.QUEUED,
                     external_trigger=False,
                     session=session,
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
-
-                expected_start_date = dag.following_schedule(run.execution_date)
-                if expected_start_date:
-                    schedule_delay = run.start_date - expected_start_date
-                    Stats.timing(
-                        f'dagrun.schedule_delay.{dag.dag_id}',
-                        schedule_delay,
-                    )
-
-        self._update_dag_next_dagruns(dag_models, session)
+            dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
+                dag_model.next_dagrun
+            )
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], session: Session) -> None:
-        """
-        Bulk update the next_dagrun and next_dagrun_create_after for all the dags.
+    def _start_queued_dagruns(
+        self,
+        session: Session,
+    ) -> None:
+        """Find DagRuns in the queued state and decide whether to move them to the running state."""
+        dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
 
-        We batch the select queries to get info about all the dags at once
-        """
-        # Check max_active_runs, to see if we are _now_ at the limit for any of
-        # these dag? (we've just created a DagRun for them after all)
-        active_runs_of_dags = dict(
+        active_runs_of_dags = defaultdict(
+            lambda: 0,
             session.query(DagRun.dag_id, func.count('*'))
-            .filter(
-                DagRun.dag_id.in_([o.dag_id for o in dag_models]),
+            .filter(
+                # The set avoids duplicate dag_ids; it is converted to a list
+                # because SQLAlchemy's in_() doesn't accept a set
+                DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),
                 DagRun.state == State.RUNNING,
-                DagRun.external_trigger.is_(False),
             )
             .group_by(DagRun.dag_id)
-            .all()
+            .all(),
         )
 
-        for dag_model in dag_models:
-            # Get the DAG in a try_except to not stop the Scheduler when a Serialized DAG is not found
-            # This takes care of Dynamic DAGs especially
+        def _update_state(dag_run):
+            dag_run.state = State.RUNNING
+            dag_run.start_date = timezone.utcnow()
+            expected_start_date = dag.following_schedule(dag_run.execution_date)
+            if expected_start_date:
+                schedule_delay = dag_run.start_date - expected_start_date
+                Stats.timing(
+                    f'dagrun.schedule_delay.{dag.dag_id}',
+                    schedule_delay,
+                )
+
+        for dag_run in dag_runs:
             try:
-                dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
+                dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
             except SerializedDagNotFound:
-                self.log.exception("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
+                self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
                 continue
-            active_runs_of_dag = active_runs_of_dags.get(dag.dag_id, 0)
-            if dag.max_active_runs and active_runs_of_dag >= dag.max_active_runs:
-                self.log.info(
-                    "DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
+            active_runs = active_runs_of_dags[dag_run.dag_id]
+            if dag.max_active_runs and active_runs >= dag.max_active_runs:
+                self.log.debug(
+                    "DAG %s already has %d active runs, not moving any more runs to RUNNING state %s",
                     dag.dag_id,
-                    active_runs_of_dag,
-                    dag.max_active_runs,
+                    active_runs,
+                    dag_run.execution_date,
                 )
-                dag_model.next_dagrun_create_after = None
             else:
-                dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
-                    dag_model.next_dagrun
-                )
+                active_runs_of_dags[dag_run.dag_id] += 1
+                _update_state(dag_run)
 
     def _schedule_dag_run(
         self,
         dag_run: DagRun,
-        currently_active_runs: Set[datetime.datetime],
         session: Session,
     ) -> int:
         """
         Make scheduling decisions about an individual dag run
 
-        ``currently_active_runs`` is passed in so that a batch query can be
-        used to ask this for all dag runs in the batch, to avoid an n+1 query.
-
         :param dag_run: The DagRun to schedule
-        :param currently_active_runs: Number of currently active runs of this DAG
         :return: Number of tasks scheduled
         """
         dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
@@ -1725,9 +1692,6 @@ class SchedulerJob(BaseJob):
             session.flush()
             self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
 
-            # Work out if we should allow creating a new DagRun now?
-            self._update_dag_next_dagruns([session.query(DagModel).get(dag_run.dag_id)], session)
-
             callback_to_execute = DagCallbackRequest(
                 full_filepath=dag.fileloc,
                 dag_id=dag.dag_id,
@@ -1745,19 +1709,6 @@ class SchedulerJob(BaseJob):
             self.log.error("Execution date is in future: %s", dag_run.execution_date)
             return 0
 
-        if dag.max_active_runs:
-            if (
-                len(currently_active_runs) >= dag.max_active_runs
-                and dag_run.execution_date not in currently_active_runs
-            ):
-                self.log.info(
-                    "DAG %s already has %d active runs, not queuing any tasks for run %s",
-                    dag.dag_id,
-                    len(currently_active_runs),
-                    dag_run.execution_date,
-                )
-                return 0
-
         self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session)
         # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
         schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
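
A standalone sketch of the counting pattern used by _start_queued_dagruns
above: seeding a defaultdict from the (dag_id, count) rows of the GROUP BY
query, so dag_ids with no running runs default to zero (the rows here are
made up for illustration):

    from collections import defaultdict

    rows = [('dag_a', 2), ('dag_b', 1)]    # shape of session.query(...).all()
    active_runs = defaultdict(int, rows)   # same effect as defaultdict(lambda: 0, ...)
    assert active_runs['dag_c'] == 0       # unseen dag_id counts as 0 active runs
    active_runs['dag_a'] += 1              # bump the count when a run is started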
diff --git a/airflow/migrations/versions/97cdd93827b8_add_queued_at_column_to_dagrun_table.py b/airflow/migrations/versions/97cdd93827b8_add_queued_at_column_to_dagrun_table.py
new file mode 100644
index 0000000..03caebc
--- /dev/null
+++ b/airflow/migrations/versions/97cdd93827b8_add_queued_at_column_to_dagrun_table.py
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add queued_at column to dagrun table
+
+Revision ID: 97cdd93827b8
+Revises: a13f7613ad25
+Create Date: 2021-06-29 21:53:48.059438
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mssql
+
+# revision identifiers, used by Alembic.
+revision = '97cdd93827b8'
+down_revision = 'a13f7613ad25'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add queued_at column to dagrun table"""
+    conn = op.get_bind()
+    if conn.dialect.name == "mssql":
+        op.add_column('dag_run', sa.Column('queued_at', mssql.DATETIME2(precision=6), nullable=True))
+    else:
+        op.add_column('dag_run', sa.Column('queued_at', sa.DateTime(), nullable=True))
+
+
+def downgrade():
+    """Unapply Add queued_at column to dagrun table"""
+    op.drop_column('dag_run', "queued_at")
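
As with any Alembic revision bundled with a release, this migration is
applied through Airflow's normal schema-upgrade path rather than run by
hand, e.g.:

    airflow db upgrade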
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 13d69c2..a3d06db 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1153,7 +1153,7 @@ class DAG(LoggingMixin):
         confirm_prompt=False,
         include_subdags=True,
         include_parentdag=True,
-        dag_run_state: str = State.RUNNING,
+        dag_run_state: str = State.QUEUED,
         dry_run=False,
         session=None,
         get_tis=False,
@@ -1369,7 +1369,7 @@ class DAG(LoggingMixin):
         confirm_prompt=False,
         include_subdags=True,
         include_parentdag=False,
-        dag_run_state=State.RUNNING,
+        dag_run_state=State.QUEUED,
         dry_run=False,
     ):
         all_tis = []
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 5b8ac0c..c503ac4 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -61,12 +61,15 @@ class DagRun(Base, LoggingMixin):
 
     __tablename__ = "dag_run"
 
+    __NO_VALUE = object()
+
     id = Column(Integer, primary_key=True)
     dag_id = Column(String(ID_LEN))
+    queued_at = Column(UtcDateTime)
     execution_date = Column(UtcDateTime, default=timezone.utcnow)
-    start_date = Column(UtcDateTime, default=timezone.utcnow)
+    start_date = Column(UtcDateTime)
     end_date = Column(UtcDateTime)
-    _state = Column('state', String(50), default=State.RUNNING)
+    _state = Column('state', String(50), default=State.QUEUED)
     run_id = Column(String(ID_LEN))
     creating_job_id = Column(Integer)
     external_trigger = Column(Boolean, default=True)
@@ -102,6 +105,7 @@ class DagRun(Base, LoggingMixin):
         self,
         dag_id: Optional[str] = None,
         run_id: Optional[str] = None,
+        queued_at: Optional[datetime] = __NO_VALUE,
         execution_date: Optional[datetime] = None,
         start_date: Optional[datetime] = None,
         external_trigger: Optional[bool] = None,
@@ -118,6 +122,10 @@ class DagRun(Base, LoggingMixin):
         self.external_trigger = external_trigger
         self.conf = conf or {}
         self.state = state
+        if queued_at is self.__NO_VALUE:
+            self.queued_at = timezone.utcnow() if state == State.QUEUED else None
+        else:
+            self.queued_at = queued_at
         self.run_type = run_type
         self.dag_hash = dag_hash
         self.creating_job_id = creating_job_id
@@ -140,6 +148,8 @@ class DagRun(Base, LoggingMixin):
         if self._state != state:
             self._state = state
             self.end_date = timezone.utcnow() if self._state in State.finished else None
+            if state == State.QUEUED:
+                self.queued_at = timezone.utcnow()
 
     @declared_attr
     def state(self):
@@ -160,6 +170,7 @@ class DagRun(Base, LoggingMixin):
     @classmethod
     def next_dagruns_to_examine(
         cls,
+        state: str,
         session: Session,
         max_number: Optional[int] = None,
     ):
@@ -180,7 +191,7 @@ class DagRun(Base, LoggingMixin):
         # TODO: Bake this query, it is run _A lot_
         query = (
             session.query(cls)
-            .filter(cls.state == State.RUNNING, cls.run_type != DagRunType.BACKFILL_JOB)
+            .filter(cls.state == state, cls.run_type != DagRunType.BACKFILL_JOB)
             .join(
                 DagModel,
                 DagModel.dag_id == cls.dag_id,
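
The __NO_VALUE sentinel above lets DagRun.__init__ distinguish "queued_at
was not passed" from an explicit queued_at=None. A standalone sketch of the
pattern (illustrative names, not the DagRun code itself):

    _NO_VALUE = object()

    class Example:
        def __init__(self, state=None, queued_at=_NO_VALUE):
            if queued_at is _NO_VALUE:
                # argument omitted: derive a default from the state
                self.queued_at = 'now' if state == 'queued' else None
            else:
                # argument passed explicitly (even None): keep the caller's value
                self.queued_at = queued_at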
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index b99fa34..8d2578f 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -138,7 +138,7 @@ def clear_task_instances(
     session,
     activate_dag_runs=None,
     dag=None,
-    dag_run_state: Union[str, Literal[False]] = State.RUNNING,
+    dag_run_state: Union[str, Literal[False]] = State.QUEUED,
 ):
     """
     Clears a set of task instances, but makes sure the running ones
@@ -240,7 +240,9 @@ def clear_task_instances(
         )
         for dr in drs:
             dr.state = dag_run_state
-            dr.start_date = timezone.utcnow()
+            dr.start_date = None
+            if dag_run_state == State.QUEUED:
+                dr.last_scheduling_decision = None
 
 
 class TaskInstanceKey(NamedTuple):
diff --git a/airflow/www/static/js/tree.js b/airflow/www/static/js/tree.js
index 4bf366a..d45c880 100644
--- a/airflow/www/static/js/tree.js
+++ b/airflow/www/static/js/tree.js
@@ -58,7 +58,9 @@ document.addEventListener('DOMContentLoaded', () => {
   const tree = d3.layout.tree().nodeSize([0, 25]);
   let nodes = tree.nodes(data);
   const nodeobj = {};
-  const getActiveRuns = () => data.instances.filter((run) => run.state === 'running').length > 0;
+  const runActiveStates = ['queued', 'running'];
+  const getActiveRuns = () => data.instances
+    .filter((run) => runActiveStates.includes(run.state)).length > 0;
 
   const now = Date.now() / 1000;
   const devicePixelRatio = window.devicePixelRatio || 1;
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 09d27e0..9af5e1b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1526,7 +1526,7 @@ class Airflow(AirflowBaseView):
         dag.create_dagrun(
             run_type=DagRunType.MANUAL,
             execution_date=execution_date,
-            state=State.RUNNING,
+            state=State.QUEUED,
             conf=run_conf,
             external_trigger=True,
             dag_hash=current_app.dag_bag.dags_hash.get(dag_id),
@@ -3451,6 +3451,7 @@ class DagRunModelView(AirflowModelView):
         'execution_date',
         'run_id',
         'run_type',
+        'queued_at',
         'start_date',
         'end_date',
         'external_trigger',
@@ -3786,7 +3787,7 @@ class TaskInstanceModelView(AirflowModelView):
         lazy_gettext('Clear'),
         lazy_gettext(
             'Are you sure you want to clear the state of the selected task'
-            ' instance(s) and set their dagruns to the running state?'
+            ' instance(s) and set their dagruns to the QUEUED state?'
         ),
         single=False,
     )
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 0c663da..0af143f 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,9 +23,7 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``e9304a3141f0`` (head)        | ``83f031fd9f1c`` |                 | Make XCom primary key columns non-nullable                                            |
-+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``83f031fd9f1c``               | ``a13f7613ad25`` |                 | Improve MSSQL compatibility                                                           |
+| ``97cdd93827b8`` (head)        | ``a13f7613ad25`` | ``2.1.3``       | Add ``queued_at`` column in ``dag_run`` table                                         |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``a13f7613ad25``               | ``e165e7455d70`` | ``2.1.0``       | Resource based permissions for default ``Flask-AppBuilder`` views                     |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/tests/api/common/experimental/test_mark_tasks.py b/tests/api/common/experimental/test_mark_tasks.py
index 4dab57e..49008d3 100644
--- a/tests/api/common/experimental/test_mark_tasks.py
+++ b/tests/api/common/experimental/test_mark_tasks.py
@@ -414,7 +414,9 @@ class TestMarkDAGRun(unittest.TestCase):
             assert ti.state == state
 
     def _create_test_dag_run(self, state, date):
-        return self.dag1.create_dagrun(run_type=DagRunType.MANUAL, state=state, execution_date=date)
+        return self.dag1.create_dagrun(
+            run_type=DagRunType.MANUAL, state=state, start_date=date, execution_date=date
+        )
 
     def _verify_dag_run_state(self, dag, date, state):
         drs = models.DagRun.find(dag_id=dag.dag_id, execution_date=date)
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index e51eca8..0aa13b2 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -90,7 +90,7 @@ class TestDagRunEndpoint:
 
     def teardown_method(self) -> None:
         clear_db_runs()
-        # clear_db_dags()
+        clear_db_dags()
 
     def _create_dag(self, dag_id):
         dag_instance = DagModel(dag_id=dag_id)
@@ -118,6 +118,7 @@ class TestDagRunEndpoint:
             execution_date=timezone.parse(self.default_time_2),
             start_date=timezone.parse(self.default_time),
             external_trigger=True,
+            state=state,
         )
         dag_runs.append(dagrun_model_2)
         if extra_dag:
@@ -131,6 +132,7 @@ class TestDagRunEndpoint:
                         execution_date=timezone.parse(self.default_time_2),
                         start_date=timezone.parse(self.default_time),
                         external_trigger=True,
+                        state=state,
                     )
                 )
         if commit:
@@ -193,6 +195,7 @@ class TestGetDagRun(TestDagRunEndpoint):
             execution_date=timezone.parse(self.default_time),
             start_date=timezone.parse(self.default_time),
             external_trigger=True,
+            state='running',
         )
         session.add(dagrun_model)
         session.commit()
@@ -532,7 +535,7 @@ class TestGetDagRunsEndDateFilters(TestDagRunEndpoint):
             (
                 f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_lte="
                 f"{(timezone.utcnow() + timedelta(days=1)).isoformat()}",
-                ["TEST_DAG_RUN_ID_1"],
+                ["TEST_DAG_RUN_ID_1", "TEST_DAG_RUN_ID_2"],
             ),
         ]
     )
@@ -750,6 +753,7 @@ class TestGetDagRunBatchPagination(TestDagRunEndpoint):
             DagRun(
                 dag_id="TEST_DAG_ID",
                 run_id="TEST_DAG_RUN_ID" + str(i),
+                state='running',
                 run_type=DagRunType.MANUAL,
                 execution_date=timezone.parse(self.default_time) + timedelta(minutes=i),
                 start_date=timezone.parse(self.default_time),
@@ -884,7 +888,7 @@ class TestGetDagRunBatchDateFilters(TestDagRunEndpoint):
             ),
             (
                 {"end_date_lte": f"{(timezone.utcnow() + timedelta(days=1)).isoformat()}"},
-                ["TEST_DAG_RUN_ID_1"],
+                ["TEST_DAG_RUN_ID_1", "TEST_DAG_RUN_ID_2"],
             ),
         ]
     )
@@ -927,8 +931,8 @@ class TestPostDagRun(TestDagRunEndpoint):
             "end_date": None,
             "execution_date": response.json["execution_date"],
             "external_trigger": True,
-            "start_date": response.json["start_date"],
-            "state": "running",
+            "start_date": None,
+            "state": "queued",
         } == response.json
 
     @parameterized.expand(
diff --git a/tests/api_connexion/schemas/test_dag_run_schema.py b/tests/api_connexion/schemas/test_dag_run_schema.py
index 3e6bf2e..9e4a9e8 100644
--- a/tests/api_connexion/schemas/test_dag_run_schema.py
+++ b/tests/api_connexion/schemas/test_dag_run_schema.py
@@ -49,6 +49,7 @@ class TestDAGRunSchema(TestDAGRunBase):
     def test_serialize(self, session):
         dagrun_model = DagRun(
             run_id="my-dag-run",
+            state='running',
             run_type=DagRunType.MANUAL.value,
             execution_date=timezone.parse(self.default_time),
             start_date=timezone.parse(self.default_time),
@@ -124,6 +125,7 @@ class TestDagRunCollection(TestDAGRunBase):
     def test_serialize(self, session):
         dagrun_model_1 = DagRun(
             run_id="my-dag-run",
+            state='running',
             execution_date=timezone.parse(self.default_time),
             run_type=DagRunType.MANUAL.value,
             start_date=timezone.parse(self.default_time),
@@ -131,6 +133,7 @@ class TestDagRunCollection(TestDAGRunBase):
         )
         dagrun_model_2 = DagRun(
             run_id="my-dag-run-2",
+            state='running',
             execution_date=timezone.parse(self.default_time),
             start_date=timezone.parse(self.default_time),
             run_type=DagRunType.MANUAL.value,
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
new file mode 100644
index 0000000..9425bbb
--- /dev/null
+++ b/tests/dag_processing/test_processor.py
@@ -0,0 +1,746 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import datetime
+import os
+import unittest
+from datetime import timedelta
+from tempfile import NamedTemporaryFile
+from unittest import mock
+from unittest.mock import MagicMock, patch
+
+import pytest
+from parameterized import parameterized
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.dag_processing.processor import DagFileProcessor
+from airflow.jobs.scheduler_job import SchedulerJob
+from airflow.models import DAG, DagBag, DagModel, SlaMiss, TaskInstance
+from airflow.models.dagrun import DagRun
+from airflow.models.serialized_dag import SerializedDagModel
+from airflow.models.taskinstance import SimpleTaskInstance
+from airflow.operators.bash import BashOperator
+from airflow.operators.dummy import DummyOperator
+from airflow.serialization.serialized_objects import SerializedDAG
+from airflow.utils import timezone
+from airflow.utils.callback_requests import TaskCallbackRequest
+from airflow.utils.dates import days_ago
+from airflow.utils.session import create_session
+from airflow.utils.state import State
+from airflow.utils.types import DagRunType
+from tests.test_utils.config import conf_vars, env_vars
+from tests.test_utils.db import (
+    clear_db_dags,
+    clear_db_import_errors,
+    clear_db_jobs,
+    clear_db_pools,
+    clear_db_runs,
+    clear_db_serialized_dags,
+    clear_db_sla_miss,
+)
+from tests.test_utils.mock_executor import MockExecutor
+
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+
+@pytest.fixture(scope="class")
+def disable_load_example():
+    with conf_vars({('core', 'load_examples'): 'false'}):
+        with env_vars({('core', 'load_examples'): 'false'}):
+            yield
+
+
+@pytest.mark.usefixtures("disable_load_example")
+class TestDagFileProcessor(unittest.TestCase):
+    @staticmethod
+    def clean_db():
+        clear_db_runs()
+        clear_db_pools()
+        clear_db_dags()
+        clear_db_sla_miss()
+        clear_db_import_errors()
+        clear_db_jobs()
+        clear_db_serialized_dags()
+
+    def setUp(self):
+        self.clean_db()
+
+        # Speed up some tests by not running the tasks, just look at what we
+        # enqueue!
+        self.null_exec = MockExecutor()
+        self.scheduler_job = None
+
+    def tearDown(self) -> None:
+        if self.scheduler_job and self.scheduler_job.processor_agent:
+            self.scheduler_job.processor_agent.end()
+            self.scheduler_job = None
+        self.clean_db()
+
+    def create_test_dag(self, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timedelta(hours=1), **kwargs):
+        dag = DAG(
+            dag_id='test_scheduler_reschedule',
+            start_date=start_date,
+            # Make sure it only creates a single DAG Run
+            end_date=end_date,
+        )
+        dag.clear()
+        dag.is_subdag = False
+        with create_session() as session:
+            orm_dag = DagModel(dag_id=dag.dag_id, is_paused=False)
+            session.merge(orm_dag)
+            session.commit()
+        return dag
+
+    @classmethod
+    def setUpClass(cls):
+        # Ensure the DAGs we are looking at from the DB are up-to-date
+        non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=False)
+        non_serialized_dagbag.sync_to_db()
+        cls.dagbag = DagBag(read_dags_from_db=True)
+
+    def test_dag_file_processor_sla_miss_callback(self):
+        """
+        Test that the dag file processor calls the sla miss callback
+        """
+        session = settings.Session()
+
+        sla_callback = MagicMock()
+
+        # Create dag with a start of 1 day ago, but an sla of 0
+        # so we'll already have an sla_miss on the books.
+        test_start_date = days_ago(1)
+        dag = DAG(
+            dag_id='test_sla_miss',
+            sla_miss_callback=sla_callback,
+            default_args={'start_date': test_start_date, 'sla': datetime.timedelta()},
+        )
+
+        task = DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session.merge(TaskInstance(task=task, execution_date=test_start_date, state='success'))
+
+        session.merge(SlaMiss(task_id='dummy', dag_id='test_sla_miss', execution_date=test_start_date))
+
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        dag_file_processor.manage_slas(dag=dag, session=session)
+
+        assert sla_callback.called
+
+    def test_dag_file_processor_sla_miss_callback_invalid_sla(self):
+        """
+        Test that the dag file processor does not call the sla miss callback when
+        given an invalid sla
+        """
+        session = settings.Session()
+
+        sla_callback = MagicMock()
+
+        # Create dag with a start of 1 day ago, but an sla of 0
+        # so we'll already have an sla_miss on the books.
+        # Pass anything besides a timedelta object to the sla argument.
+        test_start_date = days_ago(1)
+        dag = DAG(
+            dag_id='test_sla_miss',
+            sla_miss_callback=sla_callback,
+            default_args={'start_date': test_start_date, 'sla': None},
+        )
+
+        task = DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session.merge(TaskInstance(task=task, execution_date=test_start_date, state='success'))
+
+        session.merge(SlaMiss(task_id='dummy', dag_id='test_sla_miss', execution_date=test_start_date))
+
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        dag_file_processor.manage_slas(dag=dag, session=session)
+        sla_callback.assert_not_called()
+
+    def test_dag_file_processor_sla_miss_callback_sent_notification(self):
+        """
+        Test that the dag file processor does not call the sla_miss_callback when a
+        notification has already been sent
+        """
+        session = settings.Session()
+
+        # Mock the callback function so we can verify that it was not called
+        sla_callback = MagicMock()
+
+        # Create dag with a start of 2 days ago, but an sla of 1 day
+        # ago so we'll already have an sla_miss on the books
+        test_start_date = days_ago(2)
+        dag = DAG(
+            dag_id='test_sla_miss',
+            sla_miss_callback=sla_callback,
+            default_args={'start_date': test_start_date, 'sla': datetime.timedelta(days=1)},
+        )
+
+        task = DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        # Create a TaskInstance for two days ago
+        session.merge(TaskInstance(task=task, execution_date=test_start_date, state='success'))
+
+        # Create an SlaMiss where notification was sent, but email was not
+        session.merge(
+            SlaMiss(
+                task_id='dummy',
+                dag_id='test_sla_miss',
+                execution_date=test_start_date,
+                email_sent=False,
+                notification_sent=True,
+            )
+        )
+
+        # Now call manage_slas and see if the sla_miss callback gets called
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        dag_file_processor.manage_slas(dag=dag, session=session)
+
+        sla_callback.assert_not_called()
+
+    def test_dag_file_processor_sla_miss_callback_exception(self):
+        """
+        Test that the dag file processor gracefully logs an exception if there is a problem
+        calling the sla_miss_callback
+        """
+        session = settings.Session()
+
+        sla_callback = MagicMock(side_effect=RuntimeError('Could not call function'))
+
+        test_start_date = days_ago(2)
+        dag = DAG(
+            dag_id='test_sla_miss',
+            sla_miss_callback=sla_callback,
+            default_args={'start_date': test_start_date},
+        )
+
+        task = DummyOperator(task_id='dummy', dag=dag, owner='airflow', sla=datetime.timedelta(hours=1))
+
+        session.merge(TaskInstance(task=task, execution_date=test_start_date, state='success'))
+
+        # Create an SlaMiss where notification was sent, but email was not
+        session.merge(SlaMiss(task_id='dummy', dag_id='test_sla_miss', execution_date=test_start_date))
+
+        # Now call manage_slas and see if the sla_miss callback gets called
+        mock_log = mock.MagicMock()
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock_log)
+        dag_file_processor.manage_slas(dag=dag, session=session)
+        assert sla_callback.called
+        mock_log.exception.assert_called_once_with(
+            'Could not call sla_miss_callback for DAG %s', 'test_sla_miss'
+        )
+
+    @mock.patch('airflow.dag_processing.processor.send_email')
+    def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks(self, mock_send_email):
+        session = settings.Session()
+
+        test_start_date = days_ago(2)
+        dag = DAG(
+            dag_id='test_sla_miss',
+            default_args={'start_date': test_start_date, 'sla': datetime.timedelta(days=1)},
+        )
+
+        email1 = 'test1@test.com'
+        task = DummyOperator(
+            task_id='sla_missed', dag=dag, owner='airflow', email=email1, sla=datetime.timedelta(hours=1)
+        )
+
+        session.merge(TaskInstance(task=task, execution_date=test_start_date, state='success'))
+
+        email2 = 'test2@test.com'
+        DummyOperator(task_id='sla_not_missed', dag=dag, owner='airflow', email=email2)
+
+        session.merge(SlaMiss(task_id='sla_missed', dag_id='test_sla_miss', execution_date=test_start_date))
+
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+
+        dag_file_processor.manage_slas(dag=dag, session=session)
+
+        assert len(mock_send_email.call_args_list) == 1
+
+        send_email_to = mock_send_email.call_args_list[0][0][0]
+        assert email1 in send_email_to
+        assert email2 not in send_email_to
+
+    @mock.patch('airflow.dag_processing.processor.Stats.incr')
+    @mock.patch("airflow.utils.email.send_email")
+    def test_dag_file_processor_sla_miss_email_exception(self, mock_send_email, mock_stats_incr):
+        """
+        Test that the dag file processor gracefully logs an exception if there is a problem
+        sending an email
+        """
+        session = settings.Session()
+
+        # Mock the callback function so we can verify that it was not called
+        mock_send_email.side_effect = RuntimeError('Could not send an email')
+
+        test_start_date = days_ago(2)
+        dag = DAG(
+            dag_id='test_sla_miss',
+            default_args={'start_date': test_start_date, 'sla': datetime.timedelta(days=1)},
+        )
+
+        task = DummyOperator(
+            task_id='dummy', dag=dag, owner='airflow', email='test@test.com', sla=datetime.timedelta(hours=1)
+        )
+
+        session.merge(TaskInstance(task=task, execution_date=test_start_date, state='success'))
+
+        # Create an SlaMiss where notification was sent, but email was not
+        session.merge(SlaMiss(task_id='dummy', dag_id='test_sla_miss', execution_date=test_start_date))
+
+        mock_log = mock.MagicMock()
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock_log)
+
+        dag_file_processor.manage_slas(dag=dag, session=session)
+        mock_log.exception.assert_called_once_with(
+            'Could not send SLA Miss email notification for DAG %s', 'test_sla_miss'
+        )
+        mock_stats_incr.assert_called_once_with('sla_email_notification_failure')
+
+    def test_dag_file_processor_sla_miss_deleted_task(self):
+        """
+        Test that the dag file processor will not crash when trying to send
+        sla miss notification for a deleted task
+        """
+        session = settings.Session()
+
+        test_start_date = days_ago(2)
+        dag = DAG(
+            dag_id='test_sla_miss',
+            default_args={'start_date': test_start_date, 'sla': datetime.timedelta(days=1)},
+        )
+
+        task = DummyOperator(
+            task_id='dummy', dag=dag, owner='airflow', email='test@test.com', sla=datetime.timedelta(hours=1)
+        )
+
+        session.merge(TaskInstance(task=task, execution_date=test_start_date, state='success'))
+
+        # Create an SlaMiss where notification was sent, but email was not
+        session.merge(
+            SlaMiss(task_id='dummy_deleted', dag_id='test_sla_miss', execution_date=test_start_date)
+        )
+
+        mock_log = mock.MagicMock()
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock_log)
+        dag_file_processor.manage_slas(dag=dag, session=session)
+
+    @parameterized.expand(
+        [
+            [State.NONE, None, None],
+            [
+                State.UP_FOR_RETRY,
+                timezone.utcnow() - datetime.timedelta(minutes=30),
+                timezone.utcnow() - datetime.timedelta(minutes=15),
+            ],
+            [
+                State.UP_FOR_RESCHEDULE,
+                timezone.utcnow() - datetime.timedelta(minutes=30),
+                timezone.utcnow() - datetime.timedelta(minutes=15),
+            ],
+        ]
+    )
+    def test_dag_file_processor_process_task_instances(self, state, start_date, end_date):
+        """
+        Test that _schedule_dag_run schedules the expected task instances.
+        """
+        dag = DAG(dag_id='test_scheduler_process_execute_task', start_date=DEFAULT_DATE)
+        BashOperator(task_id='dummy', dag=dag, owner='airflow', bash_command='echo hi')
+
+        with create_session() as session:
+            orm_dag = DagModel(dag_id=dag.dag_id)
+            session.merge(orm_dag)
+
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+        dag.clear()
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+        assert dr is not None
+
+        with create_session() as session:
+            ti = dr.get_task_instances(session=session)[0]
+            ti.state = state
+            ti.start_date = start_date
+            ti.end_date = end_date
+
+            count = self.scheduler_job._schedule_dag_run(dr, session)
+            assert count == 1
+
+            session.refresh(ti)
+            assert ti.state == State.SCHEDULED
+
+    @parameterized.expand(
+        [
+            [State.NONE, None, None],
+            [
+                State.UP_FOR_RETRY,
+                timezone.utcnow() - datetime.timedelta(minutes=30),
+                timezone.utcnow() - datetime.timedelta(minutes=15),
+            ],
+            [
+                State.UP_FOR_RESCHEDULE,
+                timezone.utcnow() - datetime.timedelta(minutes=30),
+                timezone.utcnow() - datetime.timedelta(minutes=15),
+            ],
+        ]
+    )
+    def test_dag_file_processor_process_task_instances_with_task_concurrency(
+        self,
+        state,
+        start_date,
+        end_date,
+    ):
+        """
+        Test that _schedule_dag_run schedules the expected task instances when task_concurrency is set.
+        """
+        dag = DAG(dag_id='test_scheduler_process_execute_task_with_task_concurrency', start_date=DEFAULT_DATE)
+        BashOperator(task_id='dummy', task_concurrency=2, dag=dag, owner='airflow', bash_command='echo Hi')
+
+        with create_session() as session:
+            orm_dag = DagModel(dag_id=dag.dag_id)
+            session.merge(orm_dag)
+
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+        dag.clear()
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+        assert dr is not None
+
+        with create_session() as session:
+            ti = dr.get_task_instances(session=session)[0]
+            ti.state = state
+            ti.start_date = start_date
+            ti.end_date = end_date
+
+            count = self.scheduler_job._schedule_dag_run(dr, session)
+            assert count == 1
+
+            session.refresh(ti)
+            assert ti.state == State.SCHEDULED
+
+    @parameterized.expand(
+        [
+            [State.NONE, None, None],
+            [
+                State.UP_FOR_RETRY,
+                timezone.utcnow() - datetime.timedelta(minutes=30),
+                timezone.utcnow() - datetime.timedelta(minutes=15),
+            ],
+            [
+                State.UP_FOR_RESCHEDULE,
+                timezone.utcnow() - datetime.timedelta(minutes=30),
+                timezone.utcnow() - datetime.timedelta(minutes=15),
+            ],
+        ]
+    )
+    def test_dag_file_processor_process_task_instances_depends_on_past(self, state, start_date, end_date):
+        """
+        Test that _schedule_dag_run schedules the expected task instances for a DAG with depends_on_past.
+        """
+        dag = DAG(
+            dag_id='test_scheduler_process_execute_task_depends_on_past',
+            start_date=DEFAULT_DATE,
+            default_args={
+                'depends_on_past': True,
+            },
+        )
+        BashOperator(task_id='dummy1', dag=dag, owner='airflow', bash_command='echo hi')
+        BashOperator(task_id='dummy2', dag=dag, owner='airflow', bash_command='echo hi')
+
+        with create_session() as session:
+            orm_dag = DagModel(dag_id=dag.dag_id)
+            session.merge(orm_dag)
+
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+        dag.clear()
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+        assert dr is not None
+
+        with create_session() as session:
+            tis = dr.get_task_instances(session=session)
+            for ti in tis:
+                ti.state = state
+                ti.start_date = start_date
+                ti.end_date = end_date
+
+            count = self.scheduler_job._schedule_dag_run(dr, session)
+            assert count == 2
+
+            session.refresh(tis[0])
+            session.refresh(tis[1])
+            assert tis[0].state == State.SCHEDULED
+            assert tis[1].state == State.SCHEDULED
+
+    def test_scheduler_job_add_new_task(self):
+        """
+        Test if a task instance will be added if the dag is updated
+        """
+        dag = DAG(dag_id='test_scheduler_add_new_task', start_date=DEFAULT_DATE)
+        BashOperator(task_id='dummy', dag=dag, owner='airflow', bash_command='echo test')
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+        # Since we don't want to store the code for the DAG defined in this file
+        with mock.patch.object(settings, "STORE_DAG_CODE", False):
+            self.scheduler_job.dagbag.sync_to_db()
+
+        session = settings.Session()
+        orm_dag = session.query(DagModel).get(dag.dag_id)
+        assert orm_dag is not None
+
+        if self.scheduler_job.processor_agent:
+            self.scheduler_job.processor_agent.end()
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        dag = self.scheduler_job.dagbag.get_dag('test_scheduler_add_new_task', session=session)
+        self.scheduler_job._create_dag_runs([orm_dag], session)
+
+        drs = DagRun.find(dag_id=dag.dag_id, session=session)
+        assert len(drs) == 1
+        dr = drs[0]
+
+        tis = dr.get_task_instances()
+        assert len(tis) == 1
+
+        BashOperator(task_id='dummy2', dag=dag, owner='airflow', bash_command='echo test')
+        SerializedDagModel.write_dag(dag=dag)
+
+        scheduled_tis = self.scheduler_job._schedule_dag_run(dr, session)
+        session.flush()
+        assert scheduled_tis == 2
+
+        drs = DagRun.find(dag_id=dag.dag_id, session=session)
+        assert len(drs) == 1
+        dr = drs[0]
+
+        tis = dr.get_task_instances()
+        assert len(tis) == 2
+
+    def test_runs_respected_after_clear(self):
+        """
+        Test that max_active_runs is respected after dag.clear
+        """
+        dag = DAG(dag_id='test_scheduler_max_active_runs_respected_after_clear', start_date=DEFAULT_DATE)
+        dag.max_active_runs = 1
+
+        BashOperator(task_id='dummy', dag=dag, owner='airflow', bash_command='echo Hi')
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        session.merge(orm_dag)
+        session.commit()
+        session.close()
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag.dag_id)
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+        date = DEFAULT_DATE
+        dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=date,
+            state=State.QUEUED,
+        )
+        date = dag.following_schedule(date)
+        dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=date,
+            state=State.QUEUED,
+        )
+        date = dag.following_schedule(date)
+        dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=date,
+            state=State.QUEUED,
+        )
+        dag.clear()
+
+        assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 3
+
+        session = settings.Session()
+        self.scheduler_job._start_queued_dagruns(session)
+        session.commit()
+        # Assert that only 1 dagrun is active
+        assert len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)) == 1
+        # Assert that the other two are queued
+        assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 2
+
+    @patch.object(TaskInstance, 'handle_failure_with_callback')
+    def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        with create_session() as session:
+            session.query(TaskInstance).delete()
+            dag = dagbag.get_dag('example_branch_operator')
+            task = dag.get_task(task_id='run_this_first')
+
+            ti = TaskInstance(task, DEFAULT_DATE, State.RUNNING)
+
+            session.add(ti)
+            session.commit()
+
+            requests = [
+                TaskCallbackRequest(
+                    full_filepath="A", simple_task_instance=SimpleTaskInstance(ti), msg="Message"
+                )
+            ]
+            dag_file_processor.execute_callbacks(dagbag, requests)
+            mock_ti_handle_failure.assert_called_once_with(
+                error="Message",
+                test_mode=conf.getboolean('core', 'unit_test_mode'),
+            )
+
+    def test_process_file_should_failure_callback(self):
+        dag_file = os.path.join(
+            os.path.dirname(os.path.realpath(__file__)), '../dags/test_on_failure_callback.py'
+        )
+        dagbag = DagBag(dag_folder=dag_file, include_examples=False)
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        with create_session() as session, NamedTemporaryFile(delete=False) as callback_file:
+            session.query(TaskInstance).delete()
+            dag = dagbag.get_dag('test_om_failure_callback_dag')
+            task = dag.get_task(task_id='test_om_failure_callback_task')
+
+            ti = TaskInstance(task, DEFAULT_DATE, State.RUNNING)
+
+            session.add(ti)
+            session.commit()
+
+            requests = [
+                TaskCallbackRequest(
+                    full_filepath=dag.full_filepath,
+                    simple_task_instance=SimpleTaskInstance(ti),
+                    msg="Message",
+                )
+            ]
+            callback_file.close()
+
+            with mock.patch.dict("os.environ", {"AIRFLOW_CALLBACK_FILE": callback_file.name}):
+                dag_file_processor.process_file(dag_file, requests)
+            with open(callback_file.name) as callback_file2:
+                content = callback_file2.read()
+            assert "Callback fired" == content
+            os.remove(callback_file.name)
+
+    def test_should_mark_dummy_task_as_success(self):
+        dag_file = os.path.join(
+            os.path.dirname(os.path.realpath(__file__)), '../dags/test_only_dummy_tasks.py'
+        )
+
+        # Write DAGs to dag and serialized_dag table
+        dagbag = DagBag(dag_folder=dag_file, include_examples=False, read_dags_from_db=False)
+        dagbag.sync_to_db()
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        dag = self.scheduler_job.dagbag.get_dag("test_only_dummy_tasks")
+
+        # Create DagRun
+        session = settings.Session()
+        orm_dag = session.query(DagModel).get(dag.dag_id)
+        self.scheduler_job._create_dag_runs([orm_dag], session)
+
+        drs = DagRun.find(dag_id=dag.dag_id, session=session)
+        assert len(drs) == 1
+        dr = drs[0]
+
+        # Schedule TaskInstances
+        self.scheduler_job._schedule_dag_run(dr, session)
+        with create_session() as session:
+            tis = session.query(TaskInstance).all()
+
+        dags = self.scheduler_job.dagbag.dags.values()
+        assert ['test_only_dummy_tasks'] == [dag.dag_id for dag in dags]
+        assert 5 == len(tis)
+        assert {
+            ('test_task_a', 'success'),
+            ('test_task_b', None),
+            ('test_task_c', 'success'),
+            ('test_task_on_execute', 'scheduled'),
+            ('test_task_on_success', 'scheduled'),
+        } == {(ti.task_id, ti.state) for ti in tis}
+        for state, start_date, end_date, duration in [
+            (ti.state, ti.start_date, ti.end_date, ti.duration) for ti in tis
+        ]:
+            if state == 'success':
+                assert start_date is not None
+                assert end_date is not None
+                assert 0.0 == duration
+            else:
+                assert start_date is None
+                assert end_date is None
+                assert duration is None
+
+        self.scheduler_job._schedule_dag_run(dr, session)
+        with create_session() as session:
+            tis = session.query(TaskInstance).all()
+
+        assert 5 == len(tis)
+        assert {
+            ('test_task_a', 'success'),
+            ('test_task_b', 'success'),
+            ('test_task_c', 'success'),
+            ('test_task_on_execute', 'scheduled'),
+            ('test_task_on_success', 'scheduled'),
+        } == {(ti.task_id, ti.state) for ti in tis}
+        for state, start_date, end_date, duration in [
+            (ti.state, ti.start_date, ti.end_date, ti.duration) for ti in tis
+        ]:
+            if state == 'success':
+                assert start_date is not None
+                assert end_date is not None
+                assert 0.0 == duration
+            else:
+                assert start_date is None
+                assert end_date is None
+                assert duration is None
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index fe0b257..33c82d7 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -23,7 +23,6 @@ import shutil
 import unittest
 from datetime import timedelta
 from tempfile import NamedTemporaryFile, mkdtemp
-from time import sleep
 from unittest import mock
 from unittest.mock import MagicMock, patch
 from zipfile import ZipFile
@@ -1114,7 +1113,6 @@ class TestSchedulerJob(unittest.TestCase):
         session.flush()
 
         res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
-
         assert 2 == len(res)
         res_keys = map(lambda x: x.key, res)
         assert ti_no_dagrun.key in res_keys
@@ -2259,15 +2257,16 @@ class TestSchedulerJob(unittest.TestCase):
 
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         self.scheduler_job._create_dag_runs([orm_dag], session)
+        self.scheduler_job._start_queued_dagruns(session)
 
         drs = DagRun.find(dag_id=dag.dag_id, session=session)
         assert len(drs) == 1
         dr = drs[0]
 
-        # Should not be able to create a new dag run, as we are at max active runs
-        assert orm_dag.next_dagrun_create_after is None
+        # This should have a value, since max_active_runs is now controlled
+        # by DagRun state.
+        assert orm_dag.next_dagrun_create_after
         # But we should record the date of _what run_ it would be
         assert isinstance(orm_dag.next_dagrun, datetime.datetime)
 
@@ -2279,7 +2278,7 @@ class TestSchedulerJob(unittest.TestCase):
         self.scheduler_job.processor_agent = mock.Mock()
         self.scheduler_job.processor_agent.send_callback_to_execute = mock.Mock()
 
-        self.scheduler_job._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, session)
         session.flush()
 
         session.refresh(dr)
@@ -2336,7 +2335,7 @@ class TestSchedulerJob(unittest.TestCase):
         self.scheduler_job.processor_agent = mock.Mock()
         self.scheduler_job.processor_agent.send_callback_to_execute = mock.Mock()
 
-        self.scheduler_job._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, session)
         session.flush()
 
         session.refresh(dr)
@@ -2395,7 +2394,7 @@ class TestSchedulerJob(unittest.TestCase):
         ti = dr.get_task_instance('dummy')
         ti.set_state(state, session)
 
-        self.scheduler_job._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, session)
 
         expected_callback = DagCallbackRequest(
             full_filepath=dr.dag.fileloc,
@@ -2450,7 +2449,7 @@ class TestSchedulerJob(unittest.TestCase):
         ti = dr.get_task_instance('test_task')
         ti.set_state(state, session)
 
-        self.scheduler_job._schedule_dag_run(dr, set(), session)
+        self.scheduler_job._schedule_dag_run(dr, session)
 
         # Verify Callback is not set (i.e is None) when no callbacks are set on DAG
         self.scheduler_job._send_dag_callbacks_to_processor.assert_called_once_with(dr, None)
@@ -2830,13 +2829,13 @@ class TestSchedulerJob(unittest.TestCase):
             execution_date=DEFAULT_DATE,
             state=State.RUNNING,
         )
-        self.scheduler_job._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, session)
         dr = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=dag.following_schedule(dr.execution_date),
             state=State.RUNNING,
         )
-        self.scheduler_job._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, session)
         task_instances_list = self.scheduler_job._executable_task_instances_to_queued(
             max_tis=32, session=session
         )
@@ -2887,7 +2886,7 @@ class TestSchedulerJob(unittest.TestCase):
                 execution_date=date,
                 state=State.RUNNING,
             )
-            self.scheduler_job._schedule_dag_run(dr, {}, session)
+            self.scheduler_job._schedule_dag_run(dr, session)
             date = dag.following_schedule(date)
 
         task_instances_list = self.scheduler_job._executable_task_instances_to_queued(
@@ -2950,7 +2949,7 @@ class TestSchedulerJob(unittest.TestCase):
                 execution_date=date,
                 state=State.RUNNING,
             )
-            scheduler._schedule_dag_run(dr, {}, session)
+            scheduler._schedule_dag_run(dr, session)
             date = dag_d1.following_schedule(date)
 
         date = DEFAULT_DATE
@@ -2960,7 +2959,7 @@ class TestSchedulerJob(unittest.TestCase):
                 execution_date=date,
                 state=State.RUNNING,
             )
-            scheduler._schedule_dag_run(dr, {}, session)
+            scheduler._schedule_dag_run(dr, session)
             date = dag_d2.following_schedule(date)
 
         scheduler._executable_task_instances_to_queued(max_tis=2, session=session)
@@ -3037,7 +3036,7 @@ class TestSchedulerJob(unittest.TestCase):
             execution_date=DEFAULT_DATE,
             state=State.RUNNING,
         )
-        self.scheduler_job._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, session)
 
         task_instances_list = self.scheduler_job._executable_task_instances_to_queued(
             max_tis=32, session=session
@@ -3096,7 +3095,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         # Verify that DagRun.verify_integrity is not called
         with mock.patch('airflow.jobs.scheduler_job.DagRun.verify_integrity') as mock_verify_integrity:
-            scheduled_tis = self.scheduler_job._schedule_dag_run(dr, {}, session)
+            scheduled_tis = self.scheduler_job._schedule_dag_run(dr, session)
             mock_verify_integrity.assert_not_called()
         session.flush()
 
@@ -3159,7 +3158,7 @@ class TestSchedulerJob(unittest.TestCase):
         dag_version_2 = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session)
         assert dag_version_2 != dag_version_1
 
-        scheduled_tis = self.scheduler_job._schedule_dag_run(dr, {}, session)
+        scheduled_tis = self.scheduler_job._schedule_dag_run(dr, session)
         session.flush()
 
         assert scheduled_tis == 2
@@ -3871,14 +3870,13 @@ class TestSchedulerJob(unittest.TestCase):
                 full_filepath=dag.fileloc, dag_id=dag_id
             )
 
-    @freeze_time(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9))
-    @mock.patch('airflow.jobs.scheduler_job.Stats.timing')
-    def test_create_dag_runs(self, stats_timing):
+    def test_create_dag_runs(self):
         """
         Test various invariants of _create_dag_runs.
 
         - That the run created has the creating_job_id set
-        - That we emit the right DagRun metrics
+        - That the run created is in the QUEUED state
+        - That the dag_model has next_dagrun set
         """
         dag = DAG(dag_id='test_create_dag_runs', start_date=DEFAULT_DATE)
 
@@ -3902,8 +3900,51 @@ class TestSchedulerJob(unittest.TestCase):
         with create_session() as session:
             self.scheduler_job._create_dag_runs([dag_model], session)
 
+        dr = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).first()
+        # Assert dr state is queued
+        assert dr.state == State.QUEUED
+        assert dr.start_date is None
+
+        assert dag.get_last_dagrun().creating_job_id == self.scheduler_job.id
+
+    @freeze_time(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9))
+    @mock.patch('airflow.jobs.scheduler_job.Stats.timing')
+    def test_start_dagruns(self, stats_timing):
+        """
+        Test that _start_queued_dagruns:
+
+        - moves runs to the RUNNING state
+        - emits the right DagRun metrics
+        """
+        dag = DAG(dag_id='test_start_dag_runs', start_date=DEFAULT_DATE)
+
+        DummyOperator(
+            task_id='dummy',
+            dag=dag,
+        )
+
+        dagbag = DagBag(
+            dag_folder=os.devnull,
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db()
+        dag_model = DagModel.get_dagmodel(dag.dag_id)
+
+        self.scheduler_job = SchedulerJob(executor=self.null_exec)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+
+        with create_session() as session:
+            self.scheduler_job._create_dag_runs([dag_model], session)
+            self.scheduler_job._start_queued_dagruns(session)
+
+        dr = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).first()
+        # Assert dr state is running
+        assert dr.state == State.RUNNING
+
         stats_timing.assert_called_once_with(
-            "dagrun.schedule_delay.test_create_dag_runs", datetime.timedelta(seconds=9)
+            "dagrun.schedule_delay.test_start_dag_runs", datetime.timedelta(seconds=9)
         )
 
         assert dag.get_last_dagrun().creating_job_id == self.scheduler_job.id
@@ -4102,61 +4143,7 @@ class TestSchedulerJob(unittest.TestCase):
         assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(days=1)
         session.rollback()
 
-    def test_do_schedule_max_active_runs_upstream_failed(self):
-        """
-        Test that tasks in upstream failed don't count as actively running.
-
-        This test can be removed when adding a queued state to DagRuns.
-        """
-
-        with DAG(
-            dag_id='test_max_active_run_with_upstream_failed',
-            start_date=DEFAULT_DATE,
-            schedule_interval='@once',
-            max_active_runs=1,
-        ) as dag:
-            # Can't use DummyOperator as that goes straight to success
-            task1 = BashOperator(task_id='dummy1', bash_command='true')
-
-        session = settings.Session()
-        dagbag = DagBag(
-            dag_folder=os.devnull,
-            include_examples=False,
-            read_dags_from_db=True,
-        )
-
-        dagbag.bag_dag(dag=dag, root_dag=dag)
-        dagbag.sync_to_db(session=session)
-
-        run1 = dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            execution_date=DEFAULT_DATE,
-            state=State.RUNNING,
-            session=session,
-        )
-
-        ti = run1.get_task_instance(task1.task_id, session)
-        ti.state = State.UPSTREAM_FAILED
-
-        run2 = dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            execution_date=DEFAULT_DATE + timedelta(hours=1),
-            state=State.RUNNING,
-            session=session,
-        )
-
-        dag.sync_to_db(session=session)  # Update the date fields
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.executor = MockExecutor(do_update=False)
-        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
-
-        num_queued = self.scheduler_job._do_scheduling(session)
-
-        assert num_queued == 1
-        ti = run2.get_task_instance(task1.task_id, session)
-        assert ti.state == State.QUEUED
-
+    @conf_vars({('scheduler', 'use_job_schedule'): "false"})
     def test_do_schedule_max_active_runs_dag_timed_out(self):
         """Test that tasks are set to a finished state when their DAG times out"""
 
@@ -4189,33 +4176,36 @@ class TestSchedulerJob(unittest.TestCase):
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
             state=State.RUNNING,
+            start_date=timezone.utcnow() - timedelta(seconds=2),
             session=session,
         )
+
         run1_ti = run1.get_task_instance(task1.task_id, session)
         run1_ti.state = State.RUNNING
 
-        sleep(1)
-
         run2 = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE + timedelta(seconds=10),
-            state=State.RUNNING,
+            state=State.QUEUED,
             session=session,
         )
 
         dag.sync_to_db(session=session)
-
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         self.scheduler_job.executor = MockExecutor()
         self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
 
-        _ = self.scheduler_job._do_scheduling(session)
-
+        self.scheduler_job._do_scheduling(session)
+        session.add(run1)
+        session.refresh(run1)
         assert run1.state == State.FAILED
         assert run1_ti.state == State.SKIPPED
-        assert run2.state == State.RUNNING
 
-        _ = self.scheduler_job._do_scheduling(session)
+        # Run scheduling again to assert run2 has started
+        self.scheduler_job._do_scheduling(session)
+        session.add(run2)
+        session.refresh(run2)
+        assert run2.state == State.RUNNING
         run2_ti = run2.get_task_instance(task1.task_id, session)
         assert run2_ti.state == State.QUEUED
 
@@ -4265,8 +4255,8 @@ class TestSchedulerJob(unittest.TestCase):
 
     def test_do_schedule_max_active_runs_and_manual_trigger(self):
         """
-        Make sure that when a DAG is already at max_active_runs, that manually triggering a run doesn't cause
-        the dag to "stall".
+        Make sure that when a DAG is already at max_active_runs, manually triggered
+        dagruns don't start running.
         """
 
         with DAG(
@@ -4281,7 +4271,7 @@ class TestSchedulerJob(unittest.TestCase):
 
             task1 >> task2
 
-            task3 = BashOperator(task_id='dummy3', bash_command='true')
+            BashOperator(task_id='dummy3', bash_command='true')
 
         session = settings.Session()
         dagbag = DagBag(
@@ -4296,7 +4286,7 @@ class TestSchedulerJob(unittest.TestCase):
         dag_run = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
-            state=State.RUNNING,
+            state=State.QUEUED,
             session=session,
         )
 
@@ -4314,47 +4304,23 @@ class TestSchedulerJob(unittest.TestCase):
 
         assert num_queued == 2
         assert dag_run.state == State.RUNNING
-        ti1 = dag_run.get_task_instance(task1.task_id, session)
-        assert ti1.state == State.QUEUED
-
-        # Set task1 to success (so task2 can run) but keep task3 as "running"
-        ti1.state = State.SUCCESS
-
-        ti3 = dag_run.get_task_instance(task3.task_id, session)
-        ti3.state = State.RUNNING
-
-        session.flush()
-
-        # At this point, ti2 and ti3 of the scheduled dag run should be running
-        num_queued = self.scheduler_job._do_scheduling(session)
-
-        assert num_queued == 1
-        # Should have queued task2
-        ti2 = dag_run.get_task_instance(task2.task_id, session)
-        assert ti2.state == State.QUEUED
-
-        ti2.state = None
-        session.flush()
 
         # Now that this one is running, manually trigger a dag.
 
-        manual_run = dag.create_dagrun(
+        dag.create_dagrun(
             run_type=DagRunType.MANUAL,
             execution_date=DEFAULT_DATE + timedelta(hours=1),
-            state=State.RUNNING,
+            state=State.QUEUED,
             session=session,
         )
         session.flush()
 
-        num_queued = self.scheduler_job._do_scheduling(session)
+        self.scheduler_job._do_scheduling(session)
 
-        assert num_queued == 1
-        # Should have queued task2 again.
-        ti2 = dag_run.get_task_instance(task2.task_id, session)
-        assert ti2.state == State.QUEUED
-        # Manual run shouldn't have been started, because we're at max_active_runs with DR1
-        ti1 = manual_run.get_task_instance(task1.task_id, session)
-        assert ti1.state is None
+        # Assert that only 1 dagrun is active
+        assert len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)) == 1
+        # Assert that the other one is queued
+        assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 1
 
 
 @pytest.mark.xfail(reason="Work out where this goes")
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index 9b8fbd0..4f64347 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -19,6 +19,8 @@
 import datetime
 import unittest
 
+from parameterized import parameterized
+
 from airflow import settings
 from airflow.models import DAG, TaskInstance as TI, TaskReschedule, clear_task_instances
 from airflow.operators.dummy import DummyOperator
@@ -92,6 +94,41 @@ class TestClearTasks(unittest.TestCase):
             assert ti0.state is None
             assert ti0.external_executor_id is None
 
+    @parameterized.expand([(State.QUEUED, None), (State.RUNNING, DEFAULT_DATE)])
+    def test_clear_task_instances_dr_state(self, state, last_scheduling):
+        """Test that DR state is set to None after clear.
+        And that DR.last_scheduling_decision is handled OK.
+        start_date is also set to None
+        """
+        dag = DAG(
+            'test_clear_task_instances',
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+        )
+        task0 = DummyOperator(task_id='0', owner='test', dag=dag)
+        task1 = DummyOperator(task_id='1', owner='test', dag=dag, retries=2)
+        ti0 = TI(task=task0, execution_date=DEFAULT_DATE)
+        ti1 = TI(task=task1, execution_date=DEFAULT_DATE)
+        session = settings.Session()
+        dr = dag.create_dagrun(
+            execution_date=ti0.execution_date,
+            state=State.RUNNING,
+            run_type=DagRunType.SCHEDULED,
+        )
+        dr.last_scheduling_decision = DEFAULT_DATE
+        session.add(dr)
+        session.commit()
+
+        ti0.run()
+        ti1.run()
+        qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
+        clear_task_instances(qry, session, dag_run_state=state, dag=dag)
+
+        dr = ti0.get_dagrun()
+        assert dr.state == state
+        assert dr.start_date is None
+        assert dr.last_scheduling_decision == last_scheduling
+
     def test_clear_task_instances_without_task(self):
         dag = DAG(
             'test_clear_task_instances_without_task',
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 7899199..fac38bf 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -39,7 +39,7 @@ from airflow.utils.state import State
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.types import DagRunType
 from tests.models import DEFAULT_DATE
-from tests.test_utils.db import clear_db_jobs, clear_db_pools, clear_db_runs
+from tests.test_utils.db import clear_db_dags, clear_db_jobs, clear_db_pools, clear_db_runs
 
 
 class TestDagRun(unittest.TestCase):
@@ -50,6 +50,12 @@ class TestDagRun(unittest.TestCase):
     def setUp(self):
         clear_db_runs()
         clear_db_pools()
+        clear_db_dags()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
+        clear_db_pools()
+        clear_db_dags()
 
     def create_dag_run(
         self,
@@ -102,7 +108,7 @@ class TestDagRun(unittest.TestCase):
         session.commit()
         ti0.refresh_from_db()
         dr0 = session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.execution_date == now).first()
-        assert dr0.state == State.RUNNING
+        assert dr0.state == State.QUEUED
 
     def test_dagrun_find(self):
         session = settings.Session()
@@ -692,9 +698,11 @@ class TestDagRun(unittest.TestCase):
         ti.run()
         assert (ti.state == State.SUCCESS) == is_ti_success
 
-    def test_next_dagruns_to_examine_only_unpaused(self):
+    @parameterized.expand([(State.QUEUED,), (State.RUNNING,)])
+    def test_next_dagruns_to_examine_only_unpaused(self, state):
         """
         Check that "next_dagruns_to_examine" ignores runs from paused/inactive DAGs
+        and gets running/queued dagruns
         """
 
         dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
@@ -712,25 +720,22 @@ class TestDagRun(unittest.TestCase):
         session.flush()
         dr = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
-            state=State.RUNNING,
+            state=state,
             execution_date=DEFAULT_DATE,
-            start_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE if state == State.RUNNING else None,
             session=session,
         )
 
-        runs = DagRun.next_dagruns_to_examine(session).all()
+        runs = DagRun.next_dagruns_to_examine(state, session).all()
 
         assert runs == [dr]
 
         orm_dag.is_paused = True
         session.flush()
 
-        runs = DagRun.next_dagruns_to_examine(session).all()
+        runs = DagRun.next_dagruns_to_examine(state, session).all()
         assert runs == []
 
-        session.rollback()
-        session.close()
-
     @mock.patch.object(Stats, 'timing')
     def test_no_scheduling_delay_for_nonscheduled_runs(self, stats_mock):
         """
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 187fdb2..274766c 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -545,16 +545,16 @@ def test_external_task_marker_clear_activate(dag_bag_parent_child):
     task_0 = dag_0.get_task("task_0")
     clear_tasks(dag_bag, dag_0, task_0, start_date=day_1, end_date=day_2)
 
-    # Assert that dagruns of all the affected dags are set to RUNNING after tasks are cleared.
+    # Assert that dagruns of all the affected dags are set to QUEUED after tasks are cleared.
     # Unaffected dagruns should be left as SUCCESS.
     dagrun_0_1 = dag_bag.get_dag('parent_dag_0').get_dagrun(execution_date=day_1)
     dagrun_0_2 = dag_bag.get_dag('parent_dag_0').get_dagrun(execution_date=day_2)
     dagrun_1_1 = dag_bag.get_dag('child_dag_1').get_dagrun(execution_date=day_1)
     dagrun_1_2 = dag_bag.get_dag('child_dag_1').get_dagrun(execution_date=day_2)
 
-    assert dagrun_0_1.state == State.RUNNING
-    assert dagrun_0_2.state == State.RUNNING
-    assert dagrun_1_1.state == State.RUNNING
+    assert dagrun_0_1.state == State.QUEUED
+    assert dagrun_0_2.state == State.QUEUED
+    assert dagrun_1_1.state == State.QUEUED
     assert dagrun_1_2.state == State.SUCCESS
 
 
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index 58ad010..e38c184 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -35,7 +35,7 @@ from freezegun import freeze_time
 
 from airflow.configuration import conf
 from airflow.jobs.local_task_job import LocalTaskJob as LJ
-from airflow.jobs.scheduler_job import DagFileProcessorProcess
+from airflow.jobs.scheduler_job import DagFileProcessorProcess, SchedulerJob
 from airflow.models import DagBag, DagModel, TaskInstance as TI
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance
@@ -508,8 +508,8 @@ class TestDagFileProcessorManager(unittest.TestCase):
             child_pipe.close()
             parent_pipe.close()
 
-    @mock.patch("airflow.jobs.scheduler_job.DagFileProcessorProcess.pid", new_callable=PropertyMock)
-    @mock.patch("airflow.jobs.scheduler_job.DagFileProcessorProcess.kill")
+    @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", new_callable=PropertyMock)
+    @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.kill")
     def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid):
         mock_pid.return_value = 1234
         manager = DagFileProcessorManager(
@@ -529,8 +529,8 @@ class TestDagFileProcessorManager(unittest.TestCase):
         manager._kill_timed_out_processors()
         mock_kill.assert_called_once_with()
 
-    @mock.patch("airflow.jobs.scheduler_job.DagFileProcessorProcess.pid", new_callable=PropertyMock)
-    @mock.patch("airflow.jobs.scheduler_job.DagFileProcessorProcess")
+    @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", new_callable=PropertyMock)
+    @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess")
     def test_kill_timed_out_processors_no_kill(self, mock_dag_file_processor, mock_pid):
         mock_pid.return_value = 1234
         manager = DagFileProcessorManager(
@@ -560,7 +560,6 @@ class TestDagFileProcessorManager(unittest.TestCase):
         # We need to _actually_ parse the files here to test the behaviour.
         # Right now the parsing code lives in SchedulerJob, even though it's
         # called via utils.dag_processing.
-        from airflow.jobs.scheduler_job import SchedulerJob
 
         dag_id = 'exit_test_dag'
         dag_directory = TEST_DAG_FOLDER.parent / 'dags_with_system_exit'

[airflow] 03/03: Run mini scheduler in LocalTaskJob during task exit (#16289)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 205d6d4debe111d053b8674ff2ea991b95203be6
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Jun 10 14:29:30 2021 +0100

    Run mini scheduler in LocalTaskJob during task exit (#16289)
    
    Currently, the chances of tasks being killed by the LocalTaskJob heartbeat are high.
    
    This is because, after a task is marked successful/failed in taskinstance.py with the mini scheduler enabled,
    the mini scheduler starts running inside the task process. Whenever the mini scheduling takes long enough to meet
    the next job heartbeat, the heartbeat detects that the task has succeeded with no return code recorded, because
    LocalTaskJob.handle_task_exit was not yet called. Hence, the heartbeat thinks that the task was externally marked failed/successful.
    
    This change resolves the race by moving the mini scheduler into LocalTaskJob.handle_task_exit, ensuring
    that the task will no longer be killed by the next heartbeat.
    
    (cherry picked from commit 408bd26c22913af93d05aa70abc3c66c52cd4588)
---
 airflow/jobs/local_task_job.py          |  68 +++++++++++++----
 airflow/models/taskinstance.py          |  60 +--------------
 tests/cli/commands/test_task_command.py |   4 +-
 tests/jobs/test_local_task_job.py       | 130 ++++++++++++++++++++++++++++++--
 tests/models/test_taskinstance.py       | 103 -------------------------
 5 files changed, 180 insertions(+), 185 deletions(-)
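
For illustration, here is a toy model of the race described in the commit
message (a hypothetical ToyLocalTaskJob, not Airflow's real classes or API).
Recording the exit code before the possibly slow mini-scheduler pass means a
concurrent heartbeat never mistakes a finished task for one that was
externally modified:

    # Standalone sketch only; names and structure are illustrative.
    class ToyLocalTaskJob:
        def __init__(self):
            self.return_code = None   # set when the task process exits
            self.terminating = False

        def handle_task_exit(self, return_code):
            self.return_code = return_code  # record the exit first ...
            self._run_mini_scheduler()      # ... then do follow-on scheduling

        def _run_mini_scheduler(self):
            pass  # schedule downstream tasks; may outlast a heartbeat interval

        def heartbeat_callback(self):
            # Before this change the mini scheduler ran inside the task
            # process, so a finished task could still have return_code=None
            # here and be treated as externally modified (and killed).
            if self.return_code is None and not self.terminating:
                self.terminating = True

    job = ToyLocalTaskJob()
    job.handle_task_exit(0)
    job.heartbeat_callback()
    assert not job.terminating  # the finished task is left alone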

diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index 3afc801..e7f61f1 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -16,21 +16,24 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
 import signal
 from typing import Optional
 
 import psutil
+from sqlalchemy.exc import OperationalError
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.jobs.base_job import BaseJob
+from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
+from airflow.sentry import Sentry
 from airflow.stats import Stats
 from airflow.task.task_runner import get_task_runner
 from airflow.utils import timezone
 from airflow.utils.net import get_hostname
 from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import with_row_locks
 from airflow.utils.state import State
 
 
@@ -159,7 +162,8 @@ class LocalTaskJob(BaseJob):
             error = self.task_runner.deserialize_run_error()
         self.task_instance._run_finished_callback(error=error)
         if not self.task_instance.test_mode:
-            self._update_dagrun_state_for_paused_dag()
+            if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True):
+                self._run_mini_scheduler_on_child_tasks()
 
     def on_kill(self):
         self.task_runner.terminate()
@@ -215,14 +219,52 @@ class LocalTaskJob(BaseJob):
             self.terminating = True
 
     @provide_session
-    def _update_dagrun_state_for_paused_dag(self, session=None):
-        """
-        Checks for paused dags with DagRuns in the running state and
-        update the DagRun state if possible
-        """
-        dag = self.task_instance.task.dag
-        if dag.get_is_paused():
-            dag_run = self.task_instance.get_dagrun(session=session)
-            if dag_run:
-                dag_run.dag = dag
-                dag_run.update_state(session=session, execute_callbacks=True)
+    @Sentry.enrich_errors
+    def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
+        try:
+            # Re-select the row with a lock
+            dag_run = with_row_locks(
+                session.query(DagRun).filter_by(
+                    dag_id=self.dag_id,
+                    execution_date=self.task_instance.execution_date,
+                ),
+                session=session,
+            ).one()
+
+            # Get a partial dag with just the specific tasks we want to
+            # examine. In order for dep checks to work correctly, we
+            # include ourself (so TriggerRuleDep can check the state of the
+            # task we just executed)
+            task = self.task_instance.task
+
+            partial_dag = task.dag.partial_subset(
+                task.downstream_task_ids,
+                include_downstream=False,
+                include_upstream=False,
+                include_direct_upstream=True,
+            )
+
+            dag_run.dag = partial_dag
+            info = dag_run.task_instance_scheduling_decisions(session)
+
+            skippable_task_ids = {
+                task_id for task_id in partial_dag.task_ids if task_id not in task.downstream_task_ids
+            }
+
+            schedulable_tis = [ti for ti in info.schedulable_tis if ti.task_id not in skippable_task_ids]
+            for schedulable_ti in schedulable_tis:
+                if not hasattr(schedulable_ti, "task"):
+                    schedulable_ti.task = task.dag.get_task(schedulable_ti.task_id)
+
+            num = dag_run.schedule_tis(schedulable_tis)
+            self.log.info("%d downstream tasks scheduled from follow-on schedule check", num)
+
+            session.commit()
+        except OperationalError as e:
+            # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
+            self.log.info(
+                "Skipping mini scheduling run due to exception: %s",
+                e.statement,
+                exc_info=True,
+            )
+            session.rollback()
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 8d2578f..0e10567 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -35,7 +35,6 @@ import lazy_object_proxy
 import pendulum
 from jinja2 import TemplateAssertionError, UndefinedError
 from sqlalchemy import Column, Float, Index, Integer, PickleType, String, and_, func, or_
-from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import reconstructor, relationship
 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.elements import BooleanClauseList
@@ -70,7 +69,7 @@ from airflow.utils.net import get_hostname
 from airflow.utils.operator_helpers import context_to_airflow_vars
 from airflow.utils.platform import getuser
 from airflow.utils.session import provide_session
-from airflow.utils.sqlalchemy import UtcDateTime, with_row_locks
+from airflow.utils.sqlalchemy import UtcDateTime
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
 
@@ -1225,62 +1224,6 @@ class TaskInstance(Base, LoggingMixin):
 
         session.commit()
 
-        if not test_mode:
-            self._run_mini_scheduler_on_child_tasks(session)
-
-    @provide_session
-    @Sentry.enrich_errors
-    def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
-        if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True):
-            from airflow.models.dagrun import DagRun  # Avoid circular import
-
-            try:
-                # Re-select the row with a lock
-                dag_run = with_row_locks(
-                    session.query(DagRun).filter_by(
-                        dag_id=self.dag_id,
-                        execution_date=self.execution_date,
-                    ),
-                    session=session,
-                ).one()
-
-                # Get a partial dag with just the specific tasks we want to
-                # examine. In order for dep checks to work correctly, we
-                # include ourself (so TriggerRuleDep can check the state of the
-                # task we just executed)
-                partial_dag = self.task.dag.partial_subset(
-                    self.task.downstream_task_ids,
-                    include_downstream=False,
-                    include_upstream=False,
-                    include_direct_upstream=True,
-                )
-
-                dag_run.dag = partial_dag
-                info = dag_run.task_instance_scheduling_decisions(session)
-
-                skippable_task_ids = {
-                    task_id
-                    for task_id in partial_dag.task_ids
-                    if task_id not in self.task.downstream_task_ids
-                }
-
-                schedulable_tis = [ti for ti in info.schedulable_tis if ti.task_id not in skippable_task_ids]
-                for schedulable_ti in schedulable_tis:
-                    if not hasattr(schedulable_ti, "task"):
-                        schedulable_ti.task = self.task.dag.get_task(schedulable_ti.task_id)
-
-                num = dag_run.schedule_tis(schedulable_tis)
-                self.log.info("%d downstream tasks scheduled from follow-on schedule check", num)
-
-                session.commit()
-            except OperationalError as e:
-                # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
-                self.log.info(
-                    f"Skipping mini scheduling run due to exception: {e.statement}",
-                    exc_info=True,
-                )
-                session.rollback()
-
     def _prepare_and_execute_task_with_callbacks(self, context, task):
         """Prepare Task for Execution"""
         from airflow.models.renderedtifields import RenderedTaskInstanceFields
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index f50ddbc..2b93e6d 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -71,8 +71,7 @@ class TestCliTasks(unittest.TestCase):
         args = self.parser.parse_args(['tasks', 'list', 'example_bash_operator', '--tree'])
         task_command.task_list(args)
 
-    @mock.patch("airflow.models.taskinstance.TaskInstance._run_mini_scheduler_on_child_tasks")
-    def test_test(self, mock_run_mini_scheduler):
+    def test_test(self):
         """Test the `airflow test` command"""
         args = self.parser.parse_args(
             ["tasks", "test", "example_python_operator", 'print_the_context', '2018-01-01']
@@ -81,7 +80,6 @@ class TestCliTasks(unittest.TestCase):
         with redirect_stdout(io.StringIO()) as stdout:
             task_command.task_test(args)
 
-        mock_run_mini_scheduler.assert_not_called()
         # Check that prints, and log messages, are shown
         assert "'example_python_operator__print_the_context__20180101'" in stdout.getvalue()
 
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index ed43198..b985cf6 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -33,6 +33,7 @@ from airflow import settings
 from airflow.exceptions import AirflowException, AirflowFailException
 from airflow.executors.sequential_executor import SequentialExecutor
 from airflow.jobs.local_task_job import LocalTaskJob
+from airflow.jobs.scheduler_job import SchedulerJob
 from airflow.models.dag import DAG, DagModel
 from airflow.models.dagbag import DagBag
 from airflow.models.taskinstance import TaskInstance
@@ -45,8 +46,9 @@ from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
 from airflow.utils.types import DagRunType
+from tests.test_utils import db
 from tests.test_utils.asserts import assert_queries_count
-from tests.test_utils.db import clear_db_jobs, clear_db_runs
+from tests.test_utils.config import conf_vars
 from tests.test_utils.mock_executor import MockExecutor
 
 # pylint: skip-file
@@ -57,15 +59,25 @@ TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 class TestLocalTaskJob(unittest.TestCase):
     def setUp(self):
-        clear_db_jobs()
-        clear_db_runs()
+        db.clear_db_dags()
+        db.clear_db_jobs()
+        db.clear_db_runs()
+        db.clear_db_task_fail()
         patcher = patch('airflow.jobs.base_job.sleep')
         self.addCleanup(patcher.stop)
         self.mock_base_job_sleep = patcher.start()
 
     def tearDown(self) -> None:
-        clear_db_jobs()
-        clear_db_runs()
+        db.clear_db_dags()
+        db.clear_db_jobs()
+        db.clear_db_runs()
+        db.clear_db_task_fail()
+
+    def validate_ti_states(self, dag_run, ti_state_mapping, error_message):
+        for task_id, expected_state in ti_state_mapping.items():
+            task_instance = dag_run.get_task_instance(task_id=task_id)
+            task_instance.refresh_from_db()
+            assert task_instance.state == expected_state, error_message
 
     def test_localtaskjob_essential_attr(self):
         """
@@ -660,8 +672,8 @@ class TestLocalTaskJob(unittest.TestCase):
             if ti.state == State.RUNNING and ti.pid is not None:
                 break
             time.sleep(0.2)
-        assert ti.state == State.RUNNING
         assert ti.pid is not None
+        assert ti.state == State.RUNNING
         os.kill(ti.pid, signal_type)
         process.join(timeout=10)
         assert failure_callback_called.value == 1
@@ -705,12 +717,114 @@ class TestLocalTaskJob(unittest.TestCase):
         session.refresh(dr)
         assert dr.state == State.SUCCESS
 
+    @parameterized.expand(
+        [
+            (
+                {('scheduler', 'schedule_after_task_execution'): 'True'},
+                {'A': 'B', 'B': 'C'},
+                {'A': State.QUEUED, 'B': State.NONE, 'C': State.NONE},
+                {'A': State.SUCCESS, 'B': State.SCHEDULED, 'C': State.NONE},
+                {'A': State.SUCCESS, 'B': State.SUCCESS, 'C': State.SCHEDULED},
+                "A -> B -> C, with fast-follow ON when A runs, B should be QUEUED. Same for B and C.",
+            ),
+            (
+                {('scheduler', 'schedule_after_task_execution'): 'False'},
+                {'A': 'B', 'B': 'C'},
+                {'A': State.QUEUED, 'B': State.NONE, 'C': State.NONE},
+                {'A': State.SUCCESS, 'B': State.NONE, 'C': State.NONE},
+                None,
+                "A -> B -> C, with fast-follow OFF, when A runs, B shouldn't be QUEUED.",
+            ),
+            (
+                {('scheduler', 'schedule_after_task_execution'): 'True'},
+                {'A': 'B', 'C': 'B', 'D': 'C'},
+                {'A': State.QUEUED, 'B': State.NONE, 'C': State.NONE, 'D': State.NONE},
+                {'A': State.SUCCESS, 'B': State.NONE, 'C': State.NONE, 'D': State.NONE},
+                None,
+                "D -> C -> B & A -> B, when A runs but C isn't QUEUED yet, B shouldn't be QUEUED.",
+            ),
+            (
+                {('scheduler', 'schedule_after_task_execution'): 'True'},
+                {'A': 'C', 'B': 'C'},
+                {'A': State.QUEUED, 'B': State.FAILED, 'C': State.NONE},
+                {'A': State.SUCCESS, 'B': State.FAILED, 'C': State.UPSTREAM_FAILED},
+                None,
+                "A -> C & B -> C, when A is QUEUED but B has FAILED, C is marked UPSTREAM_FAILED.",
+            ),
+        ]
+    )
+    def test_fast_follow(
+        self, conf, dependencies, init_state, first_run_state, second_run_state, error_message
+    ):
+        # pylint: disable=too-many-locals
+        with conf_vars(conf):
+            session = settings.Session()
+
+            dag = DAG('test_dagrun_fast_follow', start_date=DEFAULT_DATE)
+
+            dag_model = DagModel(
+                dag_id=dag.dag_id,
+                next_dagrun=dag.start_date,
+                is_active=True,
+            )
+            session.add(dag_model)
+            session.flush()
+
+            python_callable = lambda: True
+            with dag:
+                task_a = PythonOperator(task_id='A', python_callable=python_callable)
+                task_b = PythonOperator(task_id='B', python_callable=python_callable)
+                task_c = PythonOperator(task_id='C', python_callable=python_callable)
+                if 'D' in init_state:
+                    task_d = PythonOperator(task_id='D', python_callable=python_callable)
+                for upstream, downstream in dependencies.items():
+                    dag.set_dependency(upstream, downstream)
+
+            scheduler_job = SchedulerJob(subdir=os.devnull)
+            scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+            dag_run = dag.create_dagrun(run_id='test_dagrun_fast_follow', state=State.RUNNING)
+
+            task_instance_a = TaskInstance(task_a, dag_run.execution_date, init_state['A'])
+
+            task_instance_b = TaskInstance(task_b, dag_run.execution_date, init_state['B'])
+
+            task_instance_c = TaskInstance(task_c, dag_run.execution_date, init_state['C'])
+
+            if 'D' in init_state:
+                task_instance_d = TaskInstance(task_d, dag_run.execution_date, init_state['D'])
+                session.merge(task_instance_d)
+
+            session.merge(task_instance_a)
+            session.merge(task_instance_b)
+            session.merge(task_instance_c)
+            session.flush()
+
+            job1 = LocalTaskJob(
+                task_instance=task_instance_a, ignore_ti_state=True, executor=SequentialExecutor()
+            )
+            job1.task_runner = StandardTaskRunner(job1)
+
+            job2 = LocalTaskJob(
+                task_instance=task_instance_b, ignore_ti_state=True, executor=SequentialExecutor()
+            )
+            job2.task_runner = StandardTaskRunner(job2)
+
+            settings.engine.dispose()
+            job1.run()
+            self.validate_ti_states(dag_run, first_run_state, error_message)
+            if second_run_state:
+                job2.run()
+                self.validate_ti_states(dag_run, second_run_state, error_message)
+            if scheduler_job.processor_agent:
+                scheduler_job.processor_agent.end()
+
 
 @pytest.fixture()
 def clean_db_helper():
     yield
-    clear_db_jobs()
-    clear_db_runs()
+    db.clear_db_jobs()
+    db.clear_db_runs()
 
 
 @pytest.mark.usefixtures("clean_db_helper")
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 021809b..c1882e1 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -34,10 +34,8 @@ from sqlalchemy.orm.session import Session
 
 from airflow import models, settings
 from airflow.exceptions import AirflowException, AirflowFailException, AirflowSkipException
-from airflow.jobs.scheduler_job import SchedulerJob
 from airflow.models import (
     DAG,
-    DagModel,
     DagRun,
     Pool,
     RenderedTaskInstanceFields,
@@ -1963,107 +1961,6 @@ class TestTaskInstance(unittest.TestCase):
         with create_session() as session:
             session.query(RenderedTaskInstanceFields).delete()
 
-    def validate_ti_states(self, dag_run, ti_state_mapping, error_message):
-        for task_id, expected_state in ti_state_mapping.items():
-            task_instance = dag_run.get_task_instance(task_id=task_id)
-            assert task_instance.state == expected_state, error_message
-
-    @parameterized.expand(
-        [
-            (
-                {('scheduler', 'schedule_after_task_execution'): 'True'},
-                {'A': 'B', 'B': 'C'},
-                {'A': State.QUEUED, 'B': State.NONE, 'C': State.NONE},
-                {'A': State.SUCCESS, 'B': State.SCHEDULED, 'C': State.NONE},
-                {'A': State.SUCCESS, 'B': State.SUCCESS, 'C': State.SCHEDULED},
-                "A -> B -> C, with fast-follow ON when A runs, B should be QUEUED. Same for B and C.",
-            ),
-            (
-                {('scheduler', 'schedule_after_task_execution'): 'False'},
-                {'A': 'B', 'B': 'C'},
-                {'A': State.QUEUED, 'B': State.NONE, 'C': State.NONE},
-                {'A': State.SUCCESS, 'B': State.NONE, 'C': State.NONE},
-                None,
-                "A -> B -> C, with fast-follow OFF, when A runs, B shouldn't be QUEUED.",
-            ),
-            (
-                {('scheduler', 'schedule_after_task_execution'): 'True'},
-                {'A': 'B', 'C': 'B', 'D': 'C'},
-                {'A': State.QUEUED, 'B': State.NONE, 'C': State.NONE, 'D': State.NONE},
-                {'A': State.SUCCESS, 'B': State.NONE, 'C': State.NONE, 'D': State.NONE},
-                None,
-                "D -> C -> B & A -> B, when A runs but C isn't QUEUED yet, B shouldn't be QUEUED.",
-            ),
-            (
-                {('scheduler', 'schedule_after_task_execution'): 'True'},
-                {'A': 'C', 'B': 'C'},
-                {'A': State.QUEUED, 'B': State.FAILED, 'C': State.NONE},
-                {'A': State.SUCCESS, 'B': State.FAILED, 'C': State.UPSTREAM_FAILED},
-                None,
-                "A -> C & B -> C, when A is QUEUED but B has FAILED, C is marked UPSTREAM_FAILED.",
-            ),
-        ]
-    )
-    def test_fast_follow(
-        self, conf, dependencies, init_state, first_run_state, second_run_state, error_message
-    ):
-        with conf_vars(conf):
-            session = settings.Session()
-
-            dag = DAG('test_dagrun_fast_follow', start_date=DEFAULT_DATE)
-
-            dag_model = DagModel(
-                dag_id=dag.dag_id,
-                next_dagrun=dag.start_date,
-                is_active=True,
-            )
-            session.add(dag_model)
-            session.flush()
-
-            python_callable = lambda: True
-            with dag:
-                task_a = PythonOperator(task_id='A', python_callable=python_callable)
-                task_b = PythonOperator(task_id='B', python_callable=python_callable)
-                task_c = PythonOperator(task_id='C', python_callable=python_callable)
-                if 'D' in init_state:
-                    task_d = PythonOperator(task_id='D', python_callable=python_callable)
-                for upstream, downstream in dependencies.items():
-                    dag.set_dependency(upstream, downstream)
-
-            scheduler_job = SchedulerJob(subdir=os.devnull)
-            scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
-
-            dag_run = dag.create_dagrun(run_id='test_dagrun_fast_follow', state=State.RUNNING)
-
-            task_instance_a = dag_run.get_task_instance(task_id=task_a.task_id)
-            task_instance_a.task = task_a
-            task_instance_a.set_state(init_state['A'])
-
-            task_instance_b = dag_run.get_task_instance(task_id=task_b.task_id)
-            task_instance_b.task = task_b
-            task_instance_b.set_state(init_state['B'])
-
-            task_instance_c = dag_run.get_task_instance(task_id=task_c.task_id)
-            task_instance_c.task = task_c
-            task_instance_c.set_state(init_state['C'])
-
-            if 'D' in init_state:
-                task_instance_d = dag_run.get_task_instance(task_id=task_d.task_id)
-                task_instance_d.task = task_d
-                task_instance_d.state = init_state['D']
-
-            session.commit()
-            task_instance_a.run()
-
-            self.validate_ti_states(dag_run, first_run_state, error_message)
-
-            if second_run_state:
-                scheduler_job._critical_section_execute_task_instances(session=session)
-                task_instance_b.run()
-                self.validate_ti_states(dag_run, second_run_state, error_message)
-            if scheduler_job.processor_agent:
-                scheduler_job.processor_agent.end()
-
     def test_set_state_up_for_retry(self):
         dag = DAG('dag', start_date=DEFAULT_DATE)
         op1 = DummyOperator(task_id='op_1', owner='test', dag=dag)

[airflow] 02/03: Fix race condition with dagrun callbacks (#16741)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 38d4bab47fe268dd0f9ae3760e87ec143b56184e
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Thu Jul 29 11:05:02 2021 -0600

    Fix race condition with dagrun callbacks (#16741)
    
    Instead of immediately sending callbacks to be processed, wait until
    after we commit so the dagrun.end_date is guaranteed to be there when
    the callback runs.
    
    (cherry picked from commit fb3031acf51f95384154143553aac1a40e568ebf)
---
 airflow/jobs/scheduler_job.py          | 18 +++++---
 tests/dag_processing/test_processor.py | 20 +++++----
 tests/jobs/test_scheduler_job.py       | 80 +++++++++++++++++++++++++++++-----
 3 files changed, 94 insertions(+), 24 deletions(-)
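
A minimal sketch of the buffer-then-dispatch pattern this commit applies
(hypothetical schedule_one/send_callback parameters standing in for
SchedulerJob._schedule_dag_run and _send_dag_callbacks_to_processor; see the
diff below for the real change). Callbacks produced while scheduling are
buffered and sent only after the transaction commits, so fields such as
dagrun.end_date are durable by the time a callback is processed:

    # Sketch only, assuming SQLAlchemy-style session semantics.
    def schedule_and_dispatch(session, dag_runs, schedule_one, send_callback):
        callback_tuples = []
        for dag_run in dag_runs:
            # Scheduling may set dag_run.end_date and returns the callback
            # request that used to be sent immediately (i.e. pre-commit).
            callback = schedule_one(dag_run, session)
            callback_tuples.append((dag_run, callback))

        session.commit()  # end_date etc. are now visible to other processes

        for dag_run, callback in callback_tuples:
            send_callback(dag_run, callback)  # context is now up to date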

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index b7506b5..198635c 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1495,6 +1495,7 @@ class SchedulerJob(BaseJob):
             # Bulk fetch the currently active dag runs for the dags we are
             # examining, rather than making one query per DagRun
 
+            callback_tuples = []
             for dag_run in dag_runs:
                 # Use try_except to not stop the Scheduler when a Serialized DAG is not found
                 # This takes care of Dynamic DAGs especially
@@ -1503,13 +1504,18 @@ class SchedulerJob(BaseJob):
                 # But this would take care of the scenario when the Scheduler is restarted after DagRun is
                 # created and the DAG is deleted / renamed
                 try:
-                    self._schedule_dag_run(dag_run, session)
+                    callback_to_run = self._schedule_dag_run(dag_run, session)
+                    callback_tuples.append((dag_run, callback_to_run))
                 except SerializedDagNotFound:
                     self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
                     continue
 
             guard.commit()
 
+            # Send the callbacks after we commit to ensure the context is up to date when it gets run
+            for dag_run, callback_to_run in callback_tuples:
+                self._send_dag_callbacks_to_processor(dag_run, callback_to_run)
+
             # Without this, the session has an invalid view of the DB
             session.expunge_all()
             # END: schedule TIs
@@ -1661,12 +1667,12 @@ class SchedulerJob(BaseJob):
         self,
         dag_run: DagRun,
         session: Session,
-    ) -> int:
+    ) -> Optional[DagCallbackRequest]:
         """
         Make scheduling decisions about an individual dag run
 
         :param dag_run: The DagRun to schedule
-        :return: Number of tasks scheduled
+        :return: Callback that needs to be executed
         """
         dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
 
@@ -1713,13 +1719,13 @@ class SchedulerJob(BaseJob):
         # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
         schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
 
-        self._send_dag_callbacks_to_processor(dag_run, callback_to_run)
-
         # This will do one query per dag run. We "could" build up a complex
         # query to update all the TIs across all the execution dates and dag
         # IDs in a single query, but it turns out that can be _very very slow_
         # see #11147/commit ee90807ac for more details
-        return dag_run.schedule_tis(schedulable_tis, session)
+        dag_run.schedule_tis(schedulable_tis, session)
+
+        return callback_to_run
 
     @provide_session
     def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session=None):
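
To see why the ordering above matters, consider the consumer side. A hedged
illustration, assuming (as in Airflow) that the DAG processor executes
callbacks in a separate process with its own session; the function name is
hypothetical:

    from airflow.models import DagRun

    def run_callback_in_processor(session, dag_id, execution_date):
        dag_run = (
            session.query(DagRun)
            .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
            .one()
        )
        # Before this change the scheduler could dispatch the callback while
        # its own transaction was still open, so this cross-process read could
        # observe end_date as None; sending only after guard.commit() closes
        # that window.
        assert dag_run.end_date is not None, "dag_run not yet finalized"
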
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index 9425bbb..243afc7 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -115,6 +115,10 @@ class TestDagFileProcessor(unittest.TestCase):
         non_serialized_dagbag.sync_to_db()
         cls.dagbag = DagBag(read_dags_from_db=True)
 
+    @staticmethod
+    def assert_scheduled_ti_count(session, count):
+        assert count == session.query(TaskInstance).filter_by(state=State.SCHEDULED).count()
+
     def test_dag_file_processor_sla_miss_callback(self):
         """
         Test that the dag file processor calls the sla miss callback
@@ -387,8 +391,8 @@ class TestDagFileProcessor(unittest.TestCase):
             ti.start_date = start_date
             ti.end_date = end_date
 
-            count = self.scheduler_job._schedule_dag_run(dr, session)
-            assert count == 1
+            self.scheduler_job._schedule_dag_run(dr, session)
+            self.assert_scheduled_ti_count(session, 1)
 
             session.refresh(ti)
             assert ti.state == State.SCHEDULED
@@ -444,8 +448,8 @@ class TestDagFileProcessor(unittest.TestCase):
             ti.start_date = start_date
             ti.end_date = end_date
 
-            count = self.scheduler_job._schedule_dag_run(dr, session)
-            assert count == 1
+            self.scheduler_job._schedule_dag_run(dr, session)
+            self.assert_scheduled_ti_count(session, 1)
 
             session.refresh(ti)
             assert ti.state == State.SCHEDULED
@@ -504,8 +508,8 @@ class TestDagFileProcessor(unittest.TestCase):
                 ti.start_date = start_date
                 ti.end_date = end_date
 
-            count = self.scheduler_job._schedule_dag_run(dr, session)
-            assert count == 2
+            self.scheduler_job._schedule_dag_run(dr, session)
+            self.assert_scheduled_ti_count(session, 2)
 
             session.refresh(tis[0])
             session.refresh(tis[1])
@@ -547,9 +551,9 @@ class TestDagFileProcessor(unittest.TestCase):
         BashOperator(task_id='dummy2', dag=dag, owner='airflow', bash_command='echo test')
         SerializedDagModel.write_dag(dag=dag)
 
-        scheduled_tis = self.scheduler_job._schedule_dag_run(dr, session)
+        self.scheduler_job._schedule_dag_run(dr, session)
+        self.assert_scheduled_ti_count(session, 2)
         session.flush()
-        assert scheduled_tis == 2
 
         drs = DagRun.find(dag_id=dag.dag_id, session=session)
         assert len(drs) == 1
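
Because _schedule_dag_run no longer returns the number of scheduled task
instances, the tests above assert on database state instead. Standalone, the
new helper amounts to the following (a sketch assuming a SQLAlchemy session and
Airflow's models; count_scheduled_tis is an illustrative name):

    from airflow.models import TaskInstance
    from airflow.utils.state import State

    def count_scheduled_tis(session) -> int:
        # Count every TaskInstance currently in the SCHEDULED state.
        return session.query(TaskInstance).filter_by(state=State.SCHEDULED).count()

    # e.g. after scheduler_job._schedule_dag_run(dr, session):
    #     assert count_scheduled_tis(session) == 1
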
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 33c82d7..3a76233 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2394,10 +2394,11 @@ class TestSchedulerJob(unittest.TestCase):
         ti = dr.get_task_instance('dummy')
         ti.set_state(state, session)
 
-        self.scheduler_job._schedule_dag_run(dr, session)
+        with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
+            self.scheduler_job._do_scheduling(session)
 
         expected_callback = DagCallbackRequest(
-            full_filepath=dr.dag.fileloc,
+            full_filepath=dag.fileloc,
             dag_id=dr.dag_id,
             is_failure_callback=bool(state == State.FAILED),
             execution_date=dr.execution_date,
@@ -2413,6 +2414,64 @@ class TestSchedulerJob(unittest.TestCase):
         session.rollback()
         session.close()
 
+    def test_dagrun_callbacks_committed_before_sent(self):
+        """
+        Tests that before any callbacks are sent to the processor, the session is committed. This ensures
+        that the dagrun details are up to date when the callbacks are run.
+        """
+        dag = DAG(dag_id='test_dagrun_callbacks_committed_before_sent', start_date=DEFAULT_DATE)
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.Mock()
+        self.scheduler_job._send_dag_callbacks_to_processor = mock.Mock()
+        self.scheduler_job._schedule_dag_run = mock.Mock()
+
+        # Sync DAG into DB
+        with mock.patch.object(settings, "STORE_DAG_CODE", False):
+            self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+            self.scheduler_job.dagbag.sync_to_db()
+
+        session = settings.Session()
+        orm_dag = session.query(DagModel).get(dag.dag_id)
+        assert orm_dag is not None
+
+        # Create DagRun
+        self.scheduler_job._create_dag_runs([orm_dag], session)
+
+        drs = DagRun.find(dag_id=dag.dag_id, session=session)
+        assert len(drs) == 1
+        dr = drs[0]
+
+        ti = dr.get_task_instance('dummy')
+        ti.set_state(State.SUCCESS, session)
+
+        with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), mock.patch(
+            "airflow.jobs.scheduler_job.prohibit_commit"
+        ) as mock_guard:
+            mock_guard.return_value.__enter__.return_value.commit.side_effect = session.commit
+
+            def mock_schedule_dag_run(*args, **kwargs):
+                mock_guard.reset_mock()
+                return None
+
+            def mock_send_dag_callbacks_to_processor(*args, **kwargs):
+                mock_guard.return_value.__enter__.return_value.commit.assert_called_once()
+
+            self.scheduler_job._send_dag_callbacks_to_processor.side_effect = (
+                mock_send_dag_callbacks_to_processor
+            )
+            self.scheduler_job._schedule_dag_run.side_effect = mock_schedule_dag_run
+
+            self.scheduler_job._do_scheduling(session)
+
+        # Verify the dag callback request is sent to the file processor
+        self.scheduler_job._send_dag_callbacks_to_processor.assert_called_once()
+        # mock_send_dag_callbacks_to_processor has already asserted that the send happened after a commit
+
+        session.rollback()
+        session.close()
+
     @parameterized.expand([(State.SUCCESS,), (State.FAILED,)])
     def test_dagrun_callbacks_are_not_added_when_callbacks_are_not_defined(self, state):
         """
@@ -2449,10 +2508,15 @@ class TestSchedulerJob(unittest.TestCase):
         ti = dr.get_task_instance('test_task')
         ti.set_state(state, session)
 
-        self.scheduler_job._schedule_dag_run(dr, session)
+        with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
+            self.scheduler_job._do_scheduling(session)
 
         # Verify Callback is not set (i.e is None) when no callbacks are set on DAG
-        self.scheduler_job._send_dag_callbacks_to_processor.assert_called_once_with(dr, None)
+        self.scheduler_job._send_dag_callbacks_to_processor.assert_called_once()
+        call_args = self.scheduler_job._send_dag_callbacks_to_processor.call_args[0]
+        assert call_args[0].dag_id == dr.dag_id
+        assert call_args[0].execution_date == dr.execution_date
+        assert call_args[1] is None
 
         session.rollback()
         session.close()
@@ -3095,12 +3159,10 @@ class TestSchedulerJob(unittest.TestCase):
 
         # Verify that DagRun.verify_integrity is not called
         with mock.patch('airflow.jobs.scheduler_job.DagRun.verify_integrity') as mock_verify_integrity:
-            scheduled_tis = self.scheduler_job._schedule_dag_run(dr, session)
+            self.scheduler_job._schedule_dag_run(dr, session)
             mock_verify_integrity.assert_not_called()
         session.flush()
 
-        assert scheduled_tis == 1
-
         tis_count = (
             session.query(func.count(TaskInstance.task_id))
             .filter(
@@ -3158,11 +3220,9 @@ class TestSchedulerJob(unittest.TestCase):
         dag_version_2 = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session)
         assert dag_version_2 != dag_version_1
 
-        scheduled_tis = self.scheduler_job._schedule_dag_run(dr, session)
+        self.scheduler_job._schedule_dag_run(dr, session)
         session.flush()
 
-        assert scheduled_tis == 2
-
         drs = DagRun.find(dag_id=dag.dag_id, session=session)
         assert len(drs) == 1
         dr = drs[0]