You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/30 16:37:18 UTC

[airflow] branch master updated: Test that DagFileProcessor can operate on a Serialized DAG (#8739)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 735bf45  Test that DagFileProcessor can operate on a Serialized DAG (#8739)
735bf45 is described below

commit 735bf45de7afafe8235be1035a4319da7cd03e1f
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Sat May 30 17:36:53 2020 +0100

    Test that DagFileProcessor can operate on a Serialized DAG (#8739)
    
    As part of the scheduler HA work we are going to want to separate the
    parsing from the scheduling, so this changes the tests to ensure that
    the important methods of DagFileProcessor can do everything they need to
    when given a SerializedDAG, not just a DAG. i.e. that we have correctly
    serialized all the necessary fields.
---
 airflow/models/dagbag.py            |  3 +-
 airflow/models/serialized_dag.py    |  5 +--
 tests/jobs/test_scheduler_job.py    | 77 ++++++++++++++++++++++++++++++++++---
 tests/models/test_serialized_dag.py |  2 -
 4 files changed, 74 insertions(+), 13 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 42fd111..48c0866 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -456,4 +456,5 @@ class DagBag(BaseDagBag, LoggingMixin):
         from airflow.models.dag import DAG
         from airflow.models.serialized_dag import SerializedDagModel
         DAG.bulk_sync_to_db(self.dags.values())
-        SerializedDagModel.bulk_sync_to_db(self.dags.values())
+        if self.store_serialized_dags:
+            SerializedDagModel.bulk_sync_to_db(self.dags.values())
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index 2e7ac68..bac2c4c 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -30,7 +30,7 @@ from airflow.models.base import ID_LEN, Base
 from airflow.models.dag import DAG, DagModel
 from airflow.models.dagcode import DagCode
 from airflow.serialization.serialized_objects import SerializedDAG
-from airflow.settings import MIN_SERIALIZED_DAG_UPDATE_INTERVAL, STORE_SERIALIZED_DAGS, json
+from airflow.settings import MIN_SERIALIZED_DAG_UPDATE_INTERVAL, json
 from airflow.utils import timezone
 from airflow.utils.session import provide_session
 from airflow.utils.sqlalchemy import UtcDateTime
@@ -205,9 +205,6 @@ class SerializedDagModel(Base):
         :type dags: List[airflow.models.dag.DAG]
         :return: None
         """
-        if not STORE_SERIALIZED_DAGS:
-            return
-
         for dag in dags:
             if not dag.is_subdag:
                 SerializedDagModel.write_dag(
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 644df90..a8bfd69 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -44,6 +44,7 @@ from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import SimpleTaskInstance
 from airflow.operators.bash import BashOperator
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.utils import timezone
 from airflow.utils.dag_processing import FailureCallbackRequest, SimpleDag, SimpleDagBag
 from airflow.utils.dates import days_ago
@@ -122,7 +123,11 @@ class TestDagFileProcessor(unittest.TestCase):
 
     @classmethod
     def setUpClass(cls):
-        cls.dagbag = DagBag()
+        # Ensure the DAGs we are looking at from the DB are up-to-date
+        non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False)
+        non_serialized_dagbag.store_serialized_dags = True
+        non_serialized_dagbag.sync_to_db()
+        cls.dagbag = DagBag(store_serialized_dags=True)
 
     def test_dag_file_processor_sla_miss_callback(self):
         """
@@ -415,6 +420,7 @@ class TestDagFileProcessor(unittest.TestCase):
             schedule_interval="@once")
 
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dag.clear()
         dr = dag_file_processor.create_dag_run(dag)
         self.assertIsNotNone(dr)
@@ -491,6 +497,8 @@ class TestDagFileProcessor(unittest.TestCase):
             orm_dag = DagModel(dag_id=dag.dag_id)
             session.merge(orm_dag)
 
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
         dag.clear()
         dr = dag_file_processor.create_dag_run(dag)
@@ -537,6 +545,8 @@ class TestDagFileProcessor(unittest.TestCase):
             orm_dag = DagModel(dag_id=dag.dag_id)
             session.merge(orm_dag)
 
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
         dag.clear()
         dr = dag_file_processor.create_dag_run(dag)
@@ -587,6 +597,8 @@ class TestDagFileProcessor(unittest.TestCase):
             orm_dag = DagModel(dag_id=dag.dag_id)
             session.merge(orm_dag)
 
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
         dag.clear()
         dr = dag_file_processor.create_dag_run(dag)
@@ -621,6 +633,8 @@ class TestDagFileProcessor(unittest.TestCase):
         session.commit()
         session.close()
 
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
         dag.clear()
 
@@ -628,9 +642,11 @@ class TestDagFileProcessor(unittest.TestCase):
         self.assertIsNotNone(dr)
 
         dr = DagRun.find(run_id=dr.run_id)[0]
+        # Re-create the DAG, but remove the task
         dag = DAG(
             dag_id='test_scheduler_do_not_schedule_removed_task',
             start_date=DEFAULT_DATE)
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         mock_list = dag_file_processor._process_task_instances(dag, dag_runs=[dr])
 
@@ -651,6 +667,8 @@ class TestDagFileProcessor(unittest.TestCase):
         session.commit()
         session.close()
 
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
         dag.clear()
 
@@ -668,6 +686,8 @@ class TestDagFileProcessor(unittest.TestCase):
         with create_session() as session:
             orm_dag = DagModel(dag_id=dag.dag_id)
             session.merge(orm_dag)
+
+            dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
             dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
             dag.clear(session=session)
             dag.start_date = None
@@ -688,6 +708,8 @@ class TestDagFileProcessor(unittest.TestCase):
         session.merge(orm_dag)
         session.commit()
 
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
         dag.clear()
 
@@ -724,6 +746,8 @@ class TestDagFileProcessor(unittest.TestCase):
         session.commit()
         session.close()
 
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
         dag.clear()
 
@@ -737,6 +761,7 @@ class TestDagFileProcessor(unittest.TestCase):
             task_id='dummy2',
             dag=dag,
             owner='airflow')
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         dag_file_processor._process_task_instances(dag, dag_runs=[dr])
 
@@ -763,6 +788,7 @@ class TestDagFileProcessor(unittest.TestCase):
         session.commit()
         session.close()
 
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
         dag.clear()
 
@@ -791,6 +817,8 @@ class TestDagFileProcessor(unittest.TestCase):
         session.merge(orm_dag)
         session.commit()
 
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
         dag.clear()
 
@@ -831,6 +859,8 @@ class TestDagFileProcessor(unittest.TestCase):
         session.commit()
         session.close()
 
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
         dag.clear()
 
@@ -868,6 +898,7 @@ class TestDagFileProcessor(unittest.TestCase):
         session.merge(orm_dag)
         session.commit()
         session.close()
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
         dag.clear()
@@ -892,8 +923,6 @@ class TestDagFileProcessor(unittest.TestCase):
 
     def test_find_dags_to_run_includes_subdags(self):
         dag = self.dagbag.get_dag('test_subdag_operator')
-        print(self.dagbag.dag_ids)
-        print(self.dagbag.dag_folder)
         self.assertGreater(len(dag.subdags), 0)
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
         dags = dag_file_processor._find_dags_to_process(self.dagbag.dags.values())
@@ -931,7 +960,7 @@ class TestDagFileProcessor(unittest.TestCase):
             session.commit()
             session.close()
 
-            return dag
+            return SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         now = timezone.utcnow()
         six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(
@@ -1002,6 +1031,8 @@ class TestDagFileProcessor(unittest.TestCase):
         session.merge(orm_dag)
         session.commit()
 
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
 
         dr = dag_file_processor.create_dag_run(dag)
@@ -1022,6 +1053,7 @@ class TestDagFileProcessor(unittest.TestCase):
         orm_dag = DagModel(dag_id=dag.dag_id)
         session.merge(orm_dag)
         session.commit()
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         dag.clear()
 
@@ -1332,7 +1364,11 @@ class TestSchedulerJob(unittest.TestCase):
 
     @classmethod
     def setUpClass(cls):
-        cls.dagbag = DagBag()
+        # Ensure the DAGs we are looking at from the DB are up-to-date
+        non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False)
+        non_serialized_dagbag.store_serialized_dags = True
+        non_serialized_dagbag.sync_to_db()
+        cls.dagbag = DagBag(store_serialized_dags=True)
 
     def test_is_alive(self):
         job = SchedulerJob(None, heartrate=10, state=State.RUNNING)
@@ -1454,6 +1490,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dagbag = self._make_simple_dag_bag([dag])
 
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1485,6 +1522,8 @@ class TestSchedulerJob(unittest.TestCase):
 
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
         dagbag = self._make_simple_dag_bag([dag])
 
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1511,6 +1550,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dagbag = self._make_simple_dag_bag([dag])
 
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1537,6 +1577,7 @@ class TestSchedulerJob(unittest.TestCase):
         task_id_1 = 'dummy'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dagbag = self._make_simple_dag_bag([dag])
 
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1577,6 +1618,7 @@ class TestSchedulerJob(unittest.TestCase):
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
         task1 = DummyOperator(dag=dag, task_id=task_id_1, pool='a')
         task2 = DummyOperator(dag=dag, task_id=task_id_2, pool='b')
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dagbag = self._make_simple_dag_bag([dag])
 
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1620,6 +1662,7 @@ class TestSchedulerJob(unittest.TestCase):
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
         op1 = DummyOperator(dag=dag, task_id='dummy1')
         op2 = DummyOperator(dag=dag, task_id='dummy2')
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dagbag = self._make_simple_dag_bag([dag])
 
         executor = MockExecutor(do_update=True)
@@ -1661,6 +1704,7 @@ class TestSchedulerJob(unittest.TestCase):
         task_id = 'dummy_wrong_pool'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
         task = DummyOperator(dag=dag, task_id=task_id, pool="this_pool_doesnt_exist")
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dagbag = self._make_simple_dag_bag([dag])
 
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1685,6 +1729,7 @@ class TestSchedulerJob(unittest.TestCase):
         task_id_1 = 'dummy'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
         DummyOperator(dag=dag, task_id=task_id_1)
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dagbag = self._make_simple_dag_bag([dag])
 
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1703,6 +1748,7 @@ class TestSchedulerJob(unittest.TestCase):
         task_id_1 = 'dummy'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2)
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dagbag = self._make_simple_dag_bag([dag])
 
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1749,6 +1795,7 @@ class TestSchedulerJob(unittest.TestCase):
         task1 = DummyOperator(dag=dag, task_id='dummy1')
         task2 = DummyOperator(dag=dag, task_id='dummy2')
         task3 = DummyOperator(dag=dag, task_id='dummy3')
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dagbag = self._make_simple_dag_bag([dag])
 
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1783,6 +1830,7 @@ class TestSchedulerJob(unittest.TestCase):
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
         task1 = DummyOperator(dag=dag, task_id=task_id_1, task_concurrency=2)
         task2 = DummyOperator(dag=dag, task_id=task_id_2)
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dagbag = self._make_simple_dag_bag([dag])
 
         executor = MockExecutor(do_update=True)
@@ -1877,6 +1925,7 @@ class TestSchedulerJob(unittest.TestCase):
         task_id_1 = 'dummy'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2)
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         self._make_simple_dag_bag([dag])
 
         scheduler = SchedulerJob()
@@ -1909,6 +1958,7 @@ class TestSchedulerJob(unittest.TestCase):
         task_id_1 = 'dummy'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dagbag = self._make_simple_dag_bag([dag])
 
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1931,6 +1981,7 @@ class TestSchedulerJob(unittest.TestCase):
         task_id_1 = 'dummy'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2)
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dagbag = SimpleDagBag([])
 
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1956,6 +2007,7 @@ class TestSchedulerJob(unittest.TestCase):
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=3)
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         task2 = DummyOperator(dag=dag, task_id=task_id_2)
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dagbag = self._make_simple_dag_bag([dag])
 
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -2026,6 +2078,7 @@ class TestSchedulerJob(unittest.TestCase):
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         task2 = DummyOperator(dag=dag, task_id=task_id_2)
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dagbag = self._make_simple_dag_bag([dag])
 
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -2062,15 +2115,20 @@ class TestSchedulerJob(unittest.TestCase):
         DummyOperator(task_id='dummy', dag=dag1, owner='airflow')
 
         DummyOperator(task_id='dummy_b', dag=dag1, owner='airflow')
+        dag1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag1))
 
         dag2 = DAG(dag_id='test_change_state_for_tis_without_dagrun_dont_change', start_date=DEFAULT_DATE)
 
         DummyOperator(task_id='dummy', dag=dag2, owner='airflow')
 
+        dag2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag2))
+
         dag3 = DAG(dag_id='test_change_state_for_tis_without_dagrun_no_dagrun', start_date=DEFAULT_DATE)
 
         DummyOperator(task_id='dummy', dag=dag3, owner='airflow')
 
+        dag3 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag3))
+
         session = settings.Session()
         dr1 = dag1.create_dagrun(run_id=DagRunType.SCHEDULED.value,
                                  state=State.RUNNING,
@@ -2154,6 +2212,7 @@ class TestSchedulerJob(unittest.TestCase):
             task_id='task_id',
             dag=dag,
             owner='airflow')
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         # If there's no left over task in executor.queued_tasks, nothing happens
         session = settings.Session()
@@ -2202,6 +2261,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         with dag:
             op1 = DummyOperator(task_id='op1')
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         dag.clear()
         dr = dag.create_dagrun(run_id=f"{DagRunType.SCHEDULED.value}__",
@@ -2250,6 +2310,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         with dag:
             op1 = DummyOperator(task_id='op1')
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         # Create DAG run with FAILED state
         dag.clear()
@@ -2311,6 +2372,7 @@ class TestSchedulerJob(unittest.TestCase):
             self.null_exec.mock_task_fail(dag_id, tid, ex_date)
 
         try:
+            # This needs a _REAL_ dag, not the serialized version
             dag.run(start_date=ex_date, end_date=ex_date, executor=self.null_exec, **run_kwargs)
         except AirflowException:
             pass
@@ -2452,7 +2514,7 @@ class TestSchedulerJob(unittest.TestCase):
             dag_id = 'test_start_date_scheduling'
             dag = self.dagbag.get_dag(dag_id)
             dag.clear()
-            self.assertTrue(dag.start_date > datetime.datetime.utcnow())
+            self.assertGreater(dag.start_date, datetime.datetime.utcnow())
 
             scheduler = SchedulerJob(dag_id,
                                      executor=self.null_exec,
@@ -2592,6 +2654,8 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(orm_dag)
         session.commit()
 
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
         scheduler = SchedulerJob(executor=self.null_exec)
 
@@ -2643,6 +2707,7 @@ class TestSchedulerJob(unittest.TestCase):
             bash_command='echo 1',
         )
 
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dag.clear()
         dag.is_subdag = False
 
diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py
index a578307..fe96d73 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -19,7 +19,6 @@
 """Unit tests for SerializedDagModel."""
 
 import unittest
-from unittest import mock
 
 from airflow import DAG, example_dags as example_dags_module
 from airflow.models import DagBag
@@ -117,7 +116,6 @@ class SerializedDagModelTest(unittest.TestCase):
         self.assertFalse(SDM.has_dag(stale_dag.dag_id))
         self.assertTrue(SDM.has_dag(fresh_dag.dag_id))
 
-    @mock.patch('airflow.models.serialized_dag.STORE_SERIALIZED_DAGS', True)
     def test_bulk_sync_to_db(self):
         dags = [
             DAG("dag_1"), DAG("dag_2"), DAG("dag_3"),