Posted to commits@airflow.apache.org by po...@apache.org on 2021/03/23 03:26:08 UTC

[airflow] 21/34: Fixes some of the flaky tests in test_scheduler_job (#14792)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f6c0332b6c15128047ad9b5614dd7754cd9d7266
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Thu Mar 18 14:01:10 2021 +0100

    Fixes some of the flaky tests in test_scheduler_job (#14792)
    
    The parallel tests introduced in #14531 created a good
    opportunity to reproduce some of the race conditions that
    cause some of the scheduler job tests to be flaky.
    
    This change attempts to fix three of the flaky tests
    there by removing side effects between tests. The previous
    implementation did not take into account that scheduler job
    processes might still be running when a test finished, so
    the tests could have unintended side effects, especially
    when run on a busy machine.
    
    This PR adds a mechanism that stops all running
    SchedulerJob processes in tearDown before cleaning
    the database.
    
    Fixes: #14778
    Fixes: #14773
    Fixes: #14772
    Fixes: #14771
    Fixes: #11571
    Fixes: #12861
    Fixes: #11676
    Fixes: #11454
    Fixes: #11442
    Fixes: #11441
    (cherry picked from commit 45cf89ce51b203bdf4a2545c67449b67ac5e94f1)
---
 .github/workflows/build-images-workflow-run.yml |   8 +-
 .github/workflows/ci.yml                        |  27 +-
 scripts/ci/tools/ci_free_space_on_ci.sh         |   2 +-
 tests/jobs/test_scheduler_job.py                | 711 +++++++++++++-----------
 tests/models/test_taskinstance.py               |   8 +-
 tests/test_utils/asserts.py                     |   5 +-
 6 files changed, 392 insertions(+), 369 deletions(-)
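
The core of the fix is the tearDown pattern added to
tests/jobs/test_scheduler_job.py below: every test stores the job under
test in self.scheduler_job, and tearDown ends any still-running
processor agent before wiping the database. A minimal sketch of that
pattern (names are illustrative; clean_db() stands in for the suite's
clear_db_*() helpers shown in the diff):

    import unittest

    class SchedulerJobTestCase(unittest.TestCase):
        @staticmethod
        def clean_db():
            # Placeholder for the clear_db_*() helpers called in the diff.
            pass

        def setUp(self):
            self.clean_db()
            # Track the job under test so tearDown can always reach it.
            self.scheduler_job = None

        def tearDown(self):
            # End any still-running processor agent *before* wiping the
            # database, so a lingering scheduler process cannot write to
            # tables the next test assumes are clean.
            if self.scheduler_job and self.scheduler_job.processor_agent:
                self.scheduler_job.processor_agent.end()
                self.scheduler_job = None
            self.clean_db()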

diff --git a/.github/workflows/build-images-workflow-run.yml b/.github/workflows/build-images-workflow-run.yml
index b158fd2..193d26a 100644
--- a/.github/workflows/build-images-workflow-run.yml
+++ b/.github/workflows/build-images-workflow-run.yml
@@ -361,9 +361,7 @@ jobs:
         if: steps.defaults.outputs.proceed == 'true'
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: |
-          !contains(needs.build-info.outputs.runsOn, 'self-hosted') &&
-          steps.defaults.outputs.proceed == 'true'
+        if: steps.defaults.outputs.proceed == 'true'
       - name: "Build CI images ${{ matrix.python-version }}:${{ github.event.workflow_run.id }}"
         run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
         if: steps.defaults.outputs.proceed == 'true'
@@ -491,9 +489,7 @@ jobs:
         if: steps.defaults.outputs.proceed == 'true'
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: |
-          !contains(needs.build-info.outputs.runsOn, 'self-hosted') &&
-          steps.defaults.outputs.proceed == 'true'
+        if: steps.defaults.outputs.proceed == 'true'
       - name: "Build CI images ${{ matrix.python-version }}:${{ github.event.workflow_run.id }}"
         run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
         # Pull images built in the previous step
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c0523d6..1d0d7b2 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -255,8 +255,7 @@ jobs:
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
         if: |
-          needs.build-info.outputs.waitForImage == 'true' &&
-          !contains(needs.build-info.outputs.runsOn, 'self-hosted')
+          needs.build-info.outputs.waitForImage == 'true'
       - name: >
           Wait for CI images
           ${{ needs.build-info.outputs.pythonVersions }}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}
@@ -296,8 +295,7 @@ jobs:
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
         if: |
-          needs.build-info.outputs.waitForImage == 'true' &&
-          !contains(needs.build-info.outputs.runsOn, 'self-hosted')
+          needs.build-info.outputs.waitForImage == 'true'
       - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
         run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
       - name: "Verify CI image Py${{matrix.python-version}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
@@ -326,7 +324,6 @@ jobs:
           python-version: ${{needs.build-info.outputs.defaultPythonVersion}}
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: "!contains(needs.build-info.outputs.runsOn, 'self-hosted')"
       - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
         run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
       - name: "Get Python version"
@@ -430,7 +427,6 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           python-version: ${{needs.build-info.outputs.defaultPythonVersion}}
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: "!contains(needs.build-info.outputs.runsOn, 'self-hosted')"
       - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
         run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
       - name: "Get Python version"
@@ -521,7 +517,6 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: "!contains(needs.build-info.outputs.runsOn, 'self-hosted')"
       - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
         run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
       - name: "Prepare provider documentation"
@@ -575,7 +570,6 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: "!contains(needs.build-info.outputs.runsOn, 'self-hosted')"
       - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
         run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
       - name: "Prepare provider documentation"
@@ -622,7 +616,6 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: "!contains(needs.build-info.outputs.runsOn, 'self-hosted')"
       - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
         run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
       - name: "Prepare provider documentation"
@@ -658,7 +651,6 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: "!contains(needs.build-info.outputs.runsOn, 'self-hosted')"
       - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
         run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
       - name: "Tests: ${{needs.build-info.outputs.testTypes}}"
@@ -718,7 +710,6 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: "!contains(needs.build-info.outputs.runsOn, 'self-hosted')"
       - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
         run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
       - name: "Tests: ${{needs.build-info.outputs.testTypes}}"
@@ -777,7 +768,6 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: "!contains(needs.build-info.outputs.runsOn, 'self-hosted')"
       - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
         run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
       - name: "Tests: ${{needs.build-info.outputs.testTypes}}"
@@ -833,7 +823,6 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: "!contains(needs.build-info.outputs.runsOn, 'self-hosted')"
       - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
         run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
       - name: "Tests: ${{needs.build-info.outputs.testTypes}}"
@@ -906,7 +895,6 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           echo "ISSUE_ID=10128" >> $GITHUB_ENV
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: "!contains(needs.build-info.outputs.runsOn, 'self-hosted')"
       - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
         run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
       - name: "Tests: Quarantined"
@@ -994,8 +982,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
         if: |
-          needs.build-info.outputs.waitForImage == 'true' &&
-          !contains(needs.build-info.outputs.runsOn, 'self-hosted')
+          needs.build-info.outputs.waitForImage == 'true'
       - name: >
           Wait for PROD images
           ${{ needs.build-info.outputs.pythonVersions }}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}
@@ -1031,9 +1018,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           persist-credentials: false
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: |
-          needs.build-info.outputs.waitForImage == 'true' &&
-          !contains(needs.build-info.outputs.runsOn, 'self-hosted')
+        if: needs.build-info.outputs.waitForImage == 'true'
       - name: "Prepare PROD Image"
         run: ./scripts/ci/images/ci_prepare_prod_image_on_ci.sh
       - name: "Verify PROD image Py${{matrix.python-version}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
@@ -1155,7 +1140,6 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: "!contains(needs.build-info.outputs.runsOn, 'self-hosted')"
       - name:
           "Prepare PROD image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
         run: ./scripts/ci/images/ci_prepare_prod_image_on_ci.sh
@@ -1204,7 +1188,6 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: "!contains(needs.build-info.outputs.runsOn, 'self-hosted')"
       - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
         run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
       - name: "Push CI image ${{ matrix.python-version }}:${{ env.GITHUB_REGISTRY_PUSH_IMAGE_TAG }}"
@@ -1239,7 +1222,6 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: "!contains(needs.build-info.outputs.runsOn, 'self-hosted')"
       - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ github.sha }}"
         run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
       - name: "Generate constraints with PyPI providers"
@@ -1331,7 +1313,6 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           submodules: recursive
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-        if: "!contains(needs.build-info.outputs.runsOn, 'self-hosted')"
       - name: "Tag commit"
         run: |
           BRANCH_NAME=$(echo "${{ github.ref }}" | sed 's/refs\/heads\///')
diff --git a/scripts/ci/tools/ci_free_space_on_ci.sh b/scripts/ci/tools/ci_free_space_on_ci.sh
index e351895..e21e463 100755
--- a/scripts/ci/tools/ci_free_space_on_ci.sh
+++ b/scripts/ci/tools/ci_free_space_on_ci.sh
@@ -21,5 +21,5 @@
 sudo swapoff -a
 sudo rm -f /swapfile
 sudo apt clean
-docker system prune --all --force
+docker system prune --all --force --volumes
 df -h
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index e0a4723..f530705 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
+# pylint: disable=attribute-defined-outside-init
 import datetime
 import os
 import shutil
@@ -117,8 +117,12 @@ class TestDagFileProcessor(unittest.TestCase):
         # Speed up some tests by not running the tasks, just look at what we
         # enqueue!
         self.null_exec = MockExecutor()
+        self.scheduler_job = None
 
     def tearDown(self) -> None:
+        if self.scheduler_job and self.scheduler_job.processor_agent:
+            self.scheduler_job.processor_agent.end()
+            self.scheduler_job = None
         self.clean_db()
 
     def create_test_dag(self, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timedelta(hours=1), **kwargs):
@@ -398,9 +402,9 @@ class TestDagFileProcessor(unittest.TestCase):
 
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        scheduler.processor_agent = mock.MagicMock()
-        scheduler.dagbag.bag_dag(dag, root_dag=dag)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
         dag.clear()
         dr = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
@@ -415,7 +419,7 @@ class TestDagFileProcessor(unittest.TestCase):
             ti.start_date = start_date
             ti.end_date = end_date
 
-            count = scheduler._schedule_dag_run(dr, set(), session)
+            count = self.scheduler_job._schedule_dag_run(dr, set(), session)
             assert count == 1
 
             session.refresh(ti)
@@ -455,9 +459,9 @@ class TestDagFileProcessor(unittest.TestCase):
 
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        scheduler.processor_agent = mock.MagicMock()
-        scheduler.dagbag.bag_dag(dag, root_dag=dag)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
         dag.clear()
         dr = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
@@ -472,7 +476,7 @@ class TestDagFileProcessor(unittest.TestCase):
             ti.start_date = start_date
             ti.end_date = end_date
 
-            count = scheduler._schedule_dag_run(dr, set(), session)
+            count = self.scheduler_job._schedule_dag_run(dr, set(), session)
             assert count == 1
 
             session.refresh(ti)
@@ -514,9 +518,9 @@ class TestDagFileProcessor(unittest.TestCase):
 
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        scheduler.processor_agent = mock.MagicMock()
-        scheduler.dagbag.bag_dag(dag, root_dag=dag)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
         dag.clear()
         dr = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
@@ -532,7 +536,7 @@ class TestDagFileProcessor(unittest.TestCase):
                 ti.start_date = start_date
                 ti.end_date = end_date
 
-            count = scheduler._schedule_dag_run(dr, set(), session)
+            count = self.scheduler_job._schedule_dag_run(dr, set(), session)
             assert count == 2
 
             session.refresh(tis[0])
@@ -547,21 +551,23 @@ class TestDagFileProcessor(unittest.TestCase):
         dag = DAG(dag_id='test_scheduler_add_new_task', start_date=DEFAULT_DATE)
         BashOperator(task_id='dummy', dag=dag, owner='airflow', bash_command='echo test')
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        scheduler.dagbag.bag_dag(dag, root_dag=dag)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
 
         # Since we don't want to store the code for the DAG defined in this file
         with mock.patch.object(settings, "STORE_DAG_CODE", False):
-            scheduler.dagbag.sync_to_db()
+            self.scheduler_job.dagbag.sync_to_db()
 
         session = settings.Session()
         orm_dag = session.query(DagModel).get(dag.dag_id)
         assert orm_dag is not None
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        scheduler.processor_agent = mock.MagicMock()
-        dag = scheduler.dagbag.get_dag('test_scheduler_add_new_task', session=session)
-        scheduler._create_dag_runs([orm_dag], session)
+        if self.scheduler_job.processor_agent:
+            self.scheduler_job.processor_agent.end()
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        dag = self.scheduler_job.dagbag.get_dag('test_scheduler_add_new_task', session=session)
+        self.scheduler_job._create_dag_runs([orm_dag], session)
 
         drs = DagRun.find(dag_id=dag.dag_id, session=session)
         assert len(drs) == 1
@@ -573,7 +579,7 @@ class TestDagFileProcessor(unittest.TestCase):
         BashOperator(task_id='dummy2', dag=dag, owner='airflow', bash_command='echo test')
         SerializedDagModel.write_dag(dag=dag)
 
-        scheduled_tis = scheduler._schedule_dag_run(dr, set(), session)
+        scheduled_tis = self.scheduler_job._schedule_dag_run(dr, set(), session)
         session.flush()
         assert scheduled_tis == 2
 
@@ -601,9 +607,9 @@ class TestDagFileProcessor(unittest.TestCase):
         session.close()
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        scheduler.processor_agent = mock.MagicMock()
-        scheduler.dagbag.bag_dag(dag, root_dag=dag)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
         dag.clear()
 
         date = DEFAULT_DATE
@@ -637,11 +643,11 @@ class TestDagFileProcessor(unittest.TestCase):
         # and schedule them in, so we can check how many
         # tasks are put on the task_instances_list (should be one, not 3)
         with create_session() as session:
-            num_scheduled = scheduler._schedule_dag_run(dr1, set(), session)
+            num_scheduled = self.scheduler_job._schedule_dag_run(dr1, set(), session)
             assert num_scheduled == 1
-            num_scheduled = scheduler._schedule_dag_run(dr2, {dr1.execution_date}, session)
+            num_scheduled = self.scheduler_job._schedule_dag_run(dr2, {dr1.execution_date}, session)
             assert num_scheduled == 0
-            num_scheduled = scheduler._schedule_dag_run(dr3, {dr1.execution_date}, session)
+            num_scheduled = self.scheduler_job._schedule_dag_run(dr3, {dr1.execution_date}, session)
             assert num_scheduled == 0
 
     @patch.object(TaskInstance, 'handle_failure_with_callback')
@@ -710,25 +716,25 @@ class TestDagFileProcessor(unittest.TestCase):
         dagbag = DagBag(dag_folder=dag_file, include_examples=False, read_dags_from_db=False)
         dagbag.sync_to_db()
 
-        scheduler_job = SchedulerJob(subdir=os.devnull)
-        scheduler_job.processor_agent = mock.MagicMock()
-        dag = scheduler_job.dagbag.get_dag("test_only_dummy_tasks")
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        dag = self.scheduler_job.dagbag.get_dag("test_only_dummy_tasks")
 
         # Create DagRun
         session = settings.Session()
         orm_dag = session.query(DagModel).get(dag.dag_id)
-        scheduler_job._create_dag_runs([orm_dag], session)
+        self.scheduler_job._create_dag_runs([orm_dag], session)
 
         drs = DagRun.find(dag_id=dag.dag_id, session=session)
         assert len(drs) == 1
         dr = drs[0]
 
         # Schedule TaskInstances
-        scheduler_job._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, {}, session)
         with create_session() as session:
             tis = session.query(TaskInstance).all()
 
-        dags = scheduler_job.dagbag.dags.values()
+        dags = self.scheduler_job.dagbag.dags.values()
         assert ['test_only_dummy_tasks'] == [dag.dag_id for dag in dags]
         assert 5 == len(tis)
         assert {
@@ -750,7 +756,7 @@ class TestDagFileProcessor(unittest.TestCase):
                 assert end_date is None
                 assert duration is None
 
-        scheduler_job._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, {}, session)
         with create_session() as session:
             tis = session.query(TaskInstance).all()
 
@@ -777,13 +783,20 @@ class TestDagFileProcessor(unittest.TestCase):
 
 @pytest.mark.usefixtures("disable_load_example")
 class TestSchedulerJob(unittest.TestCase):
-    def setUp(self):
+    @staticmethod
+    def clean_db():
         clear_db_runs()
         clear_db_pools()
         clear_db_dags()
         clear_db_sla_miss()
         clear_db_errors()
+        clear_db_jobs()
+        # DO NOT try to run clear_db_serialized_dags() here - this will break the tests
+        # The tests expect DAGs to be fully loaded here via the setUpClass method below
 
+    def setUp(self):
+        self.clean_db()
+        self.scheduler_job = None
         # Speed up some tests by not running the tasks, just look at what we
         # enqueue!
         self.null_exec = MockExecutor()
@@ -795,8 +808,12 @@ class TestSchedulerJob(unittest.TestCase):
         self.patcher_dag_code.start()
 
     def tearDown(self):
+        if self.scheduler_job and self.scheduler_job.processor_agent:
+            self.scheduler_job.processor_agent.end()
+            self.scheduler_job = None
         self.patcher.stop()
         self.patcher_dag_code.stop()
+        self.clean_db()
 
     @classmethod
     def setUpClass(cls):
@@ -806,23 +823,25 @@ class TestSchedulerJob(unittest.TestCase):
         cls.dagbag = DagBag(read_dags_from_db=True)
 
     def test_is_alive(self):
-        job = SchedulerJob(None, heartrate=10, state=State.RUNNING)
-        assert job.is_alive()
+        self.scheduler_job = SchedulerJob(None, heartrate=10, state=State.RUNNING)
+        assert self.scheduler_job.is_alive()
 
-        job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=20)
-        assert job.is_alive()
+        self.scheduler_job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=20)
+        assert self.scheduler_job.is_alive()
 
-        job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=31)
-        assert not job.is_alive()
+        self.scheduler_job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=31)
+        assert not self.scheduler_job.is_alive()
 
         # test because .seconds was used before instead of total_seconds
         # internal repr of datetime is (days, seconds)
-        job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(days=1)
-        assert not job.is_alive()
+        self.scheduler_job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(days=1)
+        assert not self.scheduler_job.is_alive()
 
-        job.state = State.SUCCESS
-        job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10)
-        assert not job.is_alive(), "Completed jobs even with recent heartbeat should not be alive"
+        self.scheduler_job.state = State.SUCCESS
+        self.scheduler_job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10)
+        assert (
+            not self.scheduler_job.is_alive()
+        ), "Completed jobs even with recent heartbeat should not be alive"
 
     def run_single_scheduler_loop_with_no_dags(self, dags_folder):
         """
@@ -834,19 +853,20 @@ class TestSchedulerJob(unittest.TestCase):
         :param dags_folder: the directory to traverse
         :type dags_folder: str
         """
-        scheduler = SchedulerJob(
+        self.scheduler_job = SchedulerJob(
             executor=self.null_exec, num_times_parse_dags=1, subdir=os.path.join(dags_folder)
         )
-        scheduler.heartrate = 0
-        scheduler.run()
+        self.scheduler_job.heartrate = 0
+        self.scheduler_job.run()
 
-    @pytest.mark.quarantined
     def test_no_orphan_process_will_be_left(self):
         empty_dir = mkdtemp()
         current_process = psutil.Process()
         old_children = current_process.children(recursive=True)
-        scheduler = SchedulerJob(subdir=empty_dir, num_runs=1, executor=MockExecutor(do_update=False))
-        scheduler.run()
+        self.scheduler_job = SchedulerJob(
+            subdir=empty_dir, num_runs=1, executor=MockExecutor(do_update=False)
+        )
+        self.scheduler_job.run()
         shutil.rmtree(empty_dir)
 
         # Remove potential noise created by previous tests.
@@ -870,8 +890,8 @@ class TestSchedulerJob(unittest.TestCase):
         executor = MockExecutor(do_update=False)
         task_callback = mock.MagicMock()
         mock_task_callback.return_value = task_callback
-        scheduler = SchedulerJob(executor=executor)
-        scheduler.processor_agent = mock.MagicMock()
+        self.scheduler_job = SchedulerJob(executor=executor)
+        self.scheduler_job.processor_agent = mock.MagicMock()
 
         session = settings.Session()
         dag.sync_to_db(session=session)
@@ -884,7 +904,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         executor.event_buffer[ti1.key] = State.FAILED, None
 
-        scheduler._process_executor_events(session=session)
+        self.scheduler_job._process_executor_events(session=session)
         ti1.refresh_from_db()
         assert ti1.state == State.QUEUED
         mock_task_callback.assert_called_once_with(
@@ -895,8 +915,8 @@ class TestSchedulerJob(unittest.TestCase):
             'finished (failed) although the task says its queued. (Info: None) '
             'Was the task killed externally?',
         )
-        scheduler.processor_agent.send_callback_to_execute.assert_called_once_with(task_callback)
-        scheduler.processor_agent.reset_mock()
+        self.scheduler_job.processor_agent.send_callback_to_execute.assert_called_once_with(task_callback)
+        self.scheduler_job.processor_agent.reset_mock()
 
         # ti in success state
         ti1.state = State.SUCCESS
@@ -904,10 +924,10 @@ class TestSchedulerJob(unittest.TestCase):
         session.commit()
         executor.event_buffer[ti1.key] = State.SUCCESS, None
 
-        scheduler._process_executor_events(session=session)
+        self.scheduler_job._process_executor_events(session=session)
         ti1.refresh_from_db()
         assert ti1.state == State.SUCCESS
-        scheduler.processor_agent.send_callback_to_execute.assert_not_called()
+        self.scheduler_job.processor_agent.send_callback_to_execute.assert_not_called()
 
         mock_stats_incr.assert_called_once_with('scheduler.tasks.killed_externally')
 
@@ -918,8 +938,8 @@ class TestSchedulerJob(unittest.TestCase):
         try_number = 42
 
         executor = MagicMock()
-        scheduler = SchedulerJob(executor=executor)
-        scheduler.processor_agent = MagicMock()
+        self.scheduler_job = SchedulerJob(executor=executor)
+        self.scheduler_job.processor_agent = MagicMock()
         event_buffer = {TaskInstanceKey(dag_id, task_id, execution_date, try_number): (State.SUCCESS, None)}
         executor.get_event_buffer.return_value = event_buffer
 
@@ -931,7 +951,7 @@ class TestSchedulerJob(unittest.TestCase):
             ti.state = State.SUCCESS
             session.merge(ti)
 
-        scheduler._process_executor_events()
+        self.scheduler_job._process_executor_events()
         # Assert that the event_buffer is empty so the task was popped using the right
         # task instance key
         assert event_buffer == {}
@@ -944,7 +964,7 @@ class TestSchedulerJob(unittest.TestCase):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
         dagmodel = DagModel(
@@ -965,7 +985,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.add(dagmodel)
         session.flush()
 
-        scheduler._critical_section_execute_task_instances(session)
+        self.scheduler_job._critical_section_execute_task_instances(session)
         session.flush()
         ti1.refresh_from_db()
         assert State.SCHEDULED == ti1.state
@@ -982,7 +1002,7 @@ class TestSchedulerJob(unittest.TestCase):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
         dag_model = DagModel(
@@ -1003,7 +1023,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(ti1)
         session.flush()
 
-        scheduler._critical_section_execute_task_instances(session)
+        self.scheduler_job._critical_section_execute_task_instances(session)
         session.flush()
         ti1.refresh_from_db()
         assert State.QUEUED == ti1.state
@@ -1020,7 +1040,7 @@ class TestSchedulerJob(unittest.TestCase):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
         dag_model = DagModel(
@@ -1044,7 +1064,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         assert dr1.is_backfill
 
-        scheduler._critical_section_execute_task_instances(session)
+        self.scheduler_job._critical_section_execute_task_instances(session)
         session.flush()
         ti1.refresh_from_db()
         assert State.SCHEDULED == ti1.state
@@ -1057,7 +1077,7 @@ class TestSchedulerJob(unittest.TestCase):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
         dag_model = DagModel(
@@ -1092,7 +1112,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(ti_with_dagrun)
         session.flush()
 
-        res = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
 
         assert 2 == len(res)
         res_keys = map(lambda x: x.key, res)
@@ -1109,7 +1129,7 @@ class TestSchedulerJob(unittest.TestCase):
         task2 = DummyOperator(dag=dag, task_id=task_id_2, pool='b')
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
         dag_model = DagModel(
@@ -1145,7 +1165,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.add(pool2)
         session.flush()
 
-        res = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
         session.flush()
         assert 3 == len(res)
         res_keys = []
@@ -1166,7 +1186,7 @@ class TestSchedulerJob(unittest.TestCase):
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         executor = MockExecutor(do_update=True)
-        scheduler = SchedulerJob(executor=executor)
+        self.scheduler_job = SchedulerJob(executor=executor)
         session = settings.Session()
         dag_model = DagModel(
             dag_id=dag_id,
@@ -1196,7 +1216,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.flush()
 
         # Two tasks w/o pool up for execution and our default pool size is 1
-        res = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
         assert 1 == len(res)
 
         ti2.state = State.RUNNING
@@ -1204,7 +1224,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.flush()
 
         # One task w/o pool up for execution and one task running
-        res = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
         assert 0 == len(res)
 
         session.rollback()
@@ -1217,7 +1237,7 @@ class TestSchedulerJob(unittest.TestCase):
         task = DummyOperator(dag=dag, task_id=task_id, pool="this_pool_doesnt_exist")
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
         dag_model = DagModel(
@@ -1238,7 +1258,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(ti)
         session.commit()
 
-        res = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
         session.flush()
         assert 0 == len(res)
         session.rollback()
@@ -1250,7 +1270,7 @@ class TestSchedulerJob(unittest.TestCase):
         DummyOperator(dag=dag, task_id=task_id_1)
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
         dag_model = DagModel(
@@ -1267,7 +1287,7 @@ class TestSchedulerJob(unittest.TestCase):
         )
         session.flush()
 
-        assert 0 == len(scheduler._executable_task_instances_to_queued(max_tis=32, session=session))
+        assert 0 == len(self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session))
         session.rollback()
 
     def test_find_executable_task_instances_concurrency(self):
@@ -1277,7 +1297,7 @@ class TestSchedulerJob(unittest.TestCase):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
         dag_model = DagModel(
@@ -1315,7 +1335,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         session.flush()
 
-        res = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
 
         assert 1 == len(res)
         res_keys = map(lambda x: x.key, res)
@@ -1325,7 +1345,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(ti2)
         session.flush()
 
-        res = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
 
         assert 0 == len(res)
         session.rollback()
@@ -1338,7 +1358,7 @@ class TestSchedulerJob(unittest.TestCase):
         task3 = DummyOperator(dag=dag, task_id='dummy3')
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
         dag_model = DagModel(
             dag_id=dag_id,
@@ -1366,7 +1386,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         session.flush()
 
-        res = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
 
         assert 1 == len(res)
         assert res[0].key == ti3.key
@@ -1382,11 +1402,11 @@ class TestSchedulerJob(unittest.TestCase):
         task2 = DummyOperator(dag=dag, task_id=task_id_2)
 
         executor = MockExecutor(do_update=True)
-        scheduler = SchedulerJob(executor=executor)
+        self.scheduler_job = SchedulerJob(executor=executor)
         session = settings.Session()
 
-        scheduler.dagbag.bag_dag(dag, root_dag=dag)
-        scheduler.dagbag.sync_to_db(session=session)
+        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+        self.scheduler_job.dagbag.sync_to_db(session=session)
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dr1 = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
@@ -1413,7 +1433,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(ti2)
         session.flush()
 
-        res = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
 
         assert 2 == len(res)
 
@@ -1426,7 +1446,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(ti1_2)
         session.flush()
 
-        res = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
 
         assert 1 == len(res)
 
@@ -1437,7 +1457,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(ti1_3)
         session.flush()
 
-        res = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
 
         assert 0 == len(res)
 
@@ -1449,7 +1469,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(ti1_3)
         session.flush()
 
-        res = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
 
         assert 2 == len(res)
 
@@ -1461,7 +1481,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(ti1_3)
         session.flush()
 
-        res = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
 
         assert 1 == len(res)
         session.rollback()
@@ -1473,7 +1493,7 @@ class TestSchedulerJob(unittest.TestCase):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
         date = DEFAULT_DATE
@@ -1507,7 +1527,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         session.flush()
 
-        res = scheduler._executable_task_instances_to_queued(max_tis=100, session=session)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=100, session=session)
         assert 0 == len(res)
 
         session.rollback()
@@ -1519,7 +1539,7 @@ class TestSchedulerJob(unittest.TestCase):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
         dag_model = DagModel(
@@ -1540,7 +1560,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.flush()
 
         with patch.object(BaseExecutor, 'queue_command') as mock_queue_command:
-            scheduler._enqueue_task_instances_with_queued_state([ti1])
+            self.scheduler_job._enqueue_task_instances_with_queued_state([ti1])
 
         assert mock_queue_command.called
         session.rollback()
@@ -1558,7 +1578,7 @@ class TestSchedulerJob(unittest.TestCase):
         task2 = DummyOperator(dag=dag, task_id=task_id_2)
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
         # create first dag run with 1 running and 1 queued
@@ -1607,7 +1627,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         assert State.RUNNING == dr2.state
 
-        res = scheduler._critical_section_execute_task_instances(session)
+        res = self.scheduler_job._critical_section_execute_task_instances(session)
 
         # check that concurrency is respected
         ti1.refresh_from_db()
@@ -1635,7 +1655,7 @@ class TestSchedulerJob(unittest.TestCase):
         task2 = DummyOperator(dag=dag, task_id=task_id_2)
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
         dag_model = DagModel(
@@ -1665,20 +1685,20 @@ class TestSchedulerJob(unittest.TestCase):
             session.merge(ti1)
             session.merge(ti2)
             session.flush()
-        scheduler.max_tis_per_query = 2
-        res = scheduler._critical_section_execute_task_instances(session)
+        self.scheduler_job.max_tis_per_query = 2
+        res = self.scheduler_job._critical_section_execute_task_instances(session)
         assert 2 == res
 
-        scheduler.max_tis_per_query = 8
+        self.scheduler_job.max_tis_per_query = 8
         with mock.patch.object(
-            type(scheduler.executor), 'slots_available', new_callable=mock.PropertyMock
+            type(self.scheduler_job.executor), 'slots_available', new_callable=mock.PropertyMock
         ) as mock_slots:
             mock_slots.return_value = 2
             # Check that we don't "overfill" the executor
             assert 2 == res
-            res = scheduler._critical_section_execute_task_instances(session)
+            res = self.scheduler_job._critical_section_execute_task_instances(session)
 
-        res = scheduler._critical_section_execute_task_instances(session)
+        res = self.scheduler_job._critical_section_execute_task_instances(session)
         assert 4 == res
         for ti in tis:
             ti.refresh_from_db()
@@ -1696,7 +1716,7 @@ class TestSchedulerJob(unittest.TestCase):
         task2 = DummyOperator(dag=dag, task_id=task_id_2)
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
         dag_model = DagModel(
@@ -1726,10 +1746,10 @@ class TestSchedulerJob(unittest.TestCase):
             session.merge(ti1)
             session.merge(ti2)
             session.flush()
-        scheduler.max_tis_per_query = 0
-        scheduler.executor = MagicMock(slots_available=36)
+        self.scheduler_job.max_tis_per_query = 0
+        self.scheduler_job.executor = MagicMock(slots_available=36)
 
-        res = scheduler._critical_section_execute_task_instances(session)
+        res = self.scheduler_job._critical_section_execute_task_instances(session)
         # 20 dag runs * 2 tasks each = 40, but limited by number of slots available
         self.assertEqual(36, res)
         session.rollback()
@@ -1788,10 +1808,10 @@ class TestSchedulerJob(unittest.TestCase):
         dagbag.sync_to_db(session)
         session.commit()
 
-        scheduler = SchedulerJob(num_runs=0)
-        scheduler.dagbag.collect_dags_from_db()
+        self.scheduler_job = SchedulerJob(num_runs=0)
+        self.scheduler_job.dagbag.collect_dags_from_db()
 
-        scheduler._change_state_for_tis_without_dagrun(
+        self.scheduler_job._change_state_for_tis_without_dagrun(
             old_states=[State.SCHEDULED, State.QUEUED], new_state=State.NONE, session=session
         )
 
@@ -1820,7 +1840,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(dr1)
         session.commit()
 
-        scheduler._change_state_for_tis_without_dagrun(
+        self.scheduler_job._change_state_for_tis_without_dagrun(
             old_states=[State.SCHEDULED, State.QUEUED], new_state=State.NONE, session=session
         )
 
@@ -1845,12 +1865,12 @@ class TestSchedulerJob(unittest.TestCase):
 
         # If there's no left over task in executor.queued_tasks, nothing happens
         session = settings.Session()
-        scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         mock_logger = mock.MagicMock()
         test_executor = MockExecutor(do_update=False)
-        scheduler_job.executor = test_executor
-        scheduler_job._logger = mock_logger
-        scheduler_job._change_state_for_tasks_failed_to_execute()  # pylint: disable=no-value-for-parameter
+        self.scheduler_job.executor = test_executor
+        self.scheduler_job._logger = mock_logger
+        self.scheduler_job._change_state_for_tasks_failed_to_execute()
         mock_logger.info.assert_not_called()
 
         # Tasks failed to execute with QUEUED state will be set to SCHEDULED state.
@@ -1863,7 +1883,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(ti)  # pylint: disable=no-value-for-parameter
         session.commit()
 
-        scheduler_job._change_state_for_tasks_failed_to_execute()  # pylint: disable=no-value-for-parameter
+        self.scheduler_job._change_state_for_tasks_failed_to_execute()
 
         ti.refresh_from_db()
         assert State.SCHEDULED == ti.state
@@ -1876,7 +1896,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(ti)
         session.commit()
 
-        scheduler_job._change_state_for_tasks_failed_to_execute()  # pylint: disable=no-value-for-parameter
+        self.scheduler_job._change_state_for_tasks_failed_to_execute()
 
         ti.refresh_from_db()
         assert State.RUNNING == ti.state
@@ -1916,10 +1936,10 @@ class TestSchedulerJob(unittest.TestCase):
 
         processor = mock.MagicMock()
 
-        scheduler = SchedulerJob(num_runs=0)
-        scheduler.processor_agent = processor
+        self.scheduler_job = SchedulerJob(num_runs=0)
+        self.scheduler_job.processor_agent = processor
 
-        scheduler.adopt_or_reset_orphaned_tasks()
+        self.scheduler_job.adopt_or_reset_orphaned_tasks()
 
         ti = dr.get_task_instance(task_id=op1.task_id, session=session)
         assert ti.state == State.NONE
@@ -1966,19 +1986,19 @@ class TestSchedulerJob(unittest.TestCase):
 
         # This poll interval is large, but the scheduler doesn't sleep that
         # long; instead we hit the clean_tis_without_dagrun interval
-        scheduler = SchedulerJob(num_runs=2, processor_poll_interval=30)
-        scheduler.dagbag = dagbag
+        self.scheduler_job = SchedulerJob(num_runs=2, processor_poll_interval=30)
+        self.scheduler_job.dagbag = dagbag
         executor = MockExecutor(do_update=False)
         executor.queued_tasks
-        scheduler.executor = executor
+        self.scheduler_job.executor = executor
         processor = mock.MagicMock()
         processor.done = False
-        scheduler.processor_agent = processor
+        self.scheduler_job.processor_agent = processor
 
         with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), conf_vars(
             {('scheduler', 'clean_tis_without_dagrun_interval'): '0.001'}
         ):
-            scheduler._run_scheduler_loop()
+            self.scheduler_job._run_scheduler_loop()
 
         ti = dr.get_task_instance(task_id=op1.task_id, session=session)
         assert ti.state == expected_task_state
@@ -2002,10 +2022,10 @@ class TestSchedulerJob(unittest.TestCase):
 
         DummyOperator(task_id='dummy', dag=dag, owner='airflow')
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        scheduler.dagbag.bag_dag(dag, root_dag=dag)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
 
-        scheduler.dagbag.sync_to_db()
+        self.scheduler_job.dagbag.sync_to_db()
 
         session = settings.Session()
         orm_dag = session.query(DagModel).get(dag.dag_id)
@@ -2013,8 +2033,8 @@ class TestSchedulerJob(unittest.TestCase):
 
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        scheduler._create_dag_runs([orm_dag], session)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job._create_dag_runs([orm_dag], session)
 
         drs = DagRun.find(dag_id=dag.dag_id, session=session)
         assert len(drs) == 1
@@ -2030,10 +2050,10 @@ class TestSchedulerJob(unittest.TestCase):
         session.flush()
 
         # Mock that processor_agent is started
-        scheduler.processor_agent = mock.Mock()
-        scheduler.processor_agent.send_callback_to_execute = mock.Mock()
+        self.scheduler_job.processor_agent = mock.Mock()
+        self.scheduler_job.processor_agent.send_callback_to_execute = mock.Mock()
 
-        scheduler._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, {}, session)
         session.flush()
 
         session.refresh(dr)
@@ -2051,7 +2071,7 @@ class TestSchedulerJob(unittest.TestCase):
         )
 
         # Verify dag failure callback request is sent to file processor
-        scheduler.processor_agent.send_callback_to_execute.assert_called_once_with(expected_callback)
+        self.scheduler_job.processor_agent.send_callback_to_execute.assert_called_once_with(expected_callback)
 
         session.rollback()
         session.close()
@@ -2065,9 +2085,9 @@ class TestSchedulerJob(unittest.TestCase):
 
         DummyOperator(task_id='dummy', dag=dag, owner='airflow')
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        scheduler.dagbag.bag_dag(dag, root_dag=dag)
-        scheduler.dagbag.sync_to_db()
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+        self.scheduler_job.dagbag.sync_to_db()
 
         session = settings.Session()
         orm_dag = session.query(DagModel).get(dag.dag_id)
@@ -2075,8 +2095,8 @@ class TestSchedulerJob(unittest.TestCase):
 
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        scheduler._create_dag_runs([orm_dag], session)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job._create_dag_runs([orm_dag], session)
 
         drs = DagRun.find(dag_id=dag.dag_id, session=session)
         assert len(drs) == 1
@@ -2087,10 +2107,10 @@ class TestSchedulerJob(unittest.TestCase):
         session.flush()
 
         # Mock that processor_agent is started
-        scheduler.processor_agent = mock.Mock()
-        scheduler.processor_agent.send_callback_to_execute = mock.Mock()
+        self.scheduler_job.processor_agent = mock.Mock()
+        self.scheduler_job.processor_agent.send_callback_to_execute = mock.Mock()
 
-        scheduler._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, {}, session)
         session.flush()
 
         session.refresh(dr)
@@ -2105,7 +2125,7 @@ class TestSchedulerJob(unittest.TestCase):
         )
 
         # Verify dag failure callback request is sent to file processor
-        scheduler.processor_agent.send_callback_to_execute.assert_called_once_with(expected_callback)
+        self.scheduler_job.processor_agent.send_callback_to_execute.assert_called_once_with(expected_callback)
 
         session.rollback()
         session.close()
@@ -2125,22 +2145,22 @@ class TestSchedulerJob(unittest.TestCase):
 
         DummyOperator(task_id='dummy', dag=dag, owner='airflow')
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        scheduler.processor_agent = mock.Mock()
-        scheduler.processor_agent.send_callback_to_execute = mock.Mock()
-        scheduler._send_sla_callbacks_to_processor = mock.Mock()
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.Mock()
+        self.scheduler_job.processor_agent.send_callback_to_execute = mock.Mock()
+        self.scheduler_job._send_sla_callbacks_to_processor = mock.Mock()
 
         # Sync DAG into DB
         with mock.patch.object(settings, "STORE_DAG_CODE", False):
-            scheduler.dagbag.bag_dag(dag, root_dag=dag)
-            scheduler.dagbag.sync_to_db()
+            self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+            self.scheduler_job.dagbag.sync_to_db()
 
         session = settings.Session()
         orm_dag = session.query(DagModel).get(dag.dag_id)
         assert orm_dag is not None
 
         # Create DagRun
-        scheduler._create_dag_runs([orm_dag], session)
+        self.scheduler_job._create_dag_runs([orm_dag], session)
 
         drs = DagRun.find(dag_id=dag.dag_id, session=session)
         assert len(drs) == 1
@@ -2149,7 +2169,7 @@ class TestSchedulerJob(unittest.TestCase):
         ti = dr.get_task_instance('dummy')
         ti.set_state(state, session)
 
-        scheduler._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, {}, session)
 
         expected_callback = DagCallbackRequest(
             full_filepath=dr.dag.fileloc,
@@ -2160,10 +2180,10 @@ class TestSchedulerJob(unittest.TestCase):
         )
 
         # Verify dag failure callback request is sent to file processor
-        scheduler.processor_agent.send_callback_to_execute.assert_called_once_with(expected_callback)
+        self.scheduler_job.processor_agent.send_callback_to_execute.assert_called_once_with(expected_callback)
         # This is already tested separately
         # In this test we just want to verify that this function is called
-        scheduler._send_sla_callbacks_to_processor.assert_called_once_with(dag)
+        self.scheduler_job._send_sla_callbacks_to_processor.assert_called_once_with(dag)
 
         session.rollback()
         session.close()
@@ -2180,22 +2200,22 @@ class TestSchedulerJob(unittest.TestCase):
 
         BashOperator(task_id='test_task', dag=dag, owner='airflow', bash_command='echo hi')
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        scheduler.processor_agent = mock.Mock()
-        scheduler.processor_agent.send_callback_to_execute = mock.Mock()
-        scheduler._send_dag_callbacks_to_processor = mock.Mock()
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.Mock()
+        self.scheduler_job.processor_agent.send_callback_to_execute = mock.Mock()
+        self.scheduler_job._send_dag_callbacks_to_processor = mock.Mock()
 
         # Sync DAG into DB
         with mock.patch.object(settings, "STORE_DAG_CODE", False):
-            scheduler.dagbag.bag_dag(dag, root_dag=dag)
-            scheduler.dagbag.sync_to_db()
+            self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+            self.scheduler_job.dagbag.sync_to_db()
 
         session = settings.Session()
         orm_dag = session.query(DagModel).get(dag.dag_id)
         assert orm_dag is not None
 
         # Create DagRun
-        scheduler._create_dag_runs([orm_dag], session)
+        self.scheduler_job._create_dag_runs([orm_dag], session)
 
         drs = DagRun.find(dag_id=dag.dag_id, session=session)
         assert len(drs) == 1
@@ -2204,10 +2224,10 @@ class TestSchedulerJob(unittest.TestCase):
         ti = dr.get_task_instance('test_task')
         ti.set_state(state, session)
 
-        scheduler._schedule_dag_run(dr, set(), session)
+        self.scheduler_job._schedule_dag_run(dr, set(), session)
 
         # Verify the callback is not set (i.e. is None) when no callbacks are set on the DAG
-        scheduler._send_dag_callbacks_to_processor.assert_called_once_with(dr, None)
+        self.scheduler_job._send_dag_callbacks_to_processor.assert_called_once_with(dr, None)
 
         session.rollback()
         session.close()
@@ -2233,8 +2253,8 @@ class TestSchedulerJob(unittest.TestCase):
         # Re-create the DAG, but remove the task
         dag = DAG(dag_id='test_scheduler_do_not_schedule_removed_task', start_date=DEFAULT_DATE)
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        res = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
 
         assert [] == res
         session.rollback()
@@ -2378,8 +2398,8 @@ class TestSchedulerJob(unittest.TestCase):
         dag_id = 'test_dagrun_states_root_future'
         dag = self.dagbag.get_dag(dag_id)
         dag.sync_to_db()
-        scheduler = SchedulerJob(num_runs=1, executor=self.null_exec, subdir=dag.fileloc)
-        scheduler.run()
+        self.scheduler_job = SchedulerJob(num_runs=1, executor=self.null_exec, subdir=dag.fileloc)
+        self.scheduler_job.run()
 
         first_run = DagRun.find(dag_id=dag_id, execution_date=DEFAULT_DATE)[0]
         ti_ids = [(ti.task_id, ti.state) for ti in first_run.get_task_instances()]
@@ -2438,8 +2458,8 @@ class TestSchedulerJob(unittest.TestCase):
             other_dag.is_paused_upon_creation = True
             other_dag.sync_to_db()
 
-            scheduler = SchedulerJob(executor=self.null_exec, subdir=dag.fileloc, num_runs=1)
-            scheduler.run()
+            self.scheduler_job = SchedulerJob(executor=self.null_exec, subdir=dag.fileloc, num_runs=1)
+            self.scheduler_job.run()
 
             # zero tasks ran
             assert len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()) == 0
@@ -2461,15 +2481,14 @@ class TestSchedulerJob(unittest.TestCase):
             ] == bf_exec.sorted_tasks
             session.commit()
 
-            scheduler = SchedulerJob(dag.fileloc, executor=self.null_exec, num_runs=1)
-            scheduler.run()
+            self.scheduler_job = SchedulerJob(dag.fileloc, executor=self.null_exec, num_runs=1)
+            self.scheduler_job.run()
 
             # still one task
             assert len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()) == 1
             session.commit()
             assert [] == self.null_exec.sorted_tasks
 
-    @pytest.mark.quarantined
     def test_scheduler_task_start_date(self):
         """
         Test that the scheduler respects task start dates that are different from DAG start dates
@@ -2488,8 +2507,8 @@ class TestSchedulerJob(unittest.TestCase):
 
         dagbag.sync_to_db()
 
-        scheduler = SchedulerJob(executor=self.null_exec, subdir=dag.fileloc, num_runs=2)
-        scheduler.run()
+        self.scheduler_job = SchedulerJob(executor=self.null_exec, subdir=dag.fileloc, num_runs=2)
+        self.scheduler_job.run()
 
         session = settings.Session()
         tiq = session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id)
@@ -2509,12 +2528,12 @@ class TestSchedulerJob(unittest.TestCase):
             dag = self.dagbag.get_dag(dag_id)
             dag.clear()
 
-        scheduler = SchedulerJob(
+        self.scheduler_job = SchedulerJob(
             executor=self.null_exec,
             subdir=os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py'),
             num_runs=1,
         )
-        scheduler.run()
+        self.scheduler_job.run()
 
         # zero tasks ran
         dag_id = 'test_start_date_scheduling'
@@ -2532,13 +2551,13 @@ class TestSchedulerJob(unittest.TestCase):
             dag = self.dagbag.get_dag(dag_id)
             dag.clear()
 
-        scheduler = SchedulerJob(
+        self.scheduler_job = SchedulerJob(
             executor=self.null_exec,
             subdir=os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py'),
             num_runs=1,
         )
 
-        scheduler.run()
+        self.scheduler_job.run()
 
         # zero tasks ran
         dag_id = 'test_start_date_scheduling'
@@ -2573,9 +2592,10 @@ class TestSchedulerJob(unittest.TestCase):
         session.flush()
 
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+        SerializedDagModel.write_dag(dag)
 
-        scheduler = SchedulerJob(executor=self.null_exec)
-        scheduler.processor_agent = mock.MagicMock()
+        self.scheduler_job = SchedulerJob(executor=self.null_exec)
+        self.scheduler_job.processor_agent = mock.MagicMock()
 
         # Create 2 dagruns, which will create 2 task instances.
         dr = dag.create_dagrun(
@@ -2583,15 +2603,16 @@ class TestSchedulerJob(unittest.TestCase):
             execution_date=DEFAULT_DATE,
             state=State.RUNNING,
         )
-        scheduler._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, {}, session)
         dr = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=dag.following_schedule(dr.execution_date),
             state=State.RUNNING,
         )
-        scheduler._schedule_dag_run(dr, {}, session)
-
-        task_instances_list = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        self.scheduler_job._schedule_dag_run(dr, {}, session)
+        task_instances_list = self.scheduler_job._executable_task_instances_to_queued(
+            max_tis=32, session=session
+        )
 
         assert len(task_instances_list) == 1
 
@@ -2624,11 +2645,12 @@ class TestSchedulerJob(unittest.TestCase):
         pool = Pool(pool='test_scheduler_verify_pool_full_2_slots_per_task', slots=6)
         session.add(pool)
         session.commit()
+        SerializedDagModel.write_dag(dag)
 
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
-        scheduler = SchedulerJob(executor=self.null_exec)
-        scheduler.processor_agent = mock.MagicMock()
+        self.scheduler_job = SchedulerJob(executor=self.null_exec)
+        self.scheduler_job.processor_agent = mock.MagicMock()
 
         # Create 5 dagruns, which will create 5 task instances.
         date = DEFAULT_DATE
@@ -2638,10 +2660,12 @@ class TestSchedulerJob(unittest.TestCase):
                 execution_date=date,
                 state=State.RUNNING,
             )
-            scheduler._schedule_dag_run(dr, {}, session)
+            self.scheduler_job._schedule_dag_run(dr, {}, session)
             date = dag.following_schedule(date)
 
-        task_instances_list = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        task_instances_list = self.scheduler_job._executable_task_instances_to_queued(
+            max_tis=32, session=session
+        )
 
         # As tasks require 2 slots, only 3 can fit into 6 available
         assert len(task_instances_list) == 3
@@ -2700,18 +2724,21 @@ class TestSchedulerJob(unittest.TestCase):
         session.commit()
 
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+        SerializedDagModel.write_dag(dag)
 
-        scheduler = SchedulerJob(executor=self.null_exec)
-        scheduler.processor_agent = mock.MagicMock()
+        self.scheduler_job = SchedulerJob(executor=self.null_exec)
+        self.scheduler_job.processor_agent = mock.MagicMock()
 
         dr = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
             state=State.RUNNING,
         )
-        scheduler._schedule_dag_run(dr, {}, session)
+        self.scheduler_job._schedule_dag_run(dr, {}, session)
 
-        task_instances_list = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
+        task_instances_list = self.scheduler_job._executable_task_instances_to_queued(
+            max_tis=32, session=session
+        )
 
         # Only second and third
         assert len(task_instances_list) == 2
@@ -2747,18 +2774,18 @@ class TestSchedulerJob(unittest.TestCase):
         dag = DAG(dag_id='test_verify_integrity_if_dag_not_changed', start_date=DEFAULT_DATE)
         BashOperator(task_id='dummy', dag=dag, owner='airflow', bash_command='echo hi')
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        scheduler.dagbag.bag_dag(dag, root_dag=dag)
-        scheduler.dagbag.sync_to_db()
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+        self.scheduler_job.dagbag.sync_to_db()
 
         session = settings.Session()
         orm_dag = session.query(DagModel).get(dag.dag_id)
         assert orm_dag is not None
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        scheduler.processor_agent = mock.MagicMock()
-        dag = scheduler.dagbag.get_dag('test_verify_integrity_if_dag_not_changed', session=session)
-        scheduler._create_dag_runs([orm_dag], session)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        dag = self.scheduler_job.dagbag.get_dag('test_verify_integrity_if_dag_not_changed', session=session)
+        self.scheduler_job._create_dag_runs([orm_dag], session)
 
         drs = DagRun.find(dag_id=dag.dag_id, session=session)
         assert len(drs) == 1
@@ -2766,7 +2793,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         # Verify that DagRun.verify_integrity is not called
         with mock.patch('airflow.jobs.scheduler_job.DagRun.verify_integrity') as mock_verify_integrity:
-            scheduled_tis = scheduler._schedule_dag_run(dr, {}, session)
+            scheduled_tis = self.scheduler_job._schedule_dag_run(dr, {}, session)
             mock_verify_integrity.assert_not_called()
         session.flush()
 
@@ -2800,18 +2827,18 @@ class TestSchedulerJob(unittest.TestCase):
         dag = DAG(dag_id='test_verify_integrity_if_dag_changed', start_date=DEFAULT_DATE)
         BashOperator(task_id='dummy', dag=dag, owner='airflow', bash_command='echo hi')
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        scheduler.dagbag.bag_dag(dag, root_dag=dag)
-        scheduler.dagbag.sync_to_db()
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+        self.scheduler_job.dagbag.sync_to_db()
 
         session = settings.Session()
         orm_dag = session.query(DagModel).get(dag.dag_id)
         assert orm_dag is not None
 
-        scheduler = SchedulerJob(subdir=os.devnull)
-        scheduler.processor_agent = mock.MagicMock()
-        dag = scheduler.dagbag.get_dag('test_verify_integrity_if_dag_changed', session=session)
-        scheduler._create_dag_runs([orm_dag], session)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        dag = self.scheduler_job.dagbag.get_dag('test_verify_integrity_if_dag_changed', session=session)
+        self.scheduler_job._create_dag_runs([orm_dag], session)
 
         drs = DagRun.find(dag_id=dag.dag_id, session=session)
         assert len(drs) == 1
@@ -2819,8 +2846,8 @@ class TestSchedulerJob(unittest.TestCase):
 
         dag_version_1 = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session)
         assert dr.dag_hash == dag_version_1
-        assert scheduler.dagbag.dags == {'test_verify_integrity_if_dag_changed': dag}
-        assert len(scheduler.dagbag.dags.get("test_verify_integrity_if_dag_changed").tasks) == 1
+        assert self.scheduler_job.dagbag.dags == {'test_verify_integrity_if_dag_changed': dag}
+        assert len(self.scheduler_job.dagbag.dags.get("test_verify_integrity_if_dag_changed").tasks) == 1
 
         # Now let's say the DAG got updated (new task got added)
         BashOperator(task_id='bash_task_1', dag=dag, bash_command='echo hi')
@@ -2829,7 +2856,7 @@ class TestSchedulerJob(unittest.TestCase):
         dag_version_2 = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session)
         assert dag_version_2 != dag_version_1
 
-        scheduled_tis = scheduler._schedule_dag_run(dr, {}, session)
+        scheduled_tis = self.scheduler_job._schedule_dag_run(dr, {}, session)
         session.flush()
 
         assert scheduled_tis == 2
@@ -2838,8 +2865,8 @@ class TestSchedulerJob(unittest.TestCase):
         assert len(drs) == 1
         dr = drs[0]
         assert dr.dag_hash == dag_version_2
-        assert scheduler.dagbag.dags == {'test_verify_integrity_if_dag_changed': dag}
-        assert len(scheduler.dagbag.dags.get("test_verify_integrity_if_dag_changed").tasks) == 2
+        assert self.scheduler_job.dagbag.dags == {'test_verify_integrity_if_dag_changed': dag}
+        assert len(self.scheduler_job.dagbag.dags.get("test_verify_integrity_if_dag_changed").tasks) == 2
 
         tis_count = (
             session.query(func.count(TaskInstance.task_id))
@@ -2888,11 +2915,11 @@ class TestSchedulerJob(unittest.TestCase):
             # Use an empty file since the above mock will return the
             # expected DAGs. Also specify only a single file so that it doesn't
             # try to schedule the above DAG repeatedly.
-            scheduler = SchedulerJob(
+            self.scheduler_job = SchedulerJob(
                 num_runs=1, executor=executor, subdir=os.path.join(settings.DAGS_FOLDER, "no_dags.py")
             )
-            scheduler.heartrate = 0
-            scheduler.run()
+            self.scheduler_job.heartrate = 0
+            self.scheduler_job.run()
 
         do_schedule()  # pylint: disable=no-value-for-parameter
         with create_session() as session:
@@ -2933,7 +2960,7 @@ class TestSchedulerJob(unittest.TestCase):
         ti.refresh_from_db()
         assert ti.state == State.SUCCESS
 
-    @pytest.mark.quarantined
+    @pytest.mark.skip(reason="This test needs fixing. It's very wrong now and always fails")
     def test_retry_handling_job(self):
         """
         Integration test of the scheduler not accidentally resetting
@@ -2943,9 +2970,9 @@ class TestSchedulerJob(unittest.TestCase):
         dag_task1 = dag.get_task("test_retry_handling_op")
         dag.clear()
 
-        scheduler = SchedulerJob(dag_id=dag.dag_id, num_runs=1)
-        scheduler.heartrate = 0
-        scheduler.run()
+        self.scheduler_job = SchedulerJob(dag_id=dag.dag_id, num_runs=1)
+        self.scheduler_job.heartrate = 0
+        self.scheduler_job.run()
 
         session = settings.Session()
         ti = (
@@ -2953,7 +2980,6 @@ class TestSchedulerJob(unittest.TestCase):
             .filter(TaskInstance.dag_id == dag.dag_id, TaskInstance.task_id == dag_task1.task_id)
             .first()
         )
-
         # make sure the counter has increased
         assert ti.try_number == 2
         assert ti.state == State.UP_FOR_RETRY
@@ -3303,9 +3329,9 @@ class TestSchedulerJob(unittest.TestCase):
 
     def test_adopt_or_reset_orphaned_tasks_nothing(self):
         """Try with nothing. """
-        scheduler = SchedulerJob()
+        self.scheduler_job = SchedulerJob()
         session = settings.Session()
-        assert 0 == scheduler.adopt_or_reset_orphaned_tasks(session=session)
+        assert 0 == self.scheduler_job.adopt_or_reset_orphaned_tasks(session=session)
 
     def test_adopt_or_reset_orphaned_tasks_external_triggered_dag(self):
         dag_id = 'test_reset_orphaned_tasks_external_triggered_dag'
@@ -3313,7 +3339,7 @@ class TestSchedulerJob(unittest.TestCase):
         task_id = dag_id + '_task'
         DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
         dr1 = dag.create_dagrun(
@@ -3330,7 +3356,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(dr1)
         session.commit()
 
-        num_reset_tis = scheduler.adopt_or_reset_orphaned_tasks(session=session)
+        num_reset_tis = self.scheduler_job.adopt_or_reset_orphaned_tasks(session=session)
         assert 1 == num_reset_tis
 
     def test_adopt_or_reset_orphaned_tasks_backfill_dag(self):
@@ -3339,9 +3365,9 @@ class TestSchedulerJob(unittest.TestCase):
         task_id = dag_id + '_task'
         DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
-        session.add(scheduler)
+        session.add(self.scheduler_job)
         session.flush()
 
         dr1 = dag.create_dagrun(
@@ -3358,7 +3384,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.flush()
 
         assert dr1.is_backfill
-        assert 0 == scheduler.adopt_or_reset_orphaned_tasks(session=session)
+        assert 0 == self.scheduler_job.adopt_or_reset_orphaned_tasks(session=session)
         session.rollback()
 
     def test_reset_orphaned_tasks_nonexistent_dagrun(self):
@@ -3368,7 +3394,7 @@ class TestSchedulerJob(unittest.TestCase):
         task_id = dag_id + '_task'
         task = DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
         ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
@@ -3380,7 +3406,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(ti)
         session.flush()
 
-        assert 0 == scheduler.adopt_or_reset_orphaned_tasks(session=session)
+        assert 0 == self.scheduler_job.adopt_or_reset_orphaned_tasks(session=session)
         session.rollback()
 
     def test_reset_orphaned_tasks_no_orphans(self):
@@ -3389,9 +3415,9 @@ class TestSchedulerJob(unittest.TestCase):
         task_id = dag_id + '_task'
         DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
-        session.add(scheduler)
+        session.add(self.scheduler_job)
         session.flush()
 
         dr1 = dag.create_dagrun(
@@ -3403,12 +3429,12 @@ class TestSchedulerJob(unittest.TestCase):
         )
         tis = dr1.get_task_instances(session=session)
         tis[0].state = State.RUNNING
-        tis[0].queued_by_job_id = scheduler.id
+        tis[0].queued_by_job_id = self.scheduler_job.id
         session.merge(dr1)
         session.merge(tis[0])
         session.flush()
 
-        assert 0 == scheduler.adopt_or_reset_orphaned_tasks(session=session)
+        assert 0 == self.scheduler_job.adopt_or_reset_orphaned_tasks(session=session)
         tis[0].refresh_from_db()
         assert State.RUNNING == tis[0].state
 
@@ -3419,9 +3445,9 @@ class TestSchedulerJob(unittest.TestCase):
         task_id = dag_id + '_task'
         DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
-        session.add(scheduler)
+        session.add(self.scheduler_job)
         session.flush()
 
         dr1 = dag.create_dagrun(
@@ -3434,12 +3460,12 @@ class TestSchedulerJob(unittest.TestCase):
         tis = dr1.get_task_instances(session=session)
         assert 1 == len(tis)
         tis[0].state = State.SCHEDULED
-        tis[0].queued_by_job_id = scheduler.id
+        tis[0].queued_by_job_id = self.scheduler_job.id
         session.merge(dr1)
         session.merge(tis[0])
         session.flush()
 
-        assert 0 == scheduler.adopt_or_reset_orphaned_tasks(session=session)
+        assert 0 == self.scheduler_job.adopt_or_reset_orphaned_tasks(session=session)
         session.rollback()
 
     def test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs(self):
@@ -3448,11 +3474,11 @@ class TestSchedulerJob(unittest.TestCase):
         DummyOperator(task_id='task1', dag=dag)
         DummyOperator(task_id='task2', dag=dag)
 
-        scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
-        scheduler_job.state = State.RUNNING
-        scheduler_job.latest_heartbeat = timezone.utcnow()
-        session.add(scheduler_job)
+        self.scheduler_job.state = State.RUNNING
+        self.scheduler_job.latest_heartbeat = timezone.utcnow()
+        session.add(self.scheduler_job)
 
         old_job = SchedulerJob(subdir=os.devnull)
         old_job.state = State.RUNNING
@@ -3476,11 +3502,11 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(ti1)
 
         ti2.state = State.SCHEDULED
-        ti2.queued_by_job_id = scheduler_job.id
+        ti2.queued_by_job_id = self.scheduler_job.id
         session.merge(ti2)
         session.flush()
 
-        num_reset_tis = scheduler_job.adopt_or_reset_orphaned_tasks(session=session)
+        num_reset_tis = self.scheduler_job.adopt_or_reset_orphaned_tasks(session=session)
 
         assert 1 == num_reset_tis
 
@@ -3489,6 +3515,8 @@ class TestSchedulerJob(unittest.TestCase):
         session.refresh(ti2)
         assert State.SCHEDULED == ti2.state
         session.rollback()
+        if old_job.processor_agent:
+            old_job.processor_agent.end()
 
     def test_send_sla_callbacks_to_processor_sla_disabled(self):
         """Test SLA Callbacks are not sent when check_slas is False"""
@@ -3497,13 +3525,13 @@ class TestSchedulerJob(unittest.TestCase):
         DummyOperator(task_id='task1', dag=dag)
 
         with patch.object(settings, "CHECK_SLAS", False):
-            scheduler_job = SchedulerJob(subdir=os.devnull)
+            self.scheduler_job = SchedulerJob(subdir=os.devnull)
             mock_agent = mock.MagicMock()
 
-            scheduler_job.processor_agent = mock_agent
+            self.scheduler_job.processor_agent = mock_agent
 
-            scheduler_job._send_sla_callbacks_to_processor(dag)
-            scheduler_job.processor_agent.send_sla_callback_request_to_execute.assert_not_called()
+            self.scheduler_job._send_sla_callbacks_to_processor(dag)
+            self.scheduler_job.processor_agent.send_sla_callback_request_to_execute.assert_not_called()
 
     def test_send_sla_callbacks_to_processor_sla_no_task_slas(self):
         """Test SLA Callbacks are not sent when no task SLAs are defined"""
@@ -3557,13 +3585,13 @@ class TestSchedulerJob(unittest.TestCase):
         dagbag.sync_to_db()
         dag_model = DagModel.get_dagmodel(dag.dag_id)
 
-        scheduler = SchedulerJob(executor=self.null_exec)
-        scheduler.processor_agent = mock.MagicMock()
+        self.scheduler_job = SchedulerJob(executor=self.null_exec)
+        self.scheduler_job.processor_agent = mock.MagicMock()
 
         with create_session() as session:
-            scheduler._create_dag_runs([dag_model], session)
+            self.scheduler_job._create_dag_runs([dag_model], session)
 
-        assert dag.get_last_dagrun().creating_job_id == scheduler.id
+        assert dag.get_last_dagrun().creating_job_id == self.scheduler_job.id
 
     def test_extra_operator_links_not_loaded_in_scheduler_loop(self):
         """
@@ -3591,12 +3619,12 @@ class TestSchedulerJob(unittest.TestCase):
         # Test that custom_task has >= 1 Operator Links (after de-serialization)
         assert custom_task.operator_extra_links
 
-        scheduler = SchedulerJob(executor=self.null_exec)
-        scheduler.processor_agent = mock.MagicMock()
-        scheduler._run_scheduler_loop()
+        self.scheduler_job = SchedulerJob(executor=self.null_exec)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        self.scheduler_job._run_scheduler_loop()
 
         # Get serialized dag
-        s_dag_2 = scheduler.dagbag.get_dag(dag.dag_id)
+        s_dag_2 = self.scheduler_job.dagbag.get_dag(dag.dag_id)
         custom_task = s_dag_2.task_dict['custom_task']
         # Test that custom_task has no Operator Links (after de-serialization) in the Scheduling Loop
         assert not custom_task.operator_extra_links
@@ -3623,13 +3651,13 @@ class TestSchedulerJob(unittest.TestCase):
         DAG.bulk_write_to_db(dagbag.dags.values())
         dag_model = DagModel.get_dagmodel(dag.dag_id)
 
-        scheduler = SchedulerJob(subdir=os.devnull, executor=self.null_exec)
-        scheduler.processor_agent = mock.MagicMock()
+        self.scheduler_job = SchedulerJob(subdir=os.devnull, executor=self.null_exec)
+        self.scheduler_job.processor_agent = mock.MagicMock()
 
         with create_session() as session, self.assertLogs(
             'airflow.jobs.scheduler_job', level="ERROR"
         ) as log_output:
-            scheduler._create_dag_runs([dag_model], session)
+            self.scheduler_job._create_dag_runs([dag_model], session)
 
             assert (
                 "airflow.exceptions.SerializedDagNotFound: DAG "
@@ -3667,13 +3695,13 @@ class TestSchedulerJob(unittest.TestCase):
         dag_model = session.query(DagModel).get(dag.dag_id)
         assert dag_model.next_dagrun == DEFAULT_DATE
 
-        job = SchedulerJob(subdir=os.devnull)
-        job.executor = MockExecutor(do_update=False)
-        job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=False)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
 
         # Verify a DagRun is created with the correct execution_date
         # when Scheduler._do_scheduling is run in the Scheduler Loop
-        job._do_scheduling(session)
+        self.scheduler_job._do_scheduling(session)
         dr1 = dag.get_dagrun(DEFAULT_DATE, session)
         assert dr1 is not None
         assert dr1.state == State.RUNNING
@@ -3748,12 +3776,12 @@ class TestSchedulerJob(unittest.TestCase):
 
         assert dag.get_last_dagrun(session) == dagrun
 
-        scheduler = SchedulerJob(subdir=os.devnull, executor=self.null_exec)
-        scheduler.dagbag = dagbag
-        scheduler.processor_agent = mock.MagicMock()
+        self.scheduler_job = SchedulerJob(subdir=os.devnull, executor=self.null_exec)
+        self.scheduler_job.dagbag = dagbag
+        self.scheduler_job.processor_agent = mock.MagicMock()
 
         # Test that this does not raise any error
-        scheduler._create_dag_runs([dag_model], session)
+        self.scheduler_job._create_dag_runs([dag_model], session)
 
         # Assert dag_model.next_dagrun is set correctly to next execution date
         assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(days=1)
@@ -3804,11 +3832,11 @@ class TestSchedulerJob(unittest.TestCase):
 
         dag.sync_to_db(session=session)  # Update the date fields
 
-        job = SchedulerJob(subdir=os.devnull)
-        job.executor = MockExecutor(do_update=False)
-        job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=False)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
 
-        num_queued = job._do_scheduling(session)
+        num_queued = self.scheduler_job._do_scheduling(session)
 
         assert num_queued == 1
         ti = run2.get_task_instance(task1.task_id, session)
@@ -3862,17 +3890,17 @@ class TestSchedulerJob(unittest.TestCase):
 
         dag.sync_to_db(session=session)
 
-        job = SchedulerJob(subdir=os.devnull)
-        job.executor = MockExecutor()
-        job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor()
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
 
-        _ = job._do_scheduling(session)
+        _ = self.scheduler_job._do_scheduling(session)
 
         assert run1.state == State.FAILED
         assert run1_ti.state == State.SKIPPED
         assert run2.state == State.RUNNING
 
-        _ = job._do_scheduling(session)
+        _ = self.scheduler_job._do_scheduling(session)
         run2_ti = run2.get_task_instance(task1.task_id, session)
         assert run2_ti.state == State.QUEUED
 
@@ -3910,11 +3938,11 @@ class TestSchedulerJob(unittest.TestCase):
 
         dag.sync_to_db(session=session)  # Update the date fields
 
-        job = SchedulerJob(subdir=os.devnull)
-        job.executor = MockExecutor(do_update=False)
-        job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=False)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
 
-        num_queued = job._do_scheduling(session)
+        num_queued = self.scheduler_job._do_scheduling(session)
 
         assert num_queued == 1
         ti = run1.get_task_instance(task1.task_id, session)
@@ -3959,11 +3987,11 @@ class TestSchedulerJob(unittest.TestCase):
 
         dag.sync_to_db(session=session)  # Update the date fields
 
-        job = SchedulerJob(subdir=os.devnull)
-        job.executor = MockExecutor(do_update=False)
-        job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=False)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
 
-        num_queued = job._do_scheduling(session)
+        num_queued = self.scheduler_job._do_scheduling(session)
         # Add it back into the session so we can refresh it. (_do_scheduling does an expunge_all to reduce
         # memory)
         session.add(dag_run)
@@ -3983,7 +4011,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.flush()
 
         # At this point, ti2 and ti3 of the scheduled dag run should be running
-        num_queued = job._do_scheduling(session)
+        num_queued = self.scheduler_job._do_scheduling(session)
 
         assert num_queued == 1
         # Should have queued task2
@@ -4003,7 +4031,7 @@ class TestSchedulerJob(unittest.TestCase):
         )
         session.flush()
 
-        num_queued = job._do_scheduling(session)
+        num_queued = self.scheduler_job._do_scheduling(session)
 
         assert num_queued == 1
         # Should have queued task2 again.
@@ -4059,27 +4087,36 @@ class TestSchedulerJobQueriesCount(unittest.TestCase):
     made that affects the performance of the SchedulerJob.
     """
 
-    def setUp(self) -> None:
+    @staticmethod
+    def clean_db():
         clear_db_runs()
         clear_db_pools()
         clear_db_dags()
         clear_db_sla_miss()
         clear_db_errors()
+        clear_db_jobs()
         clear_db_serialized_dags()
-        clear_db_dags()
+
+    def setUp(self) -> None:
+        self.clean_db()
+
+    def tearDown(self):
+        if self.scheduler_job and self.scheduler_job.processor_agent:
+            self.scheduler_job.processor_agent.end()
+            self.scheduler_job = None
+        self.clean_db()
 
     @parameterized.expand(
         [
             # expected, dag_count, task_count
             # One DAG with one task per DAG file
-            (23, 1, 1),  # noqa
+            (24, 1, 1),  # noqa
             # One DAG with five tasks per DAG file
-            (23, 1, 5),  # noqa
+            (28, 1, 5),  # noqa
             # 10 DAGs with 10 tasks per DAG file
-            (95, 10, 10),  # noqa
+            (195, 10, 10),  # noqa
         ]
     )
-    @pytest.mark.quarantined
     def test_execute_queries_count_with_harvested_dags(self, expected_query_count, dag_count, task_count):
         with mock.patch.dict(
             "os.environ",
@@ -4094,10 +4131,13 @@ class TestSchedulerJobQueriesCount(unittest.TestCase):
             {
                 ('scheduler', 'use_job_schedule'): 'True',
                 ('core', 'load_examples'): 'False',
-                ('core', 'store_serialized_dags'): 'True',
+                # For longer-running tests under heavy load, the min_serialized_dag_fetch_interval
+                # and min_serialized_dag_update_interval settings might kick in and re-retrieve the
+                # record, which would increase the serialized_dag get() query count. That's why we
+                # keep the values high.
+                ('core', 'min_serialized_dag_update_interval'): '100',
+                ('core', 'min_serialized_dag_fetch_interval'): '100',
             }
-        ), mock.patch.object(
-            settings, 'STORE_SERIALIZED_DAGS', True
         ):
             dagruns = []
             dagbag = DagBag(dag_folder=ELASTIC_DAG_FILE, include_examples=False, read_dags_from_db=False)
@@ -4118,53 +4158,52 @@ class TestSchedulerJobQueriesCount(unittest.TestCase):
 
             mock_agent = mock.MagicMock()
 
-            job = SchedulerJob(subdir=PERF_DAGS_FOLDER, num_runs=1)
-            job.executor = MockExecutor(do_update=False)
-            job.heartbeat = mock.MagicMock()
-            job.processor_agent = mock_agent
+            self.scheduler_job = SchedulerJob(subdir=PERF_DAGS_FOLDER, num_runs=1)
+            self.scheduler_job.executor = MockExecutor(do_update=False)
+            self.scheduler_job.heartbeat = mock.MagicMock()
+            self.scheduler_job.processor_agent = mock_agent
 
             with assert_queries_count(expected_query_count):
                 with mock.patch.object(DagRun, 'next_dagruns_to_examine') as mock_dagruns:
                     mock_dagruns.return_value = dagruns
 
-                    job._run_scheduler_loop()
+                    self.scheduler_job._run_scheduler_loop()
 
     @parameterized.expand(
         [
             # expected, dag_count, task_count, start_ago, schedule_interval, shape
             # One DAG with one task per DAG file
-            ([8, 8, 8, 8], 1, 1, "1d", "None", "no_structure"),  # noqa
-            ([8, 8, 8, 8], 1, 1, "1d", "None", "linear"),  # noqa
-            ([20, 11, 11, 11], 1, 1, "1d", "@once", "no_structure"),  # noqa
-            ([20, 11, 11, 11], 1, 1, "1d", "@once", "linear"),  # noqa
-            ([20, 21, 23, 25], 1, 1, "1d", "30m", "no_structure"),  # noqa
-            ([20, 21, 23, 25], 1, 1, "1d", "30m", "linear"),  # noqa
-            ([20, 21, 23, 25], 1, 1, "1d", "30m", "binary_tree"),  # noqa
-            ([20, 21, 23, 25], 1, 1, "1d", "30m", "star"),  # noqa
-            ([20, 21, 23, 25], 1, 1, "1d", "30m", "grid"),  # noqa
+            ([9, 9, 9, 9], 1, 1, "1d", "None", "no_structure"),  # noqa
+            ([9, 9, 9, 9], 1, 1, "1d", "None", "linear"),  # noqa
+            ([21, 12, 12, 12], 1, 1, "1d", "@once", "no_structure"),  # noqa
+            ([21, 12, 12, 12], 1, 1, "1d", "@once", "linear"),  # noqa
+            ([21, 22, 24, 26], 1, 1, "1d", "30m", "no_structure"),  # noqa
+            ([21, 22, 24, 26], 1, 1, "1d", "30m", "linear"),  # noqa
+            ([21, 22, 24, 26], 1, 1, "1d", "30m", "binary_tree"),  # noqa
+            ([21, 22, 24, 26], 1, 1, "1d", "30m", "star"),  # noqa
+            ([21, 22, 24, 26], 1, 1, "1d", "30m", "grid"),  # noqa
             # One DAG with five tasks per DAG file
-            ([8, 8, 8, 8], 1, 5, "1d", "None", "no_structure"),  # noqa
-            ([8, 8, 8, 8], 1, 5, "1d", "None", "linear"),  # noqa
-            ([20, 11, 11, 11], 1, 5, "1d", "@once", "no_structure"),  # noqa
-            ([21, 12, 12, 12], 1, 5, "1d", "@once", "linear"),  # noqa
-            ([20, 21, 23, 25], 1, 5, "1d", "30m", "no_structure"),  # noqa
-            ([21, 23, 26, 29], 1, 5, "1d", "30m", "linear"),  # noqa
-            ([21, 23, 26, 29], 1, 5, "1d", "30m", "binary_tree"),  # noqa
-            ([21, 23, 26, 29], 1, 5, "1d", "30m", "star"),  # noqa
-            ([21, 23, 26, 29], 1, 5, "1d", "30m", "grid"),  # noqa
+            ([9, 9, 9, 9], 1, 5, "1d", "None", "no_structure"),  # noqa
+            ([9, 9, 9, 9], 1, 5, "1d", "None", "linear"),  # noqa
+            ([21, 12, 12, 12], 1, 5, "1d", "@once", "no_structure"),  # noqa
+            ([22, 13, 13, 13], 1, 5, "1d", "@once", "linear"),  # noqa
+            ([21, 22, 24, 26], 1, 5, "1d", "30m", "no_structure"),  # noqa
+            ([22, 24, 27, 30], 1, 5, "1d", "30m", "linear"),  # noqa
+            ([22, 24, 27, 30], 1, 5, "1d", "30m", "binary_tree"),  # noqa
+            ([22, 24, 27, 30], 1, 5, "1d", "30m", "star"),  # noqa
+            ([22, 24, 27, 30], 1, 5, "1d", "30m", "grid"),  # noqa
             # 10 DAGs with 10 tasks per DAG file
-            ([8, 8, 8, 8], 10, 10, "1d", "None", "no_structure"),  # noqa
-            ([8, 8, 8, 8], 10, 10, "1d", "None", "linear"),  # noqa
-            ([83, 26, 26, 26], 10, 10, "1d", "@once", "no_structure"),  # noqa
-            ([93, 39, 39, 39], 10, 10, "1d", "@once", "linear"),  # noqa
-            ([83, 87, 87, 87], 10, 10, "1d", "30m", "no_structure"),  # noqa
-            ([93, 113, 113, 113], 10, 10, "1d", "30m", "linear"),  # noqa
-            ([93, 107, 107, 107], 10, 10, "1d", "30m", "binary_tree"),  # noqa
-            ([93, 107, 107, 107], 10, 10, "1d", "30m", "star"),  # noqa
-            ([93, 107, 107, 107], 10, 10, "1d", "30m", "grid"),  # noqa
+            ([9, 9, 9, 9], 10, 10, "1d", "None", "no_structure"),  # noqa
+            ([9, 9, 9, 9], 10, 10, "1d", "None", "linear"),  # noqa
+            ([84, 27, 27, 27], 10, 10, "1d", "@once", "no_structure"),  # noqa
+            ([94, 40, 40, 40], 10, 10, "1d", "@once", "linear"),  # noqa
+            ([84, 88, 88, 88], 10, 10, "1d", "30m", "no_structure"),  # noqa
+            ([94, 114, 114, 114], 10, 10, "1d", "30m", "linear"),  # noqa
+            ([94, 108, 108, 108], 10, 10, "1d", "30m", "binary_tree"),  # noqa
+            ([94, 108, 108, 108], 10, 10, "1d", "30m", "star"),  # noqa
+            ([94, 108, 108, 108], 10, 10, "1d", "30m", "grid"),  # noqa
         ]
     )
-    @pytest.mark.quarantined
     def test_process_dags_queries_count(
         self, expected_query_counts, dag_count, task_count, start_ago, schedule_interval, shape
     ):
@@ -4181,6 +4220,12 @@ class TestSchedulerJobQueriesCount(unittest.TestCase):
             {
                 ('scheduler', 'use_job_schedule'): 'True',
                 ('core', 'store_serialized_dags'): 'True',
+                # For longer-running tests under heavy load, the min_serialized_dag_fetch_interval
+                # and min_serialized_dag_update_interval settings might kick in and re-retrieve the
+                # record, which would increase the serialized_dag get() query count. That's why we
+                # keep the values high.
+                ('core', 'min_serialized_dag_update_interval'): '100',
+                ('core', 'min_serialized_dag_fetch_interval'): '100',
             }
         ):
 
@@ -4189,11 +4234,11 @@ class TestSchedulerJobQueriesCount(unittest.TestCase):
 
             mock_agent = mock.MagicMock()
 
-            job = SchedulerJob(subdir=PERF_DAGS_FOLDER, num_runs=1)
-            job.executor = MockExecutor(do_update=False)
-            job.heartbeat = mock.MagicMock()
-            job.processor_agent = mock_agent
+            self.scheduler_job = SchedulerJob(subdir=PERF_DAGS_FOLDER, num_runs=1)
+            self.scheduler_job.executor = MockExecutor(do_update=False)
+            self.scheduler_job.heartbeat = mock.MagicMock()
+            self.scheduler_job.processor_agent = mock_agent
             for expected_query_count in expected_query_counts:
                 with create_session() as session:
                     with assert_queries_count(expected_query_count):
-                        job._do_scheduling(session)
+                        self.scheduler_job._do_scheduling(session)
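
The hunks above all converge on one lifecycle: each test assigns its job to
self.scheduler_job instead of a local variable, and tearDown stops any still
running DagFileProcessorAgent before wiping the database. The sketch below is
a minimal illustration of that pattern, not literal code from the commit; it
assumes setUp initialises self.scheduler_job to None (as the attribute check
in tearDown implies), and clean_db stands in for the clear_db_* helpers shown
in the diff:

    import os
    import unittest

    from airflow.jobs.scheduler_job import SchedulerJob

    class TestSchedulerJob(unittest.TestCase):
        @staticmethod
        def clean_db():
            # stands in for the clear_db_* helpers called in the real file
            pass

        def setUp(self):
            self.scheduler_job = None  # each test stores its SchedulerJob here
            self.clean_db()

        def tearDown(self):
            # End the processor agent *before* cleaning the database, so a
            # scheduler process that outlives the test cannot write to tables
            # the next test has already cleared.
            if self.scheduler_job and self.scheduler_job.processor_agent:
                self.scheduler_job.processor_agent.end()
                self.scheduler_job = None
            self.clean_db()

        def test_example(self):
            self.scheduler_job = SchedulerJob(subdir=os.devnull)
            # ... exercise the scheduler; tearDown handles shutdown and cleanup
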
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index b9ec2c8..43a3756 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1971,8 +1971,8 @@ class TestTaskInstance(unittest.TestCase):
                 for upstream, downstream in dependencies.items():
                     dag.set_dependency(upstream, downstream)
 
-            scheduler = SchedulerJob(subdir=os.devnull)
-            scheduler.dagbag.bag_dag(dag, root_dag=dag)
+            scheduler_job = SchedulerJob(subdir=os.devnull)
+            scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
 
             dag_run = dag.create_dagrun(run_id='test_dagrun_fast_follow', state=State.RUNNING)
 
@@ -1999,9 +1999,11 @@ class TestTaskInstance(unittest.TestCase):
             self.validate_ti_states(dag_run, first_run_state, error_message)
 
             if second_run_state:
-                scheduler._critical_section_execute_task_instances(session=session)
+                scheduler_job._critical_section_execute_task_instances(session=session)
                 task_instance_b.run()
                 self.validate_ti_states(dag_run, second_run_state, error_message)
+            if scheduler_job.processor_agent:
+                scheduler_job.processor_agent.end()
 
     def test_set_state_up_for_retry(self):
         dag = DAG('dag', start_date=DEFAULT_DATE)
diff --git a/tests/test_utils/asserts.py b/tests/test_utils/asserts.py
index dccaad1..fe8e7db 100644
--- a/tests/test_utils/asserts.py
+++ b/tests/test_utils/asserts.py
@@ -63,9 +63,8 @@ class CountQueries:
             and __file__ != f.filename
             and ('session.py' not in f.filename and f.name != 'wrapper')
         ]
-        stack_info = ">".join([f"{f.filename.rpartition('/')[-1]}:{f.name}" for f in stack][-3:])
-        lineno = stack[-1].lineno
-        self.result[f"{stack_info}:{lineno}"] += 1
+        stack_info = ">".join([f"{f.filename.rpartition('/')[-1]}:{f.name}:{f.lineno}" for f in stack][-3:])
+        self.result[f"{stack_info}"] += 1
 
 
 count_queries = CountQueries  # pylint: disable=invalid-name
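
A note on the tests/test_utils/asserts.py hunk: previously the counter key was
the three-frame stack label plus a single line number taken from the innermost
frame, so query sites that reached the same innermost line through different
lines of the same outer functions collapsed into one bucket. With the change,
every frame carries its own line number. Roughly, with file names, function
names, and line numbers invented purely for illustration:

    # before: one trailing line number for the whole key
    dag.py:create_dagrun>dagrun.py:verify_integrity>scheduler_job.py:_schedule_dag_run:1590
    # after: a line number per frame
    dag.py:create_dagrun:1761>dagrun.py:verify_integrity:421>scheduler_job.py:_schedule_dag_run:1590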