Posted to commits@airflow.apache.org by ka...@apache.org on 2021/08/13 19:49:13 UTC

[airflow] 04/08: Add 'queued' state to DagRun (#16401)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2bab3d462018efb3186f94440d7bfde3e6747901
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Jul 6 15:03:27 2021 +0100

    Add 'queued' state to DagRun (#16401)
    
    This change adds a 'queued' state to DagRun. Newly created DagRuns
    start in the queued state and are then moved to the running state by
    the scheduler, provided the DAG's max_active_runs limit would not be
    exceeded. If the DAG has no max_active_runs, DagRuns are moved to
    the running state immediately.
    
    Clearing a DagRun now sets its state to queued.
    
    Closes: #9975, #16366
    (cherry picked from commit 6611ffd399dce0474d8329720de7e83f568df598)
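
    To make the new lifecycle concrete, here is a minimal, self-contained
    sketch of the promotion rule described above. This is plain Python,
    not Airflow's actual API; the function name and data shapes are
    illustrative only:
    
        from collections import defaultdict
    
        def promote_queued_runs(queued_runs, running_counts, max_active_runs):
            """Move queued runs to running without exceeding each DAG's limit."""
            active = defaultdict(int, running_counts)
            promoted = []
            for run in queued_runs:  # the scheduler examines queued runs oldest-first
                limit = max_active_runs.get(run["dag_id"])
                if limit and active[run["dag_id"]] >= limit:
                    continue  # stays queued until a running run finishes
                active[run["dag_id"]] += 1
                run["state"] = "running"
                promoted.append(run)
            return promoted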
---
 airflow/api_connexion/openapi/v1.yaml              |   4 +-
 airflow/jobs/scheduler_job.py                      | 171 +++++++----------
 ...93827b8_add_queued_at_column_to_dagrun_table.py |  49 +++++
 airflow/models/dag.py                              |   4 +-
 airflow/models/dagrun.py                           |  17 +-
 airflow/models/taskinstance.py                     |   6 +-
 airflow/www/static/js/tree.js                      |   4 +-
 airflow/www/views.py                               |   5 +-
 docs/apache-airflow/migrations-ref.rst             |   2 +-
 tests/api/common/experimental/test_mark_tasks.py   |   4 +-
 .../endpoints/test_dag_run_endpoint.py             |  14 +-
 tests/api_connexion/schemas/test_dag_run_schema.py |   3 +
 tests/dag_processing/test_manager.py               |  10 +-
 tests/dag_processing/test_processor.py             |  65 ++++---
 tests/jobs/test_scheduler_job.py                   | 206 +++++++++------------
 tests/models/test_cleartasks.py                    |  37 ++++
 tests/models/test_dagrun.py                        |  25 ++-
 tests/sensors/test_external_task_sensor.py         |   8 +-
 18 files changed, 338 insertions(+), 296 deletions(-)

diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 182f356..47fa3dc 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -1853,6 +1853,7 @@ components:
           description: |
             The start time. The time when DAG run was actually created.
           readOnly: true
+          nullable: true
         end_date:
           type: string
           format: date-time
@@ -3025,8 +3026,9 @@ components:
       description: DAG State.
       type: string
       enum:
-        - success
+        - queued
         - running
+        - success
         - failed
 
     TriggerRule:
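
With 'queued' added to the DagState enum and start_date now nullable, clients
of the stable REST API may see runs in the queued state with a null
start_date. A minimal sketch, assuming a default Airflow 2.1 deployment on
localhost:8080 with illustrative credentials and DAG id:

    import requests

    resp = requests.get(
        "http://localhost:8080/api/v1/dags/example_dag/dagRuns",
        auth=("admin", "admin"),  # illustrative credentials/host
    )
    for run in resp.json()["dag_runs"]:
        # 'state' may now be 'queued'; 'start_date' stays null until the run starts
        print(run["dag_run_id"], run["state"], run["start_date"])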
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 5b24e00..b564717 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -26,7 +26,7 @@ import sys
 import time
 from collections import defaultdict
 from datetime import timedelta
-from typing import DefaultDict, Dict, Iterable, List, Optional, Set, Tuple
+from typing import DefaultDict, Dict, Iterable, List, Optional, Tuple
 
 from sqlalchemy import and_, func, not_, or_, tuple_
 from sqlalchemy.exc import OperationalError
@@ -197,7 +197,7 @@ class SchedulerJob(BaseJob):
         """
         For all DAG IDs in the DagBag, look for task instances in the
         old_states and set them to new_state if the corresponding DagRun
-        does not exist or exists but is not in the running state. This
+        does not exist or exists but is not in the running or queued state. This
 normally should not happen, but it can if the states of DagRuns are
         changed manually.
 
@@ -214,7 +214,7 @@ class SchedulerJob(BaseJob):
             .filter(models.TaskInstance.state.in_(old_states))
             .filter(
                 or_(
-                    models.DagRun.state != State.RUNNING,
+                    models.DagRun.state.notin_([State.RUNNING, State.QUEUED]),
                     models.DagRun.state.is_(None),
                 )
             )
@@ -882,39 +882,12 @@ class SchedulerJob(BaseJob):
             if settings.USE_JOB_SCHEDULE:
                 self._create_dagruns_for_dags(guard, session)
 
-            dag_runs = self._get_next_dagruns_to_examine(session)
+            self._start_queued_dagruns(session)
+            guard.commit()
+            dag_runs = self._get_next_dagruns_to_examine(State.RUNNING, session)
             # Bulk fetch the currently active dag runs for the dags we are
             # examining, rather than making one query per DagRun
 
-            # TODO: This query is probably horribly inefficient (though there is an
-            # index on (dag_id,state)). It is to deal with the case when a user
-            # clears more than max_active_runs older tasks -- we don't want the
-            # scheduler to suddenly go and start running tasks from all of the
-            # runs. (AIRFLOW-137/GH #1442)
-            #
-            # The longer term fix would be to have `clear` do this, and put DagRuns
-            # in to the queued state, then take DRs out of queued before creating
-            # any new ones
-
-            # Build up a set of execution_dates that are "active" for a given
-            # dag_id -- only tasks from those runs will be scheduled.
-            active_runs_by_dag_id = defaultdict(set)
-
-            query = (
-                session.query(
-                    TI.dag_id,
-                    TI.execution_date,
-                )
-                .filter(
-                    TI.dag_id.in_(list({dag_run.dag_id for dag_run in dag_runs})),
-                    TI.state.notin_(list(State.finished) + [State.REMOVED]),
-                )
-                .group_by(TI.dag_id, TI.execution_date)
-            )
-
-            for dag_id, execution_date in query:
-                active_runs_by_dag_id[dag_id].add(execution_date)
-
             for dag_run in dag_runs:
                 # Use try_except to not stop the Scheduler when a Serialized DAG is not found
                 # This takes care of Dynamic DAGs especially
@@ -923,7 +896,7 @@ class SchedulerJob(BaseJob):
                 # But this would take care of the scenario when the Scheduler is restarted after DagRun is
                 # created and the DAG is deleted / renamed
                 try:
-                    self._schedule_dag_run(dag_run, active_runs_by_dag_id.get(dag_run.dag_id, set()), session)
+                    self._schedule_dag_run(dag_run, session)
                 except SerializedDagNotFound:
                     self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
                     continue
@@ -963,9 +936,9 @@ class SchedulerJob(BaseJob):
             return num_queued_tis
 
     @retry_db_transaction
-    def _get_next_dagruns_to_examine(self, session):
+    def _get_next_dagruns_to_examine(self, state, session):
         """Get Next DagRuns to Examine with retries"""
-        return DagRun.next_dagruns_to_examine(session)
+        return DagRun.next_dagruns_to_examine(state, session)
 
     @retry_db_transaction
     def _create_dagruns_for_dags(self, guard, session):
@@ -986,14 +959,24 @@ class SchedulerJob(BaseJob):
         # as DagModel.dag_id and DagModel.next_dagrun
         # This list is used to verify if the DagRun already exist so that we don't attempt to create
         # duplicate dag runs
-        active_dagruns = (
-            session.query(DagRun.dag_id, DagRun.execution_date)
-            .filter(
-                tuple_(DagRun.dag_id, DagRun.execution_date).in_(
-                    [(dm.dag_id, dm.next_dagrun) for dm in dag_models]
+
+        if session.bind.dialect.name == 'mssql':
+            existing_dagruns_filter = or_(
+                *(
+                    and_(
+                        DagRun.dag_id == dm.dag_id,
+                        DagRun.execution_date == dm.next_dagrun,
+                    )
+                    for dm in dag_models
                 )
             )
-            .all()
+        else:
+            existing_dagruns_filter = tuple_(DagRun.dag_id, DagRun.execution_date).in_(
+                [(dm.dag_id, dm.next_dagrun) for dm in dag_models]
+            )
+
+        existing_dagruns = (
+            session.query(DagRun.dag_id, DagRun.execution_date).filter(existing_dagruns_filter).all()
         )
 
         for dag_model in dag_models:
@@ -1009,89 +992,83 @@ class SchedulerJob(BaseJob):
             # are not updated.
             # We opted to check DagRun existence instead
             # of catching an Integrity error and rolling back the session i.e
-            # we need to run self._update_dag_next_dagruns if the Dag Run already exists or if we
+            # we need to update the DagModel's next_dagrun fields if the Dag Run already exists or if we
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
-            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
-                run = dag.create_dagrun(
+            if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+                dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
-                    start_date=timezone.utcnow(),
-                    state=State.RUNNING,
+                    state=State.QUEUED,
                     external_trigger=False,
                     session=session,
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
-
-                expected_start_date = dag.following_schedule(run.execution_date)
-                if expected_start_date:
-                    schedule_delay = run.start_date - expected_start_date
-                    Stats.timing(
-                        f'dagrun.schedule_delay.{dag.dag_id}',
-                        schedule_delay,
-                    )
-
-        self._update_dag_next_dagruns(dag_models, session)
+            dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
+                dag_model.next_dagrun
+            )
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], session: Session) -> None:
-        """
-        Bulk update the next_dagrun and next_dagrun_create_after for all the dags.
+    def _start_queued_dagruns(
+        self,
+        session: Session,
+    ) -> int:
+        """Find DagRuns in queued state and decide moving them to running state"""
+        dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
 
-        We batch the select queries to get info about all the dags at once
-        """
-        # Check max_active_runs, to see if we are _now_ at the limit for any of
-        # these dag? (we've just created a DagRun for them after all)
-        active_runs_of_dags = dict(
+        active_runs_of_dags = defaultdict(
+            lambda: 0,
             session.query(DagRun.dag_id, func.count('*'))
-            .filter(
-                DagRun.dag_id.in_([o.dag_id for o in dag_models]),
+            .filter(  # We use `list` here because SQLA doesn't accept a set
+                # We use set to avoid duplicate dag_ids
+                DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),
                 DagRun.state == State.RUNNING,
-                DagRun.external_trigger.is_(False),
             )
             .group_by(DagRun.dag_id)
-            .all()
+            .all(),
         )
 
-        for dag_model in dag_models:
-            # Get the DAG in a try_except to not stop the Scheduler when a Serialized DAG is not found
-            # This takes care of Dynamic DAGs especially
+        def _update_state(dag_run):
+            dag_run.state = State.RUNNING
+            dag_run.start_date = timezone.utcnow()
+            expected_start_date = dag.following_schedule(dag_run.execution_date)
+            if expected_start_date:
+                schedule_delay = dag_run.start_date - expected_start_date
+                Stats.timing(
+                    f'dagrun.schedule_delay.{dag.dag_id}',
+                    schedule_delay,
+                )
+
+        for dag_run in dag_runs:
             try:
-                dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
+                dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
             except SerializedDagNotFound:
-                self.log.exception("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
+                self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
                 continue
-            active_runs_of_dag = active_runs_of_dags.get(dag.dag_id, 0)
-            if dag.max_active_runs and active_runs_of_dag >= dag.max_active_runs:
-                self.log.info(
-                    "DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
+            active_runs = active_runs_of_dags[dag_run.dag_id]
+            if dag.max_active_runs and active_runs >= dag.max_active_runs:
+                self.log.debug(
+                    "DAG %s already has %d active runs, not moving any more runs to RUNNING state %s",
                     dag.dag_id,
-                    active_runs_of_dag,
-                    dag.max_active_runs,
+                    active_runs,
+                    dag_run.execution_date,
                 )
-                dag_model.next_dagrun_create_after = None
             else:
-                dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
-                    dag_model.next_dagrun
-                )
+                active_runs_of_dags[dag_run.dag_id] += 1
+                _update_state(dag_run)
 
     def _schedule_dag_run(
         self,
         dag_run: DagRun,
-        currently_active_runs: Set[datetime.datetime],
         session: Session,
     ) -> int:
         """
         Make scheduling decisions about an individual dag run
 
-        ``currently_active_runs`` is passed in so that a batch query can be
-        used to ask this for all dag runs in the batch, to avoid an n+1 query.
-
         :param dag_run: The DagRun to schedule
-        :param currently_active_runs: Number of currently active runs of this DAG
         :return: Number of tasks scheduled
         """
         dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
@@ -1118,9 +1095,6 @@ class SchedulerJob(BaseJob):
             session.flush()
             self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
 
-            # Work out if we should allow creating a new DagRun now?
-            self._update_dag_next_dagruns([session.query(DagModel).get(dag_run.dag_id)], session)
-
             callback_to_execute = DagCallbackRequest(
                 full_filepath=dag.fileloc,
                 dag_id=dag.dag_id,
@@ -1138,19 +1112,6 @@ class SchedulerJob(BaseJob):
             self.log.error("Execution date is in future: %s", dag_run.execution_date)
             return 0
 
-        if dag.max_active_runs:
-            if (
-                len(currently_active_runs) >= dag.max_active_runs
-                and dag_run.execution_date not in currently_active_runs
-            ):
-                self.log.info(
-                    "DAG %s already has %d active runs, not queuing any tasks for run %s",
-                    dag.dag_id,
-                    len(currently_active_runs),
-                    dag_run.execution_date,
-                )
-                return 0
-
         self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session)
         # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
         schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
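
A note on the dialect switch in _create_dag_runs above: MSSQL cannot compile
SQLAlchemy's tuple_(...).in_(...) row-value test, so the composite-key lookup
is expanded into OR-of-AND clauses there. A runnable toy of the same idea,
with an illustrative Run model (not Airflow's actual model):

    from sqlalchemy import Column, DateTime, Integer, String, and_, or_, tuple_
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class Run(Base):
        __tablename__ = 'run'
        id = Column(Integer, primary_key=True)
        dag_id = Column(String(250))
        execution_date = Column(DateTime)

    def existing_runs_filter(dialect_name, pairs):
        """Membership test over (dag_id, execution_date) pairs, portably."""
        if dialect_name == 'mssql':
            # MSSQL lacks row-value IN, so expand to OR of ANDs
            return or_(*(and_(Run.dag_id == d, Run.execution_date == e) for d, e in pairs))
        return tuple_(Run.dag_id, Run.execution_date).in_(pairs)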
diff --git a/airflow/migrations/versions/97cdd93827b8_add_queued_at_column_to_dagrun_table.py b/airflow/migrations/versions/97cdd93827b8_add_queued_at_column_to_dagrun_table.py
new file mode 100644
index 0000000..db08d8b
--- /dev/null
+++ b/airflow/migrations/versions/97cdd93827b8_add_queued_at_column_to_dagrun_table.py
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add queued_at column to dagrun table
+
+Revision ID: 83f031fd9f1c
+Revises: 30867afad44a
+Create Date: 2021-06-29 21:53:48.059438
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mssql
+
+# revision identifiers, used by Alembic.
+revision = '97cdd93827b8'
+down_revision = '83f031fd9f1c'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add queued_at column to dagrun table"""
+    conn = op.get_bind()
+    if conn.dialect.name == "mssql":
+        op.add_column('dag_run', sa.Column('queued_at', mssql.DATETIME2(precision=6), nullable=True))
+    else:
+        op.add_column('dag_run', sa.Column('queued_at', sa.DateTime(), nullable=True))
+
+
+def downgrade():
+    """Unapply Add queued_at column to dagrun table"""
+    op.drop_column('dag_run', "queued_at")
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 13d69c2..a3d06db 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1153,7 +1153,7 @@ class DAG(LoggingMixin):
         confirm_prompt=False,
         include_subdags=True,
         include_parentdag=True,
-        dag_run_state: str = State.RUNNING,
+        dag_run_state: str = State.QUEUED,
         dry_run=False,
         session=None,
         get_tis=False,
@@ -1369,7 +1369,7 @@ class DAG(LoggingMixin):
         confirm_prompt=False,
         include_subdags=True,
         include_parentdag=False,
-        dag_run_state=State.RUNNING,
+        dag_run_state=State.QUEUED,
         dry_run=False,
     ):
         all_tis = []
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 07f309d..a061dcc 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -61,12 +61,15 @@ class DagRun(Base, LoggingMixin):
 
     __tablename__ = "dag_run"
 
+    __NO_VALUE = object()
+
     id = Column(Integer, primary_key=True)
     dag_id = Column(String(ID_LEN))
+    queued_at = Column(UtcDateTime)
     execution_date = Column(UtcDateTime, default=timezone.utcnow)
-    start_date = Column(UtcDateTime, default=timezone.utcnow)
+    start_date = Column(UtcDateTime)
     end_date = Column(UtcDateTime)
-    _state = Column('state', String(50), default=State.RUNNING)
+    _state = Column('state', String(50), default=State.QUEUED)
     run_id = Column(String(ID_LEN))
     creating_job_id = Column(Integer)
     external_trigger = Column(Boolean, default=True)
@@ -102,6 +105,7 @@ class DagRun(Base, LoggingMixin):
         self,
         dag_id: Optional[str] = None,
         run_id: Optional[str] = None,
+        queued_at: Optional[datetime] = __NO_VALUE,
         execution_date: Optional[datetime] = None,
         start_date: Optional[datetime] = None,
         external_trigger: Optional[bool] = None,
@@ -118,6 +122,10 @@ class DagRun(Base, LoggingMixin):
         self.external_trigger = external_trigger
         self.conf = conf or {}
         self.state = state
+        if queued_at is self.__NO_VALUE:
+            self.queued_at = timezone.utcnow() if state == State.QUEUED else None
+        else:
+            self.queued_at = queued_at
         self.run_type = run_type
         self.dag_hash = dag_hash
         self.creating_job_id = creating_job_id
@@ -140,6 +148,8 @@ class DagRun(Base, LoggingMixin):
         if self._state != state:
             self._state = state
             self.end_date = timezone.utcnow() if self._state in State.finished else None
+            if state == State.QUEUED:
+                self.queued_at = timezone.utcnow()
 
     @declared_attr
     def state(self):
@@ -160,6 +170,7 @@ class DagRun(Base, LoggingMixin):
     @classmethod
     def next_dagruns_to_examine(
         cls,
+        state: str,
         session: Session,
         max_number: Optional[int] = None,
     ):
@@ -180,7 +191,7 @@ class DagRun(Base, LoggingMixin):
         # TODO: Bake this query, it is run _A lot_
         query = (
             session.query(cls)
-            .filter(cls.state == State.RUNNING, cls.run_type != DagRunType.BACKFILL_JOB)
+            .filter(cls.state == state, cls.run_type != DagRunType.BACKFILL_JOB)
             .join(
                 DagModel,
                 DagModel.dag_id == cls.dag_id,
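
The __NO_VALUE sentinel introduced above is a general Python pattern for
telling "argument omitted" apart from an explicit None. A standalone sketch
of the same idea (names illustrative, not Airflow's):

    from datetime import datetime, timezone

    _NO_VALUE = object()

    class Run:
        def __init__(self, state, queued_at=_NO_VALUE):
            if queued_at is _NO_VALUE:
                # Argument omitted: derive the default from the state
                self.queued_at = datetime.now(timezone.utc) if state == 'queued' else None
            else:
                # An explicit value, including None, is kept as given
                self.queued_at = queued_at

    assert Run('queued').queued_at is not None              # default applied
    assert Run('queued', queued_at=None).queued_at is None  # explicit None kept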
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 5fb8155..0e10567 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -137,7 +137,7 @@ def clear_task_instances(
     session,
     activate_dag_runs=None,
     dag=None,
-    dag_run_state: Union[str, Literal[False]] = State.RUNNING,
+    dag_run_state: Union[str, Literal[False]] = State.QUEUED,
 ):
     """
     Clears a set of task instances, but makes sure the running ones
@@ -239,7 +239,9 @@ def clear_task_instances(
         )
         for dr in drs:
             dr.state = dag_run_state
-            dr.start_date = timezone.utcnow()
+            dr.start_date = None
+            if dag_run_state == State.QUEUED:
+                dr.last_scheduling_decision = None
 
 
 class TaskInstanceKey(NamedTuple):
diff --git a/airflow/www/static/js/tree.js b/airflow/www/static/js/tree.js
index 4bf366a..d45c880 100644
--- a/airflow/www/static/js/tree.js
+++ b/airflow/www/static/js/tree.js
@@ -58,7 +58,9 @@ document.addEventListener('DOMContentLoaded', () => {
   const tree = d3.layout.tree().nodeSize([0, 25]);
   let nodes = tree.nodes(data);
   const nodeobj = {};
-  const getActiveRuns = () => data.instances.filter((run) => run.state === 'running').length > 0;
+  const runActiveStates = ['queued', 'running'];
+  const getActiveRuns = () => data.instances
+    .filter((run) => runActiveStates.includes(run.state)).length > 0;
 
   const now = Date.now() / 1000;
   const devicePixelRatio = window.devicePixelRatio || 1;
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 150ecc9..830047b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1526,7 +1526,7 @@ class Airflow(AirflowBaseView):
         dag.create_dagrun(
             run_type=DagRunType.MANUAL,
             execution_date=execution_date,
-            state=State.RUNNING,
+            state=State.QUEUED,
             conf=run_conf,
             external_trigger=True,
             dag_hash=current_app.dag_bag.dags_hash.get(dag_id),
@@ -3454,6 +3454,7 @@ class DagRunModelView(AirflowModelView):
         'execution_date',
         'run_id',
         'run_type',
+        'queued_at',
         'start_date',
         'end_date',
         'external_trigger',
@@ -3789,7 +3790,7 @@ class TaskInstanceModelView(AirflowModelView):
         lazy_gettext('Clear'),
         lazy_gettext(
             'Are you sure you want to clear the state of the selected task'
-            ' instance(s) and set their dagruns to the running state?'
+            ' instance(s) and set their dagruns to the QUEUED state?'
         ),
         single=False,
     )
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 0c663da..c119bcc 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,7 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``e9304a3141f0`` (head)        | ``83f031fd9f1c`` |                 | Make XCom primary key columns non-nullable                                            |
+| ``97cdd93827b8`` (head)        | ``83f031fd9f1c`` |                 | Add ``queued_at`` column in ``dag_run`` table                                         |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``83f031fd9f1c``               | ``a13f7613ad25`` |                 | Improve MSSQL compatibility                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/tests/api/common/experimental/test_mark_tasks.py b/tests/api/common/experimental/test_mark_tasks.py
index 4dab57e..49008d3 100644
--- a/tests/api/common/experimental/test_mark_tasks.py
+++ b/tests/api/common/experimental/test_mark_tasks.py
@@ -414,7 +414,9 @@ class TestMarkDAGRun(unittest.TestCase):
             assert ti.state == state
 
     def _create_test_dag_run(self, state, date):
-        return self.dag1.create_dagrun(run_type=DagRunType.MANUAL, state=state, execution_date=date)
+        return self.dag1.create_dagrun(
+            run_type=DagRunType.MANUAL, state=state, start_date=date, execution_date=date
+        )
 
     def _verify_dag_run_state(self, dag, date, state):
         drs = models.DagRun.find(dag_id=dag.dag_id, execution_date=date)
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index e51eca8..0aa13b2 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -90,7 +90,7 @@ class TestDagRunEndpoint:
 
     def teardown_method(self) -> None:
         clear_db_runs()
-        # clear_db_dags()
+        clear_db_dags()
 
     def _create_dag(self, dag_id):
         dag_instance = DagModel(dag_id=dag_id)
@@ -118,6 +118,7 @@ class TestDagRunEndpoint:
             execution_date=timezone.parse(self.default_time_2),
             start_date=timezone.parse(self.default_time),
             external_trigger=True,
+            state=state,
         )
         dag_runs.append(dagrun_model_2)
         if extra_dag:
@@ -131,6 +132,7 @@ class TestDagRunEndpoint:
                         execution_date=timezone.parse(self.default_time_2),
                         start_date=timezone.parse(self.default_time),
                         external_trigger=True,
+                        state=state,
                     )
                 )
         if commit:
@@ -193,6 +195,7 @@ class TestGetDagRun(TestDagRunEndpoint):
             execution_date=timezone.parse(self.default_time),
             start_date=timezone.parse(self.default_time),
             external_trigger=True,
+            state='running',
         )
         session.add(dagrun_model)
         session.commit()
@@ -532,7 +535,7 @@ class TestGetDagRunsEndDateFilters(TestDagRunEndpoint):
             (
                 f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_lte="
                 f"{(timezone.utcnow() + timedelta(days=1)).isoformat()}",
-                ["TEST_DAG_RUN_ID_1"],
+                ["TEST_DAG_RUN_ID_1", "TEST_DAG_RUN_ID_2"],
             ),
         ]
     )
@@ -750,6 +753,7 @@ class TestGetDagRunBatchPagination(TestDagRunEndpoint):
             DagRun(
                 dag_id="TEST_DAG_ID",
                 run_id="TEST_DAG_RUN_ID" + str(i),
+                state='running',
                 run_type=DagRunType.MANUAL,
                 execution_date=timezone.parse(self.default_time) + timedelta(minutes=i),
                 start_date=timezone.parse(self.default_time),
@@ -884,7 +888,7 @@ class TestGetDagRunBatchDateFilters(TestDagRunEndpoint):
             ),
             (
                 {"end_date_lte": f"{(timezone.utcnow() + timedelta(days=1)).isoformat()}"},
-                ["TEST_DAG_RUN_ID_1"],
+                ["TEST_DAG_RUN_ID_1", "TEST_DAG_RUN_ID_2"],
             ),
         ]
     )
@@ -927,8 +931,8 @@ class TestPostDagRun(TestDagRunEndpoint):
             "end_date": None,
             "execution_date": response.json["execution_date"],
             "external_trigger": True,
-            "start_date": response.json["start_date"],
-            "state": "running",
+            "start_date": None,
+            "state": "queued",
         } == response.json
 
     @parameterized.expand(
diff --git a/tests/api_connexion/schemas/test_dag_run_schema.py b/tests/api_connexion/schemas/test_dag_run_schema.py
index 3e6bf2e..9e4a9e8 100644
--- a/tests/api_connexion/schemas/test_dag_run_schema.py
+++ b/tests/api_connexion/schemas/test_dag_run_schema.py
@@ -49,6 +49,7 @@ class TestDAGRunSchema(TestDAGRunBase):
     def test_serialize(self, session):
         dagrun_model = DagRun(
             run_id="my-dag-run",
+            state='running',
             run_type=DagRunType.MANUAL.value,
             execution_date=timezone.parse(self.default_time),
             start_date=timezone.parse(self.default_time),
@@ -124,6 +125,7 @@ class TestDagRunCollection(TestDAGRunBase):
     def test_serialize(self, session):
         dagrun_model_1 = DagRun(
             run_id="my-dag-run",
+            state='running',
             execution_date=timezone.parse(self.default_time),
             run_type=DagRunType.MANUAL.value,
             start_date=timezone.parse(self.default_time),
@@ -131,6 +133,7 @@ class TestDagRunCollection(TestDAGRunBase):
         )
         dagrun_model_2 = DagRun(
             run_id="my-dag-run-2",
+            state='running',
             execution_date=timezone.parse(self.default_time),
             start_date=timezone.parse(self.default_time),
             run_type=DagRunType.MANUAL.value,
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 0ab7f2b..02613ec 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -43,6 +43,7 @@ from airflow.dag_processing.manager import (
 )
 from airflow.dag_processing.processor import DagFileProcessorProcess
 from airflow.jobs.local_task_job import LocalTaskJob as LJ
+from airflow.jobs.scheduler_job import SchedulerJob
 from airflow.models import DagBag, DagModel, TaskInstance as TI
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance
@@ -508,8 +509,8 @@ class TestDagFileProcessorManager(unittest.TestCase):
             child_pipe.close()
             parent_pipe.close()
 
-    @mock.patch("airflow.jobs.scheduler_job.DagFileProcessorProcess.pid", new_callable=PropertyMock)
-    @mock.patch("airflow.jobs.scheduler_job.DagFileProcessorProcess.kill")
+    @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", new_callable=PropertyMock)
+    @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.kill")
     def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid):
         mock_pid.return_value = 1234
         manager = DagFileProcessorManager(
@@ -529,8 +530,8 @@ class TestDagFileProcessorManager(unittest.TestCase):
         manager._kill_timed_out_processors()
         mock_kill.assert_called_once_with()
 
-    @mock.patch("airflow.jobs.scheduler_job.DagFileProcessorProcess.pid", new_callable=PropertyMock)
-    @mock.patch("airflow.jobs.scheduler_job.DagFileProcessorProcess")
+    @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", new_callable=PropertyMock)
+    @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess")
     def test_kill_timed_out_processors_no_kill(self, mock_dag_file_processor, mock_pid):
         mock_pid.return_value = 1234
         manager = DagFileProcessorManager(
@@ -560,7 +561,6 @@ class TestDagFileProcessorManager(unittest.TestCase):
         # We need to _actually_ parse the files here to test the behaviour.
         # Right now the parsing code lives in SchedulerJob, even though it's
         # called via utils.dag_processing.
-        from airflow.jobs.scheduler_job import SchedulerJob
 
         dag_id = 'exit_test_dag'
         dag_directory = TEST_DAG_FOLDER.parent / 'dags_with_system_exit'
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index 5953517..feb3497 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -277,7 +277,7 @@ class TestDagFileProcessor(unittest.TestCase):
         assert email1 in send_email_to
         assert email2 not in send_email_to
 
-    @mock.patch('airflow.jobs.scheduler_job.Stats.incr')
+    @mock.patch('airflow.dag_processing.processor.Stats.incr')
     @mock.patch("airflow.utils.email.send_email")
     def test_dag_file_processor_sla_miss_email_exception(self, mock_send_email, mock_stats_incr):
         """
@@ -387,7 +387,7 @@ class TestDagFileProcessor(unittest.TestCase):
             ti.start_date = start_date
             ti.end_date = end_date
 
-            count = self.scheduler_job._schedule_dag_run(dr, set(), session)
+            count = self.scheduler_job._schedule_dag_run(dr, session)
             assert count == 1
 
             session.refresh(ti)
@@ -444,7 +444,7 @@ class TestDagFileProcessor(unittest.TestCase):
             ti.start_date = start_date
             ti.end_date = end_date
 
-            count = self.scheduler_job._schedule_dag_run(dr, set(), session)
+            count = self.scheduler_job._schedule_dag_run(dr, session)
             assert count == 1
 
             session.refresh(ti)
@@ -504,7 +504,7 @@ class TestDagFileProcessor(unittest.TestCase):
                 ti.start_date = start_date
                 ti.end_date = end_date
 
-            count = self.scheduler_job._schedule_dag_run(dr, set(), session)
+            count = self.scheduler_job._schedule_dag_run(dr, session)
             assert count == 2
 
             session.refresh(tis[0])
@@ -547,7 +547,7 @@ class TestDagFileProcessor(unittest.TestCase):
         BashOperator(task_id='dummy2', dag=dag, owner='airflow', bash_command='echo test')
         SerializedDagModel.write_dag(dag=dag)
 
-        scheduled_tis = self.scheduler_job._schedule_dag_run(dr, set(), session)
+        scheduled_tis = self.scheduler_job._schedule_dag_run(dr, session)
         session.flush()
         assert scheduled_tis == 2
 
@@ -560,11 +560,10 @@ class TestDagFileProcessor(unittest.TestCase):
 
     def test_runs_respected_after_clear(self):
         """
-        Test if _process_task_instances only schedules ti's up to max_active_runs
-        (related to issue AIRFLOW-137)
+        Test that, after dag.clear, max_active_runs is respected
         """
         dag = DAG(dag_id='test_scheduler_max_active_runs_respected_after_clear', start_date=DEFAULT_DATE)
-        dag.max_active_runs = 3
+        dag.max_active_runs = 1
 
         BashOperator(task_id='dummy', dag=dag, owner='airflow', bash_command='echo Hi')
 
@@ -575,48 +574,46 @@ class TestDagFileProcessor(unittest.TestCase):
         session.close()
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag.dag_id)
+
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         self.scheduler_job.processor_agent = mock.MagicMock()
         self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
-        dag.clear()
 
         date = DEFAULT_DATE
-        dr1 = dag.create_dagrun(
+        dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=date,
-            state=State.RUNNING,
+            state=State.QUEUED,
         )
         date = dag.following_schedule(date)
-        dr2 = dag.create_dagrun(
+        dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=date,
-            state=State.RUNNING,
+            state=State.QUEUED,
         )
         date = dag.following_schedule(date)
-        dr3 = dag.create_dagrun(
+        dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=date,
-            state=State.RUNNING,
+            state=State.QUEUED,
         )
+        dag.clear()
 
-        # First create up to 3 dagruns in RUNNING state.
-        assert dr1 is not None
-        assert dr2 is not None
-        assert dr3 is not None
-        assert len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)) == 3
-
-        # Reduce max_active_runs to 1
-        dag.max_active_runs = 1
+        assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 3
 
-        # and schedule them in, so we can check how many
-        # tasks are put on the task_instances_list (should be one, not 3)
-        with create_session() as session:
-            num_scheduled = self.scheduler_job._schedule_dag_run(dr1, set(), session)
-            assert num_scheduled == 1
-            num_scheduled = self.scheduler_job._schedule_dag_run(dr2, {dr1.execution_date}, session)
-            assert num_scheduled == 0
-            num_scheduled = self.scheduler_job._schedule_dag_run(dr3, {dr1.execution_date}, session)
-            assert num_scheduled == 0
+        session = settings.Session()
+        self.scheduler_job._start_queued_dagruns(session)
+        session.commit()
+        # Assert that only 1 dagrun is active
+        assert len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)) == 1
+        # Assert that the other two are queued
+        assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 2
 
     @patch.object(TaskInstance, 'handle_failure_with_callback')
     def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
@@ -698,7 +695,7 @@ class TestDagFileProcessor(unittest.TestCase):
         dr = drs[0]
 
         # Schedule TaskInstances
-        self.scheduler_job_job._schedule_dag_run(dr, {}, session)
+        self.scheduler_job_job._schedule_dag_run(dr, session)
         with create_session() as session:
             tis = session.query(TaskInstance).all()
 
@@ -724,7 +721,7 @@ class TestDagFileProcessor(unittest.TestCase):
                 assert end_date is None
                 assert duration is None
 
-        self.scheduler_job_job._schedule_dag_run(dr, {}, session)
+        self.scheduler_job_job._schedule_dag_run(dr, session)
         with create_session() as session:
             tis = session.query(TaskInstance).all()
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 9fe8517..0ee6f5f 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -23,7 +23,6 @@ import shutil
 import unittest
 from datetime import timedelta
 from tempfile import mkdtemp
-from time import sleep
 from unittest import mock
 from unittest.mock import MagicMock, patch
 from zipfile import ZipFile
@@ -430,7 +429,6 @@ class TestSchedulerJob(unittest.TestCase):
         session.flush()
 
         res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
-
         assert 2 == len(res)
         res_keys = map(lambda x: x.key, res)
         assert ti_no_dagrun.key in res_keys
@@ -1575,15 +1573,16 @@ class TestSchedulerJob(unittest.TestCase):
 
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         self.scheduler_job._create_dag_runs([orm_dag], session)
+        self.scheduler_job._start_queued_dagruns(session)
 
         drs = DagRun.find(dag_id=dag.dag_id, session=session)
         assert len(drs) == 1
         dr = drs[0]
 
-        # Should not be able to create a new dag run, as we are at max active runs
-        assert orm_dag.next_dagrun_create_after is None
+        # This should have a value since max_active_runs is now
+        # enforced via the DagRun state.
+        assert orm_dag.next_dagrun_create_after
         # But we should record the date of _what run_ it would be
         assert isinstance(orm_dag.next_dagrun, datetime.datetime)
 
@@ -1595,7 +1594,7 @@ class TestSchedulerJob(unittest.TestCase):
         self.scheduler_job.processor_agent = mock.Mock()
         self.scheduler_job.processor_agent.send_callback_to_execute = mock.Mock()
 
-        self.scheduler_job._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, session)
         session.flush()
 
         session.refresh(dr)
@@ -1652,7 +1651,7 @@ class TestSchedulerJob(unittest.TestCase):
         self.scheduler_job.processor_agent = mock.Mock()
         self.scheduler_job.processor_agent.send_callback_to_execute = mock.Mock()
 
-        self.scheduler_job._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, session)
         session.flush()
 
         session.refresh(dr)
@@ -1711,7 +1710,7 @@ class TestSchedulerJob(unittest.TestCase):
         ti = dr.get_task_instance('dummy')
         ti.set_state(state, session)
 
-        self.scheduler_job._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, session)
 
         expected_callback = DagCallbackRequest(
             full_filepath=dr.dag.fileloc,
@@ -1766,7 +1765,7 @@ class TestSchedulerJob(unittest.TestCase):
         ti = dr.get_task_instance('test_task')
         ti.set_state(state, session)
 
-        self.scheduler_job._schedule_dag_run(dr, set(), session)
+        self.scheduler_job._schedule_dag_run(dr, session)
 
         # Verify Callback is not set (i.e is None) when no callbacks are set on DAG
         self.scheduler_job._send_dag_callbacks_to_processor.assert_called_once_with(dr, None)
@@ -2146,13 +2145,13 @@ class TestSchedulerJob(unittest.TestCase):
             execution_date=DEFAULT_DATE,
             state=State.RUNNING,
         )
-        self.scheduler_job._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, session)
         dr = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=dag.following_schedule(dr.execution_date),
             state=State.RUNNING,
         )
-        self.scheduler_job._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, session)
         task_instances_list = self.scheduler_job._executable_task_instances_to_queued(
             max_tis=32, session=session
         )
@@ -2203,7 +2202,7 @@ class TestSchedulerJob(unittest.TestCase):
                 execution_date=date,
                 state=State.RUNNING,
             )
-            self.scheduler_job._schedule_dag_run(dr, {}, session)
+            self.scheduler_job._schedule_dag_run(dr, session)
             date = dag.following_schedule(date)
 
         task_instances_list = self.scheduler_job._executable_task_instances_to_queued(
@@ -2266,7 +2265,7 @@ class TestSchedulerJob(unittest.TestCase):
                 execution_date=date,
                 state=State.RUNNING,
             )
-            scheduler._schedule_dag_run(dr, {}, session)
+            scheduler._schedule_dag_run(dr, session)
             date = dag_d1.following_schedule(date)
 
         date = DEFAULT_DATE
@@ -2276,7 +2275,7 @@ class TestSchedulerJob(unittest.TestCase):
                 execution_date=date,
                 state=State.RUNNING,
             )
-            scheduler._schedule_dag_run(dr, {}, session)
+            scheduler._schedule_dag_run(dr, session)
             date = dag_d2.following_schedule(date)
 
         scheduler._executable_task_instances_to_queued(max_tis=2, session=session)
@@ -2353,7 +2352,7 @@ class TestSchedulerJob(unittest.TestCase):
             execution_date=DEFAULT_DATE,
             state=State.RUNNING,
         )
-        self.scheduler_job._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, session)
 
         task_instances_list = self.scheduler_job._executable_task_instances_to_queued(
             max_tis=32, session=session
@@ -2412,7 +2411,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         # Verify that DagRun.verify_integrity is not called
         with mock.patch('airflow.jobs.scheduler_job.DagRun.verify_integrity') as mock_verify_integrity:
-            scheduled_tis = self.scheduler_job._schedule_dag_run(dr, {}, session)
+            scheduled_tis = self.scheduler_job._schedule_dag_run(dr, session)
             mock_verify_integrity.assert_not_called()
         session.flush()
 
@@ -2475,7 +2474,7 @@ class TestSchedulerJob(unittest.TestCase):
         dag_version_2 = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session)
         assert dag_version_2 != dag_version_1
 
-        scheduled_tis = self.scheduler_job._schedule_dag_run(dr, {}, session)
+        scheduled_tis = self.scheduler_job._schedule_dag_run(dr, session)
         session.flush()
 
         assert scheduled_tis == 2
@@ -3187,14 +3186,13 @@ class TestSchedulerJob(unittest.TestCase):
                 full_filepath=dag.fileloc, dag_id=dag_id
             )
 
-    @freeze_time(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9))
-    @mock.patch('airflow.jobs.scheduler_job.Stats.timing')
-    def test_create_dag_runs(self, stats_timing):
+    def test_create_dag_runs(self):
         """
         Test various invariants of _create_dag_runs.
 
         - That the run created has the creating_job_id set
-        - That we emit the right DagRun metrics
+        - That the run created is in the QUEUED state
+        - That dag_model has next_dagrun
         """
         dag = DAG(dag_id='test_create_dag_runs', start_date=DEFAULT_DATE)
 
@@ -3218,8 +3216,51 @@ class TestSchedulerJob(unittest.TestCase):
         with create_session() as session:
             self.scheduler_job._create_dag_runs([dag_model], session)
 
+        dr = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).first()
+        # Assert dr state is queued
+        assert dr.state == State.QUEUED
+        assert dr.start_date is None
+
+        assert dag.get_last_dagrun().creating_job_id == self.scheduler_job.id
+
+    @freeze_time(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9))
+    @mock.patch('airflow.jobs.scheduler_job.Stats.timing')
+    def test_start_dagruns(self, stats_timing):
+        """
+        Test that _start_queued_dagruns:
+
+        - moves runs to the RUNNING state
+        - emits the right DagRun metrics
+        """
+        dag = DAG(dag_id='test_start_dag_runs', start_date=DEFAULT_DATE)
+
+        DummyOperator(
+            task_id='dummy',
+            dag=dag,
+        )
+
+        dagbag = DagBag(
+            dag_folder=os.devnull,
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db()
+        dag_model = DagModel.get_dagmodel(dag.dag_id)
+
+        self.scheduler_job = SchedulerJob(executor=self.null_exec)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+
+        with create_session() as session:
+            self.scheduler_job._create_dag_runs([dag_model], session)
+            self.scheduler_job._start_queued_dagruns(session)
+
+        dr = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).first()
+        # Assert dr state is running
+        assert dr.state == State.RUNNING
+
         stats_timing.assert_called_once_with(
-            "dagrun.schedule_delay.test_create_dag_runs", datetime.timedelta(seconds=9)
+            "dagrun.schedule_delay.test_start_dag_runs", datetime.timedelta(seconds=9)
         )
 
         assert dag.get_last_dagrun().creating_job_id == self.scheduler_job.id
@@ -3418,61 +3459,7 @@ class TestSchedulerJob(unittest.TestCase):
         assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(days=1)
         session.rollback()
 
-    def test_do_schedule_max_active_runs_upstream_failed(self):
-        """
-        Test that tasks in upstream failed don't count as actively running.
-
-        This test can be removed when adding a queued state to DagRuns.
-        """
-
-        with DAG(
-            dag_id='test_max_active_run_with_upstream_failed',
-            start_date=DEFAULT_DATE,
-            schedule_interval='@once',
-            max_active_runs=1,
-        ) as dag:
-            # Can't use DummyOperator as that goes straight to success
-            task1 = BashOperator(task_id='dummy1', bash_command='true')
-
-        session = settings.Session()
-        dagbag = DagBag(
-            dag_folder=os.devnull,
-            include_examples=False,
-            read_dags_from_db=True,
-        )
-
-        dagbag.bag_dag(dag=dag, root_dag=dag)
-        dagbag.sync_to_db(session=session)
-
-        run1 = dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            execution_date=DEFAULT_DATE,
-            state=State.RUNNING,
-            session=session,
-        )
-
-        ti = run1.get_task_instance(task1.task_id, session)
-        ti.state = State.UPSTREAM_FAILED
-
-        run2 = dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            execution_date=DEFAULT_DATE + timedelta(hours=1),
-            state=State.RUNNING,
-            session=session,
-        )
-
-        dag.sync_to_db(session=session)  # Update the date fields
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.executor = MockExecutor(do_update=False)
-        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
-
-        num_queued = self.scheduler_job._do_scheduling(session)
-
-        assert num_queued == 1
-        ti = run2.get_task_instance(task1.task_id, session)
-        assert ti.state == State.QUEUED
-
+    @conf_vars({('scheduler', 'use_job_schedule'): "false"})
     def test_do_schedule_max_active_runs_dag_timed_out(self):
         """Test that tasks are set to a finished state when their DAG times out"""
 
@@ -3505,33 +3492,36 @@ class TestSchedulerJob(unittest.TestCase):
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
             state=State.RUNNING,
+            start_date=timezone.utcnow() - timedelta(seconds=2),
             session=session,
         )
+
         run1_ti = run1.get_task_instance(task1.task_id, session)
         run1_ti.state = State.RUNNING
 
-        sleep(1)
-
         run2 = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE + timedelta(seconds=10),
-            state=State.RUNNING,
+            state=State.QUEUED,
             session=session,
         )
 
         dag.sync_to_db(session=session)
-
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         self.scheduler_job.executor = MockExecutor()
         self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
 
-        _ = self.scheduler_job._do_scheduling(session)
-
+        self.scheduler_job._do_scheduling(session)
+        session.add(run1)
+        session.refresh(run1)
         assert run1.state == State.FAILED
         assert run1_ti.state == State.SKIPPED
-        assert run2.state == State.RUNNING
 
-        _ = self.scheduler_job._do_scheduling(session)
+        # Run scheduling again to assert run2 has started
+        self.scheduler_job._do_scheduling(session)
+        session.add(run2)
+        session.refresh(run2)
+        assert run2.state == State.RUNNING
         run2_ti = run2.get_task_instance(task1.task_id, session)
         assert run2_ti.state == State.QUEUED
 
@@ -3581,8 +3571,8 @@ class TestSchedulerJob(unittest.TestCase):
 
     def test_do_schedule_max_active_runs_and_manual_trigger(self):
         """
-        Make sure that when a DAG is already at max_active_runs, that manually triggering a run doesn't cause
-        the dag to "stall".
+        Make sure that when a DAG is already at max_active_runs, manually
+        triggered dagruns don't start running.
         """
 
         with DAG(
@@ -3597,7 +3587,7 @@ class TestSchedulerJob(unittest.TestCase):
 
             task1 >> task2
 
-            task3 = BashOperator(task_id='dummy3', bash_command='true')
+            BashOperator(task_id='dummy3', bash_command='true')
 
         session = settings.Session()
         dagbag = DagBag(
@@ -3612,7 +3602,7 @@ class TestSchedulerJob(unittest.TestCase):
         dag_run = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
-            state=State.RUNNING,
+            state=State.QUEUED,
             session=session,
         )
 
@@ -3630,47 +3620,23 @@ class TestSchedulerJob(unittest.TestCase):
 
         assert num_queued == 2
         assert dag_run.state == State.RUNNING
-        ti1 = dag_run.get_task_instance(task1.task_id, session)
-        assert ti1.state == State.QUEUED
-
-        # Set task1 to success (so task2 can run) but keep task3 as "running"
-        ti1.state = State.SUCCESS
-
-        ti3 = dag_run.get_task_instance(task3.task_id, session)
-        ti3.state = State.RUNNING
-
-        session.flush()
-
-        # At this point, ti2 and ti3 of the scheduled dag run should be running
-        num_queued = self.scheduler_job._do_scheduling(session)
-
-        assert num_queued == 1
-        # Should have queued task2
-        ti2 = dag_run.get_task_instance(task2.task_id, session)
-        assert ti2.state == State.QUEUED
-
-        ti2.state = None
-        session.flush()
 
         # Now that this one is running, manually trigger a dag.
 
-        manual_run = dag.create_dagrun(
+        dag.create_dagrun(
             run_type=DagRunType.MANUAL,
             execution_date=DEFAULT_DATE + timedelta(hours=1),
-            state=State.RUNNING,
+            state=State.QUEUED,
             session=session,
         )
         session.flush()
 
-        num_queued = self.scheduler_job._do_scheduling(session)
+        self.scheduler_job._do_scheduling(session)
 
-        assert num_queued == 1
-        # Should have queued task2 again.
-        ti2 = dag_run.get_task_instance(task2.task_id, session)
-        assert ti2.state == State.QUEUED
-        # Manual run shouldn't have been started, because we're at max_active_runs with DR1
-        ti1 = manual_run.get_task_instance(task1.task_id, session)
-        assert ti1.state is None
+        # Assert that only 1 dagrun is active
+        assert len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)) == 1
+        # Assert that the other one is queued
+        assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 1
 
 
 @pytest.mark.xfail(reason="Work out where this goes")
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index 9b8fbd0..4f64347 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -19,6 +19,8 @@
 import datetime
 import unittest
 
+from parameterized import parameterized
+
 from airflow import settings
 from airflow.models import DAG, TaskInstance as TI, TaskReschedule, clear_task_instances
 from airflow.operators.dummy import DummyOperator
@@ -92,6 +94,41 @@ class TestClearTasks(unittest.TestCase):
             assert ti0.state is None
             assert ti0.external_executor_id is None
 
+    @parameterized.expand([(State.QUEUED, None), (State.RUNNING, DEFAULT_DATE)])
+    def test_clear_task_instances_dr_state(self, state, last_scheduling):
+        """Test that DR state is set to None after clear.
+        And that DR.last_scheduling_decision is handled OK.
+        start_date is also set to None
+        """
+        dag = DAG(
+            'test_clear_task_instances',
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+        )
+        task0 = DummyOperator(task_id='0', owner='test', dag=dag)
+        task1 = DummyOperator(task_id='1', owner='test', dag=dag, retries=2)
+        ti0 = TI(task=task0, execution_date=DEFAULT_DATE)
+        ti1 = TI(task=task1, execution_date=DEFAULT_DATE)
+        session = settings.Session()
+        dr = dag.create_dagrun(
+            execution_date=ti0.execution_date,
+            state=State.RUNNING,
+            run_type=DagRunType.SCHEDULED,
+        )
+        dr.last_scheduling_decision = DEFAULT_DATE
+        session.add(dr)
+        session.commit()
+
+        ti0.run()
+        ti1.run()
+        qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
+        clear_task_instances(qry, session, dag_run_state=state, dag=dag)
+
+        dr = ti0.get_dagrun()
+        assert dr.state == state
+        assert dr.start_date is None
+        assert dr.last_scheduling_decision == last_scheduling
+
     def test_clear_task_instances_without_task(self):
         dag = DAG(
             'test_clear_task_instances_without_task',
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 7899199..fac38bf 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -39,7 +39,7 @@ from airflow.utils.state import State
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.types import DagRunType
 from tests.models import DEFAULT_DATE
-from tests.test_utils.db import clear_db_jobs, clear_db_pools, clear_db_runs
+from tests.test_utils.db import clear_db_dags, clear_db_jobs, clear_db_pools, clear_db_runs
 
 
 class TestDagRun(unittest.TestCase):
@@ -50,6 +50,12 @@ class TestDagRun(unittest.TestCase):
     def setUp(self):
         clear_db_runs()
         clear_db_pools()
+        clear_db_dags()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
+        clear_db_pools()
+        clear_db_dags()
 
     def create_dag_run(
         self,
@@ -102,7 +108,7 @@ class TestDagRun(unittest.TestCase):
         session.commit()
         ti0.refresh_from_db()
         dr0 = session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.execution_date == now).first()
-        assert dr0.state == State.RUNNING
+        assert dr0.state == State.QUEUED
 
     def test_dagrun_find(self):
         session = settings.Session()
@@ -692,9 +698,11 @@ class TestDagRun(unittest.TestCase):
         ti.run()
         assert (ti.state == State.SUCCESS) == is_ti_success
 
-    def test_next_dagruns_to_examine_only_unpaused(self):
+    @parameterized.expand([(State.QUEUED,), (State.RUNNING,)])
+    def test_next_dagruns_to_examine_only_unpaused(self, state):
         """
         Check that "next_dagruns_to_examine" ignores runs from paused/inactive DAGs
+        and picks up dagruns in the requested (running or queued) state
         """
 
         dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
@@ -712,25 +720,22 @@ class TestDagRun(unittest.TestCase):
         session.flush()
         dr = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
-            state=State.RUNNING,
+            state=state,
             execution_date=DEFAULT_DATE,
-            start_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE if state == State.RUNNING else None,
             session=session,
         )
 
-        runs = DagRun.next_dagruns_to_examine(session).all()
+        runs = DagRun.next_dagruns_to_examine(state, session).all()
 
         assert runs == [dr]
 
         orm_dag.is_paused = True
         session.flush()
 
-        runs = DagRun.next_dagruns_to_examine(session).all()
+        runs = DagRun.next_dagruns_to_examine(state, session).all()
         assert runs == []
 
-        session.rollback()
-        session.close()
-
     @mock.patch.object(Stats, 'timing')
     def test_no_scheduling_delay_for_nonscheduled_runs(self, stats_mock):
         """
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 187fdb2..274766c 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -545,16 +545,16 @@ def test_external_task_marker_clear_activate(dag_bag_parent_child):
     task_0 = dag_0.get_task("task_0")
     clear_tasks(dag_bag, dag_0, task_0, start_date=day_1, end_date=day_2)
 
-    # Assert that dagruns of all the affected dags are set to RUNNING after tasks are cleared.
+    # Assert that dagruns of all the affected dags are set to QUEUED after tasks are cleared.
     # Unaffected dagruns should be left as SUCCESS.
     dagrun_0_1 = dag_bag.get_dag('parent_dag_0').get_dagrun(execution_date=day_1)
     dagrun_0_2 = dag_bag.get_dag('parent_dag_0').get_dagrun(execution_date=day_2)
     dagrun_1_1 = dag_bag.get_dag('child_dag_1').get_dagrun(execution_date=day_1)
     dagrun_1_2 = dag_bag.get_dag('child_dag_1').get_dagrun(execution_date=day_2)
 
-    assert dagrun_0_1.state == State.RUNNING
-    assert dagrun_0_2.state == State.RUNNING
-    assert dagrun_1_1.state == State.RUNNING
+    assert dagrun_0_1.state == State.QUEUED
+    assert dagrun_0_2.state == State.QUEUED
+    assert dagrun_1_1.state == State.QUEUED
     assert dagrun_1_2.state == State.SUCCESS