You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/30 16:37:18 UTC
[airflow] branch master updated: Test that DagFileProcessor can
operate on a Serialized DAG (#8739)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 735bf45 Test that DagFileProcessor can operate on a Serialized DAG (#8739)
735bf45 is described below
commit 735bf45de7afafe8235be1035a4319da7cd03e1f
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Sat May 30 17:36:53 2020 +0100
Test that DagFileProcessor can operate on a Serialized DAG (#8739)
As part of the scheduler HA work we are going to want to separate the
parsing from the scheduling, so this changes the tests to ensure that
the important methods of DagFileProcessor can do everything they need to
when given a SerializedDAG, not just a DAG. i.e. that we have correctly
serialized all the necessary fields.
---
airflow/models/dagbag.py | 3 +-
airflow/models/serialized_dag.py | 5 +--
tests/jobs/test_scheduler_job.py | 77 ++++++++++++++++++++++++++++++++++---
tests/models/test_serialized_dag.py | 2 -
4 files changed, 74 insertions(+), 13 deletions(-)
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 42fd111..48c0866 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -456,4 +456,5 @@ class DagBag(BaseDagBag, LoggingMixin):
from airflow.models.dag import DAG
from airflow.models.serialized_dag import SerializedDagModel
DAG.bulk_sync_to_db(self.dags.values())
- SerializedDagModel.bulk_sync_to_db(self.dags.values())
+ if self.store_serialized_dags:
+ SerializedDagModel.bulk_sync_to_db(self.dags.values())
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index 2e7ac68..bac2c4c 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -30,7 +30,7 @@ from airflow.models.base import ID_LEN, Base
from airflow.models.dag import DAG, DagModel
from airflow.models.dagcode import DagCode
from airflow.serialization.serialized_objects import SerializedDAG
-from airflow.settings import MIN_SERIALIZED_DAG_UPDATE_INTERVAL, STORE_SERIALIZED_DAGS, json
+from airflow.settings import MIN_SERIALIZED_DAG_UPDATE_INTERVAL, json
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import UtcDateTime
@@ -205,9 +205,6 @@ class SerializedDagModel(Base):
:type dags: List[airflow.models.dag.DAG]
:return: None
"""
- if not STORE_SERIALIZED_DAGS:
- return
-
for dag in dags:
if not dag.is_subdag:
SerializedDagModel.write_dag(
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 644df90..a8bfd69 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -44,6 +44,7 @@ from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import SimpleTaskInstance
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
+from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils import timezone
from airflow.utils.dag_processing import FailureCallbackRequest, SimpleDag, SimpleDagBag
from airflow.utils.dates import days_ago
@@ -122,7 +123,11 @@ class TestDagFileProcessor(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.dagbag = DagBag()
+ # Ensure the DAGs we are looking at from the DB are up-to-date
+ non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False)
+ non_serialized_dagbag.store_serialized_dags = True
+ non_serialized_dagbag.sync_to_db()
+ cls.dagbag = DagBag(store_serialized_dags=True)
def test_dag_file_processor_sla_miss_callback(self):
"""
@@ -415,6 +420,7 @@ class TestDagFileProcessor(unittest.TestCase):
schedule_interval="@once")
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dag.clear()
dr = dag_file_processor.create_dag_run(dag)
self.assertIsNotNone(dr)
@@ -491,6 +497,8 @@ class TestDagFileProcessor(unittest.TestCase):
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
dr = dag_file_processor.create_dag_run(dag)
@@ -537,6 +545,8 @@ class TestDagFileProcessor(unittest.TestCase):
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
dr = dag_file_processor.create_dag_run(dag)
@@ -587,6 +597,8 @@ class TestDagFileProcessor(unittest.TestCase):
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
dr = dag_file_processor.create_dag_run(dag)
@@ -621,6 +633,8 @@ class TestDagFileProcessor(unittest.TestCase):
session.commit()
session.close()
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
@@ -628,9 +642,11 @@ class TestDagFileProcessor(unittest.TestCase):
self.assertIsNotNone(dr)
dr = DagRun.find(run_id=dr.run_id)[0]
+ # Re-create the DAG, but remove the task
dag = DAG(
dag_id='test_scheduler_do_not_schedule_removed_task',
start_date=DEFAULT_DATE)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
mock_list = dag_file_processor._process_task_instances(dag, dag_runs=[dr])
@@ -651,6 +667,8 @@ class TestDagFileProcessor(unittest.TestCase):
session.commit()
session.close()
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
@@ -668,6 +686,8 @@ class TestDagFileProcessor(unittest.TestCase):
with create_session() as session:
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
+
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear(session=session)
dag.start_date = None
@@ -688,6 +708,8 @@ class TestDagFileProcessor(unittest.TestCase):
session.merge(orm_dag)
session.commit()
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
@@ -724,6 +746,8 @@ class TestDagFileProcessor(unittest.TestCase):
session.commit()
session.close()
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
@@ -737,6 +761,7 @@ class TestDagFileProcessor(unittest.TestCase):
task_id='dummy2',
dag=dag,
owner='airflow')
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dag_file_processor._process_task_instances(dag, dag_runs=[dr])
@@ -763,6 +788,7 @@ class TestDagFileProcessor(unittest.TestCase):
session.commit()
session.close()
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
@@ -791,6 +817,8 @@ class TestDagFileProcessor(unittest.TestCase):
session.merge(orm_dag)
session.commit()
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
@@ -831,6 +859,8 @@ class TestDagFileProcessor(unittest.TestCase):
session.commit()
session.close()
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
@@ -868,6 +898,7 @@ class TestDagFileProcessor(unittest.TestCase):
session.merge(orm_dag)
session.commit()
session.close()
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
@@ -892,8 +923,6 @@ class TestDagFileProcessor(unittest.TestCase):
def test_find_dags_to_run_includes_subdags(self):
dag = self.dagbag.get_dag('test_subdag_operator')
- print(self.dagbag.dag_ids)
- print(self.dagbag.dag_folder)
self.assertGreater(len(dag.subdags), 0)
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dags = dag_file_processor._find_dags_to_process(self.dagbag.dags.values())
@@ -931,7 +960,7 @@ class TestDagFileProcessor(unittest.TestCase):
session.commit()
session.close()
- return dag
+ return SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
now = timezone.utcnow()
six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(
@@ -1002,6 +1031,8 @@ class TestDagFileProcessor(unittest.TestCase):
session.merge(orm_dag)
session.commit()
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dr = dag_file_processor.create_dag_run(dag)
@@ -1022,6 +1053,7 @@ class TestDagFileProcessor(unittest.TestCase):
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
session.commit()
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dag.clear()
@@ -1332,7 +1364,11 @@ class TestSchedulerJob(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.dagbag = DagBag()
+ # Ensure the DAGs we are looking at from the DB are up-to-date
+ non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False)
+ non_serialized_dagbag.store_serialized_dags = True
+ non_serialized_dagbag.sync_to_db()
+ cls.dagbag = DagBag(store_serialized_dags=True)
def test_is_alive(self):
job = SchedulerJob(None, heartrate=10, state=State.RUNNING)
@@ -1454,6 +1490,7 @@ class TestSchedulerJob(unittest.TestCase):
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
task1 = DummyOperator(dag=dag, task_id=task_id_1)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dagbag = self._make_simple_dag_bag([dag])
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1485,6 +1522,8 @@ class TestSchedulerJob(unittest.TestCase):
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
task1 = DummyOperator(dag=dag, task_id=task_id_1)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
dagbag = self._make_simple_dag_bag([dag])
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1511,6 +1550,7 @@ class TestSchedulerJob(unittest.TestCase):
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
task1 = DummyOperator(dag=dag, task_id=task_id_1)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dagbag = self._make_simple_dag_bag([dag])
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1537,6 +1577,7 @@ class TestSchedulerJob(unittest.TestCase):
task_id_1 = 'dummy'
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
task1 = DummyOperator(dag=dag, task_id=task_id_1)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dagbag = self._make_simple_dag_bag([dag])
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1577,6 +1618,7 @@ class TestSchedulerJob(unittest.TestCase):
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
task1 = DummyOperator(dag=dag, task_id=task_id_1, pool='a')
task2 = DummyOperator(dag=dag, task_id=task_id_2, pool='b')
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dagbag = self._make_simple_dag_bag([dag])
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1620,6 +1662,7 @@ class TestSchedulerJob(unittest.TestCase):
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
op1 = DummyOperator(dag=dag, task_id='dummy1')
op2 = DummyOperator(dag=dag, task_id='dummy2')
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dagbag = self._make_simple_dag_bag([dag])
executor = MockExecutor(do_update=True)
@@ -1661,6 +1704,7 @@ class TestSchedulerJob(unittest.TestCase):
task_id = 'dummy_wrong_pool'
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
task = DummyOperator(dag=dag, task_id=task_id, pool="this_pool_doesnt_exist")
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dagbag = self._make_simple_dag_bag([dag])
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1685,6 +1729,7 @@ class TestSchedulerJob(unittest.TestCase):
task_id_1 = 'dummy'
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
DummyOperator(dag=dag, task_id=task_id_1)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dagbag = self._make_simple_dag_bag([dag])
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1703,6 +1748,7 @@ class TestSchedulerJob(unittest.TestCase):
task_id_1 = 'dummy'
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2)
task1 = DummyOperator(dag=dag, task_id=task_id_1)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dagbag = self._make_simple_dag_bag([dag])
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1749,6 +1795,7 @@ class TestSchedulerJob(unittest.TestCase):
task1 = DummyOperator(dag=dag, task_id='dummy1')
task2 = DummyOperator(dag=dag, task_id='dummy2')
task3 = DummyOperator(dag=dag, task_id='dummy3')
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dagbag = self._make_simple_dag_bag([dag])
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1783,6 +1830,7 @@ class TestSchedulerJob(unittest.TestCase):
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
task1 = DummyOperator(dag=dag, task_id=task_id_1, task_concurrency=2)
task2 = DummyOperator(dag=dag, task_id=task_id_2)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dagbag = self._make_simple_dag_bag([dag])
executor = MockExecutor(do_update=True)
@@ -1877,6 +1925,7 @@ class TestSchedulerJob(unittest.TestCase):
task_id_1 = 'dummy'
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2)
task1 = DummyOperator(dag=dag, task_id=task_id_1)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
self._make_simple_dag_bag([dag])
scheduler = SchedulerJob()
@@ -1909,6 +1958,7 @@ class TestSchedulerJob(unittest.TestCase):
task_id_1 = 'dummy'
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
task1 = DummyOperator(dag=dag, task_id=task_id_1)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dagbag = self._make_simple_dag_bag([dag])
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1931,6 +1981,7 @@ class TestSchedulerJob(unittest.TestCase):
task_id_1 = 'dummy'
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2)
task1 = DummyOperator(dag=dag, task_id=task_id_1)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dagbag = SimpleDagBag([])
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -1956,6 +2007,7 @@ class TestSchedulerJob(unittest.TestCase):
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=3)
task1 = DummyOperator(dag=dag, task_id=task_id_1)
task2 = DummyOperator(dag=dag, task_id=task_id_2)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dagbag = self._make_simple_dag_bag([dag])
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -2026,6 +2078,7 @@ class TestSchedulerJob(unittest.TestCase):
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
task1 = DummyOperator(dag=dag, task_id=task_id_1)
task2 = DummyOperator(dag=dag, task_id=task_id_2)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dagbag = self._make_simple_dag_bag([dag])
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
@@ -2062,15 +2115,20 @@ class TestSchedulerJob(unittest.TestCase):
DummyOperator(task_id='dummy', dag=dag1, owner='airflow')
DummyOperator(task_id='dummy_b', dag=dag1, owner='airflow')
+ dag1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag1))
dag2 = DAG(dag_id='test_change_state_for_tis_without_dagrun_dont_change', start_date=DEFAULT_DATE)
DummyOperator(task_id='dummy', dag=dag2, owner='airflow')
+ dag2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag2))
+
dag3 = DAG(dag_id='test_change_state_for_tis_without_dagrun_no_dagrun', start_date=DEFAULT_DATE)
DummyOperator(task_id='dummy', dag=dag3, owner='airflow')
+ dag3 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag3))
+
session = settings.Session()
dr1 = dag1.create_dagrun(run_id=DagRunType.SCHEDULED.value,
state=State.RUNNING,
@@ -2154,6 +2212,7 @@ class TestSchedulerJob(unittest.TestCase):
task_id='task_id',
dag=dag,
owner='airflow')
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
# If there's no left over task in executor.queued_tasks, nothing happens
session = settings.Session()
@@ -2202,6 +2261,7 @@ class TestSchedulerJob(unittest.TestCase):
with dag:
op1 = DummyOperator(task_id='op1')
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dag.clear()
dr = dag.create_dagrun(run_id=f"{DagRunType.SCHEDULED.value}__",
@@ -2250,6 +2310,7 @@ class TestSchedulerJob(unittest.TestCase):
with dag:
op1 = DummyOperator(task_id='op1')
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
# Create DAG run with FAILED state
dag.clear()
@@ -2311,6 +2372,7 @@ class TestSchedulerJob(unittest.TestCase):
self.null_exec.mock_task_fail(dag_id, tid, ex_date)
try:
+ # This needs a _REAL_ dag, not the serialized version
dag.run(start_date=ex_date, end_date=ex_date, executor=self.null_exec, **run_kwargs)
except AirflowException:
pass
@@ -2452,7 +2514,7 @@ class TestSchedulerJob(unittest.TestCase):
dag_id = 'test_start_date_scheduling'
dag = self.dagbag.get_dag(dag_id)
dag.clear()
- self.assertTrue(dag.start_date > datetime.datetime.utcnow())
+ self.assertGreater(dag.start_date, datetime.datetime.utcnow())
scheduler = SchedulerJob(dag_id,
executor=self.null_exec,
@@ -2592,6 +2654,8 @@ class TestSchedulerJob(unittest.TestCase):
session.merge(orm_dag)
session.commit()
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
scheduler = SchedulerJob(executor=self.null_exec)
@@ -2643,6 +2707,7 @@ class TestSchedulerJob(unittest.TestCase):
bash_command='echo 1',
)
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
dag.clear()
dag.is_subdag = False
diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py
index a578307..fe96d73 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -19,7 +19,6 @@
"""Unit tests for SerializedDagModel."""
import unittest
-from unittest import mock
from airflow import DAG, example_dags as example_dags_module
from airflow.models import DagBag
@@ -117,7 +116,6 @@ class SerializedDagModelTest(unittest.TestCase):
self.assertFalse(SDM.has_dag(stale_dag.dag_id))
self.assertTrue(SDM.has_dag(fresh_dag.dag_id))
- @mock.patch('airflow.models.serialized_dag.STORE_SERIALIZED_DAGS', True)
def test_bulk_sync_to_db(self):
dags = [
DAG("dag_1"), DAG("dag_2"), DAG("dag_3"),