Posted to commits@airflow.apache.org by fo...@apache.org on 2018/03/14 08:15:45 UTC
[6/6] incubator-airflow git commit: [AIRFLOW-2203] Defer cycle detection
[AIRFLOW-2203] Defer cycle detection
Cycle detection is moved from the point where tasks are added to the point where the DAG is bagged.
This changes DAG import runtime from polynomial to roughly linear.
Closes #3116 from wongwill86:dag_import_speed
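
For background on the runtime claim: the removed BaseOperator.detect_downstream_cycle was invoked every time a task was added, re-walking all downstream relatives of the new task, so building a DAG of V tasks and E edges cost roughly O(V * (V + E)) in the worst case; after this patch a single depth-first search runs once per DAG at bag time, which is O(V + E). Below is a minimal, self-contained sketch of that DFS with the three visit states the patch introduces -- not the Airflow code itself (which lives in DAG.test_cycle / DAG._test_cycle_helper and raises AirflowDagCycleException), just an illustration over plain dicts of task ids:

    from collections import defaultdict

    # the same three states the patch adds as DagBag class attributes
    CYCLE_NEW, CYCLE_IN_PROGRESS, CYCLE_DONE = 0, 1, 2

    def find_cycle(downstream_ids):
        """downstream_ids maps task_id -> list of downstream task_ids.
        Raises ValueError naming the offending edge if a cycle exists."""
        visit_map = defaultdict(int)  # int() == 0 == CYCLE_NEW

        def visit(task_id):
            if visit_map[task_id] == CYCLE_DONE:
                return
            visit_map[task_id] = CYCLE_IN_PROGRESS
            for descendant_id in downstream_ids.get(task_id, []):
                if visit_map[descendant_id] == CYCLE_IN_PROGRESS:
                    raise ValueError("Cycle detected: %s -> %s"
                                     % (task_id, descendant_id))
                visit(descendant_id)
            visit_map[task_id] = CYCLE_DONE

        for task_id in downstream_ids:
            if visit_map[task_id] == CYCLE_NEW:
                visit(task_id)

    # find_cycle({'A': ['B'], 'B': ['C'], 'C': ['A']})  -> raises ValueError
    # find_cycle({'A': ['B'], 'B': []})                  -> returns quietly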
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c3730650
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c3730650
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c3730650
Branch: refs/heads/master
Commit: c3730650c852cd7a5e06a5933f5064bbb04e0e88
Parents: 6f0a0d2
Author: wongwill86 <wo...@gmail.com>
Authored: Mon Mar 12 16:53:01 2018 -0400
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed Mar 14 09:13:59 2018 +0100
----------------------------------------------------------------------
airflow/exceptions.py |   4 +
airflow/models.py     | 111 ++++++++---
tests/core.py         |  16 --
tests/models.py       | 450 ++++++++++++++++++++++++++++++++++++++++++++-
4 files changed, 533 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
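
The user-visible effect (exercised by test_skip_cycle_dags in the diff below) is that a cyclic DAG file no longer raises while its tasks are being wired together; the cycle is only reported when the DagBag bags the DAG, and it surfaces as an entry in DagBag.import_errors instead of aborting the import. A hedged illustration, assuming a hypothetical file /tmp/cycle_dag.py containing the same self-loop as the new tests:

    # hypothetical contents of /tmp/cycle_dag.py -- a self-loop A -> A
    import datetime
    from airflow.models import DAG
    from airflow.operators.dummy_operator import DummyOperator

    dag = DAG('cycle_dag', start_date=datetime.datetime(2016, 1, 1))
    op_a = DummyOperator(task_id='A', dag=dag)
    op_a.set_downstream(op_a)  # with this patch, no exception is raised here

    # and, e.g. from a test or the scheduler, when the file is processed:
    #   from airflow import models
    #   dagbag = models.DagBag(include_examples=False)
    #   dagbag.process_file('/tmp/cycle_dag.py')
    #   assert 'cycle_dag' not in dagbag.dags                # dag was skipped
    #   assert '/tmp/cycle_dag.py' in dagbag.import_errors   # cycle reported here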
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3730650/airflow/exceptions.py
----------------------------------------------------------------------
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 90d3e22..c1b728c 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -34,3 +34,7 @@ class AirflowTaskTimeout(AirflowException):
class AirflowSkipException(AirflowException):
pass
+
+
+class AirflowDagCycleException(AirflowException):
+ pass
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3730650/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 8931ac6..c1b608a 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -22,7 +22,7 @@ from future.standard_library import install_aliases
from builtins import str
from builtins import object, bytes
import copy
-from collections import namedtuple
+from collections import namedtuple, defaultdict
from datetime import timedelta
import dill
@@ -63,7 +63,9 @@ import six
from airflow import settings, utils
from airflow.executors import GetDefaultExecutor, LocalExecutor
from airflow import configuration
-from airflow.exceptions import AirflowException, AirflowSkipException, AirflowTaskTimeout
+from airflow.exceptions import (
+ AirflowDagCycleException, AirflowException, AirflowSkipException, AirflowTaskTimeout
+)
from airflow.dag.base_dag import BaseDag, BaseDagBag
from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
@@ -122,7 +124,6 @@ else:
# Used by DAG context_managers
_CONTEXT_MANAGER_DAG = None
-
def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
"""
Clears a set of task instances, but makes sure the running ones
@@ -183,6 +184,11 @@ class DagBag(BaseDagBag, LoggingMixin):
:type include_examples: bool
"""
+ # static class variables to detect dag cycle
+ CYCLE_NEW = 0
+ CYCLE_IN_PROGRESS = 1
+ CYCLE_DONE = 2
+
def __init__(
self,
dag_folder=None,
@@ -335,10 +341,17 @@ class DagBag(BaseDagBag, LoggingMixin):
dag.full_filepath = filepath
if dag.fileloc != filepath:
dag.fileloc = filepath
- dag.is_subdag = False
- self.bag_dag(dag, parent_dag=dag, root_dag=dag)
- found_dags.append(dag)
- found_dags += dag.subdags
+ try:
+ dag.is_subdag = False
+ self.bag_dag(dag, parent_dag=dag, root_dag=dag)
+ found_dags.append(dag)
+ found_dags += dag.subdags
+ except AirflowDagCycleException as cycle_exception:
+ self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
+ self.import_errors[dag.full_filepath] = str(cycle_exception)
+ self.file_last_changed[dag.full_filepath] = \
+ file_last_changed_on_disk
+
self.file_last_changed[filepath] = file_last_changed_on_disk
return found_dags
@@ -381,20 +394,39 @@ class DagBag(BaseDagBag, LoggingMixin):
def bag_dag(self, dag, parent_dag, root_dag):
"""
Adds the DAG into the bag, recurses into sub dags.
+ Throws AirflowDagCycleException if a cycle is detected in this dag or its subdags
"""
- self.dags[dag.dag_id] = dag
+
+ dag.test_cycle() # throws if a task cycle is found
+
dag.resolve_template_files()
dag.last_loaded = timezone.utcnow()
for task in dag.tasks:
settings.policy(task)
- for subdag in dag.subdags:
- subdag.full_filepath = dag.full_filepath
- subdag.parent_dag = dag
- subdag.is_subdag = True
- self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
- self.log.debug('Loaded DAG {dag}'.format(**locals()))
+ subdags = dag.subdags
+
+ try:
+ for subdag in subdags:
+ subdag.full_filepath = dag.full_filepath
+ subdag.parent_dag = dag
+ subdag.is_subdag = True
+ self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
+
+ self.dags[dag.dag_id] = dag
+ self.log.debug('Loaded DAG {dag}'.format(**locals()))
+ except AirflowDagCycleException as cycle_exception:
+ # There was an error in bagging the dag. Remove it from the list of dags
+ self.log.exception('Exception bagging dag: {dag.dag_id}'.format(**locals()))
+ # Only necessary at the root level since DAG.subdags automatically
+ # performs DFS to search through all subdags
+ if dag == root_dag:
+ for subdag in subdags:
+ if subdag.dag_id in self.dags:
+ del self.dags[subdag.dag_id]
+ raise cycle_exception
+
def collect_dags(
self,
@@ -2699,21 +2731,6 @@ class BaseOperator(LoggingMixin):
return list(map(lambda task_id: self._dag.task_dict[task_id],
self.get_flat_relative_ids(upstream)))
- def detect_downstream_cycle(self, task=None):
- """
- When invoked, this routine will raise an exception if a cycle is
- detected downstream from self. It is invoked when tasks are added to
- the DAG to detect cycles.
- """
- if not task:
- task = self
- for t in self.get_direct_relatives():
- if task is t:
- msg = "Cycle detected in DAG. Faulty task: {0}".format(task)
- raise AirflowException(msg)
- else:
- t.detect_downstream_cycle(task=task)
- return False
def run(
self,
@@ -4074,6 +4091,42 @@ class DAG(BaseDag, LoggingMixin):
qry = qry.filter(TaskInstance.state.in_(states))
return qry.scalar()
+ def test_cycle(self):
+ '''
+ Check to see if there are any cycles in the DAG. Returns False if no cycle found,
+ otherwise raises exception.
+ '''
+
+ # default of int is 0 which corresponds to CYCLE_NEW
+ visit_map = defaultdict(int)
+ for task_id in self.task_dict.keys():
+ # print('starting %s' % task_id)
+ if visit_map[task_id] == DagBag.CYCLE_NEW:
+ self._test_cycle_helper(visit_map, task_id)
+ return False
+
+ def _test_cycle_helper(self, visit_map, task_id):
+ '''
+ Checks if a cycle exists from the input task using DFS traversal
+ '''
+
+ # print('Inspecting %s' % task_id)
+ if visit_map[task_id] == DagBag.CYCLE_DONE:
+ return False
+
+ visit_map[task_id] = DagBag.CYCLE_IN_PROGRESS
+
+ task = self.task_dict[task_id]
+ for descendant_id in task.get_direct_relative_ids():
+ if visit_map[descendant_id] == DagBag.CYCLE_IN_PROGRESS:
+ msg = "Cycle detected in DAG. Faulty task: {0} to {1}".format(
+ task_id, descendant_id)
+ raise AirflowDagCycleException(msg)
+ else:
+ self._test_cycle_helper(visit_map, descendant_id)
+
+ visit_map[task_id] = DagBag.CYCLE_DONE
+
class Chart(Base):
__tablename__ = "chart"
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3730650/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 03372db..ce5fb7a 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -781,22 +781,6 @@ class CoreTest(unittest.TestCase):
with self.assertRaisesRegexp(AirflowException, regexp):
self.run_after_loop.set_upstream(self.runme_0)
- def test_cyclic_dependencies_1(self):
-
- regexp = "Cycle detected in DAG. (.*)runme_0(.*)"
- with self.assertRaisesRegexp(AirflowException, regexp):
- self.runme_0.set_upstream(self.run_after_loop)
-
- def test_cyclic_dependencies_2(self):
- regexp = "Cycle detected in DAG. (.*)run_after_loop(.*)"
- with self.assertRaisesRegexp(AirflowException, regexp):
- self.run_after_loop.set_downstream(self.runme_0)
-
- def test_cyclic_dependencies_3(self):
- regexp = "Cycle detected in DAG. (.*)run_this_last(.*)"
- with self.assertRaisesRegexp(AirflowException, regexp):
- self.run_this_last.set_downstream(self.runme_0)
-
def test_bad_trigger_rule(self):
with self.assertRaises(AirflowException):
DummyOperator(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3730650/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index c8ee037..5d8184c 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -26,9 +26,11 @@ import time
import six
import re
import urllib
+import textwrap
+import inspect
from airflow import configuration, models, settings, AirflowException
-from airflow.exceptions import AirflowSkipException
+from airflow.exceptions import AirflowDagCycleException, AirflowSkipException
from airflow.jobs import BackfillJob
from airflow.models import DAG, TaskInstance as TI
from airflow.models import State as ST
@@ -47,7 +49,7 @@ from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule
from mock import patch
from parameterized import parameterized
-
+from tempfile import NamedTemporaryFile
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
TEST_DAGS_FOLDER = os.path.join(
@@ -388,6 +390,132 @@ class DagTest(unittest.TestCase):
result = task.render_template('', "{{ 'world' | hello}}", dict())
self.assertEqual(result, 'Hello world')
+ def test_cycle(self):
+ # test empty
+ dag = DAG(
+ 'dag',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ self.assertFalse(dag.test_cycle())
+
+ # test single task
+ dag = DAG(
+ 'dag',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ with dag:
+ opA = DummyOperator(task_id='A')
+
+ self.assertFalse(dag.test_cycle())
+
+ # test no cycle
+ dag = DAG(
+ 'dag',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ # A -> B -> C
+ # B -> D
+ # E -> F
+ with dag:
+ opA = DummyOperator(task_id='A')
+ opB = DummyOperator(task_id='B')
+ opC = DummyOperator(task_id='C')
+ opD = DummyOperator(task_id='D')
+ opE = DummyOperator(task_id='E')
+ opF = DummyOperator(task_id='F')
+ opA.set_downstream(opB)
+ opB.set_downstream(opC)
+ opB.set_downstream(opD)
+ opE.set_downstream(opF)
+
+ self.assertFalse(dag.test_cycle())
+
+ # test self loop
+ dag = DAG(
+ 'dag',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ # A -> A
+ with dag:
+ opA = DummyOperator(task_id='A')
+ opA.set_downstream(opA)
+
+ with self.assertRaises(AirflowDagCycleException):
+ dag.test_cycle()
+
+ # test downstream self loop
+ dag = DAG(
+ 'dag',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ # A -> B -> C -> D -> E -> E
+ with dag:
+ opA = DummyOperator(task_id='A')
+ opB = DummyOperator(task_id='B')
+ opC = DummyOperator(task_id='C')
+ opD = DummyOperator(task_id='D')
+ opE = DummyOperator(task_id='E')
+ opA.set_downstream(opB)
+ opB.set_downstream(opC)
+ opC.set_downstream(opD)
+ opD.set_downstream(opE)
+ opE.set_downstream(opE)
+
+ with self.assertRaises(AirflowDagCycleException):
+ dag.test_cycle()
+
+ # large loop
+ dag = DAG(
+ 'dag',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ # A -> B -> C -> D -> E -> A
+ with dag:
+ opA = DummyOperator(task_id='A')
+ opB = DummyOperator(task_id='B')
+ opC = DummyOperator(task_id='C')
+ opD = DummyOperator(task_id='D')
+ opE = DummyOperator(task_id='E')
+ opA.set_downstream(opB)
+ opB.set_downstream(opC)
+ opC.set_downstream(opD)
+ opD.set_downstream(opE)
+ opE.set_downstream(opA)
+
+ with self.assertRaises(AirflowDagCycleException):
+ dag.test_cycle()
+
+ # test arbitrary loop
+ dag = DAG(
+ 'dag',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ # E-> A -> B -> F -> A
+ # -> C -> F
+ with dag:
+ opA = DummyOperator(task_id='A')
+ opB = DummyOperator(task_id='B')
+ opC = DummyOperator(task_id='C')
+ opD = DummyOperator(task_id='D')
+ opE = DummyOperator(task_id='E')
+ opF = DummyOperator(task_id='F')
+ opA.set_downstream(opB)
+ opA.set_downstream(opC)
+ opE.set_downstream(opA)
+ opC.set_downstream(opF)
+ opB.set_downstream(opF)
+ opF.set_downstream(opA)
+
+ with self.assertRaises(AirflowDagCycleException):
+ dag.test_cycle()
+
class DagStatTest(unittest.TestCase):
def test_dagstats_crud(self):
@@ -796,7 +924,6 @@ class DagBagTest(unittest.TestCase):
"""
test that we're able to parse file that contains multi-byte char
"""
- from tempfile import NamedTemporaryFile
f = NamedTemporaryFile()
f.write('\u3042'.encode('utf8')) # write multi-byte char (hiragana)
f.flush()
@@ -857,6 +984,323 @@ class DagBagTest(unittest.TestCase):
self.assertTrue(
dag.fileloc.endswith('airflow/example_dags/' + path))
+ def process_dag(self, create_dag):
+ """
+ Helper method to process a file generated from the input create_dag function.
+ """
+ # write source to file
+ source = textwrap.dedent(''.join(
+ inspect.getsource(create_dag).splitlines(True)[1:-1]))
+ f = NamedTemporaryFile()
+ f.write(source.encode('utf8'))
+ f.flush()
+
+ dagbag = models.DagBag(include_examples=False)
+ found_dags = dagbag.process_file(f.name)
+ return (dagbag, found_dags, f.name)
+
+ def validate_dags(self, expected_parent_dag, actual_found_dags, actual_dagbag,
+ should_be_found=True):
+ expected_dag_ids = list(map(lambda dag: dag.dag_id, expected_parent_dag.subdags))
+ expected_dag_ids.append(expected_parent_dag.dag_id)
+
+ actual_found_dag_ids = list(map(lambda dag: dag.dag_id, actual_found_dags))
+
+ for dag_id in expected_dag_ids:
+ actual_dagbag.log.info('validating %s' % dag_id)
+ self.assertEquals(
+ dag_id in actual_found_dag_ids, should_be_found,
+ 'dag "%s" should %shave been found after processing dag "%s"' %
+ (dag_id, '' if should_be_found else 'not ', expected_parent_dag.dag_id)
+ )
+ self.assertEquals(
+ dag_id in actual_dagbag.dags, should_be_found,
+ 'dag "%s" should %sbe in dagbag.dags after processing dag "%s"' %
+ (dag_id, '' if should_be_found else 'not ', expected_parent_dag.dag_id)
+ )
+
+ def test_load_subdags(self):
+ # Define Dag to load
+ def standard_subdag():
+ from airflow.models import DAG
+ from airflow.operators.dummy_operator import DummyOperator
+ from airflow.operators.subdag_operator import SubDagOperator
+ import datetime
+ DAG_NAME = 'master'
+ DEFAULT_ARGS = {
+ 'owner': 'owner1',
+ 'start_date': datetime.datetime(2016, 1, 1)
+ }
+ dag = DAG(
+ DAG_NAME,
+ default_args=DEFAULT_ARGS)
+
+ # master:
+ # A -> opSubDag_0
+ # master.opsubdag_0:
+ # -> subdag_0.task
+ # A -> opSubDag_1
+ # master.opsubdag_1:
+ # -> subdag_1.task
+
+ with dag:
+ def subdag_0():
+ subdag_0 = DAG('master.opSubdag_0', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_0.task', dag=subdag_0)
+ return subdag_0
+
+ def subdag_1():
+ subdag_1 = DAG('master.opSubdag_1', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_1.task', dag=subdag_1)
+ return subdag_1
+
+ opSubdag_0 = SubDagOperator(
+ task_id='opSubdag_0', dag=dag, subdag=subdag_0())
+ opSubdag_1 = SubDagOperator(
+ task_id='opSubdag_1', dag=dag, subdag=subdag_1())
+
+ opA = DummyOperator(task_id='A')
+ opA.set_downstream(opSubdag_0)
+ opA.set_downstream(opSubdag_1)
+ return dag
+
+ testDag = standard_subdag()
+ # sanity check to make sure DAG.subdag is still functioning properly
+ self.assertEqual(len(testDag.subdags), 2)
+
+ # Perform processing dag
+ dagbag, found_dags, _ = self.process_dag(standard_subdag)
+
+ # Validate correctness
+ # all dags from testDag should be listed
+ self.validate_dags(testDag, found_dags, dagbag)
+
+ # Define Dag to load
+ def nested_subdags():
+ from airflow.models import DAG
+ from airflow.operators.dummy_operator import DummyOperator
+ from airflow.operators.subdag_operator import SubDagOperator
+ import datetime
+ DAG_NAME = 'master'
+ DEFAULT_ARGS = {
+ 'owner': 'owner1',
+ 'start_date': datetime.datetime(2016, 1, 1)
+ }
+ dag = DAG(
+ DAG_NAME,
+ default_args=DEFAULT_ARGS)
+
+ # master:
+ # A -> opSubdag_0
+ # master.opSubdag_0:
+ # -> opSubDag_A
+ # master.opSubdag_0.opSubdag_A:
+ # -> subdag_A.task
+ # -> opSubdag_B
+ # master.opSubdag_0.opSubdag_B:
+ # -> subdag_B.task
+ # A -> opSubdag_1
+ # master.opSubdag_1:
+ # -> opSubdag_C
+ # master.opSubdag_1.opSubdag_C:
+ # -> subdag_C.task
+ # -> opSubDag_D
+ # master.opSubdag_1.opSubdag_D:
+ # -> subdag_D.task
+
+ with dag:
+ def subdag_A():
+ subdag_A = DAG(
+ 'master.opSubdag_0.opSubdag_A', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_A.task', dag=subdag_A)
+ return subdag_A
+
+ def subdag_B():
+ subdag_B = DAG(
+ 'master.opSubdag_0.opSubdag_B', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_B.task', dag=subdag_B)
+ return subdag_B
+
+ def subdag_C():
+ subdag_C = DAG(
+ 'master.opSubdag_1.opSubdag_C', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_C.task', dag=subdag_C)
+ return subdag_C
+
+ def subdag_D():
+ subdag_D = DAG(
+ 'master.opSubdag_1.opSubdag_D', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_D.task', dag=subdag_D)
+ return subdag_D
+
+ def subdag_0():
+ subdag_0 = DAG('master.opSubdag_0', default_args=DEFAULT_ARGS)
+ SubDagOperator(task_id='opSubdag_A', dag=subdag_0, subdag=subdag_A())
+ SubDagOperator(task_id='opSubdag_B', dag=subdag_0, subdag=subdag_B())
+ return subdag_0
+
+ def subdag_1():
+ subdag_1 = DAG('master.opSubdag_1', default_args=DEFAULT_ARGS)
+ SubDagOperator(task_id='opSubdag_C', dag=subdag_1, subdag=subdag_C())
+ SubDagOperator(task_id='opSubdag_D', dag=subdag_1, subdag=subdag_D())
+ return subdag_1
+
+ opSubdag_0 = SubDagOperator(
+ task_id='opSubdag_0', dag=dag, subdag=subdag_0())
+ opSubdag_1 = SubDagOperator(
+ task_id='opSubdag_1', dag=dag, subdag=subdag_1())
+
+ opA = DummyOperator(task_id='A')
+ opA.set_downstream(opSubdag_0)
+ opA.set_downstream(opSubdag_1)
+
+ return dag
+
+ testDag = nested_subdags()
+ # sanity check to make sure DAG.subdag is still functioning properly
+ self.assertEqual(len(testDag.subdags), 6)
+
+ # Perform processing dag
+ dagbag, found_dags, _ = self.process_dag(nested_subdags)
+
+ # Validate correctness
+ # all dags from testDag should be listed
+ self.validate_dags(testDag, found_dags, dagbag)
+
+ def test_skip_cycle_dags(self):
+ """
+ Don't crash when loading an invalid (contains a cycle) DAG file.
+ Don't load the dag into the DagBag either
+ """
+ # Define Dag to load
+ def basic_cycle():
+ from airflow.models import DAG
+ from airflow.operators.dummy_operator import DummyOperator
+ import datetime
+ DAG_NAME = 'cycle_dag'
+ DEFAULT_ARGS = {
+ 'owner': 'owner1',
+ 'start_date': datetime.datetime(2016, 1, 1)
+ }
+ dag = DAG(
+ DAG_NAME,
+ default_args=DEFAULT_ARGS)
+
+ # A -> A
+ with dag:
+ opA = DummyOperator(task_id='A')
+ opA.set_downstream(opA)
+
+ return dag
+
+ testDag = basic_cycle()
+ # sanity check to make sure DAG.subdag is still functioning properly
+ self.assertEqual(len(testDag.subdags), 0)
+
+ # Perform processing dag
+ dagbag, found_dags, file_path = self.process_dag(basic_cycle)
+
+ # Validate correctness
+ # None of the dags should be found
+ self.validate_dags(testDag, found_dags, dagbag, should_be_found=False)
+ self.assertIn(file_path, dagbag.import_errors)
+
+ # Define Dag to load
+ def nested_subdag_cycle():
+ from airflow.models import DAG
+ from airflow.operators.dummy_operator import DummyOperator
+ from airflow.operators.subdag_operator import SubDagOperator
+ import datetime
+ DAG_NAME = 'nested_cycle'
+ DEFAULT_ARGS = {
+ 'owner': 'owner1',
+ 'start_date': datetime.datetime(2016, 1, 1)
+ }
+ dag = DAG(
+ DAG_NAME,
+ default_args=DEFAULT_ARGS)
+
+ # cycle:
+ # A -> opSubdag_0
+ # cycle.opSubdag_0:
+ # -> opSubDag_A
+ # cycle.opSubdag_0.opSubdag_A:
+ # -> subdag_A.task
+ # -> opSubdag_B
+ # cycle.opSubdag_0.opSubdag_B:
+ # -> subdag_B.task
+ # A -> opSubdag_1
+ # cycle.opSubdag_1:
+ # -> opSubdag_C
+ # cycle.opSubdag_1.opSubdag_C:
+ # -> subdag_C.task -> subdag_C.task >Invalid Loop<
+ # -> opSubDag_D
+ # cycle.opSubdag_1.opSubdag_D:
+ # -> subdag_D.task
+
+ with dag:
+ def subdag_A():
+ subdag_A = DAG(
+ 'nested_cycle.opSubdag_0.opSubdag_A', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_A.task', dag=subdag_A)
+ return subdag_A
+
+ def subdag_B():
+ subdag_B = DAG(
+ 'nested_cycle.opSubdag_0.opSubdag_B', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_B.task', dag=subdag_B)
+ return subdag_B
+
+ def subdag_C():
+ subdag_C = DAG(
+ 'nested_cycle.opSubdag_1.opSubdag_C', default_args=DEFAULT_ARGS)
+ opSubdag_C_task = DummyOperator(
+ task_id='subdag_C.task', dag=subdag_C)
+ # introduce a loop in opSubdag_C
+ opSubdag_C_task.set_downstream(opSubdag_C_task)
+ return subdag_C
+
+ def subdag_D():
+ subdag_D = DAG(
+ 'nested_cycle.opSubdag_1.opSubdag_D', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_D.task', dag=subdag_D)
+ return subdag_D
+
+ def subdag_0():
+ subdag_0 = DAG('nested_cycle.opSubdag_0', default_args=DEFAULT_ARGS)
+ SubDagOperator(task_id='opSubdag_A', dag=subdag_0, subdag=subdag_A())
+ SubDagOperator(task_id='opSubdag_B', dag=subdag_0, subdag=subdag_B())
+ return subdag_0
+
+ def subdag_1():
+ subdag_1 = DAG('nested_cycle.opSubdag_1', default_args=DEFAULT_ARGS)
+ SubDagOperator(task_id='opSubdag_C', dag=subdag_1, subdag=subdag_C())
+ SubDagOperator(task_id='opSubdag_D', dag=subdag_1, subdag=subdag_D())
+ return subdag_1
+
+ opSubdag_0 = SubDagOperator(
+ task_id='opSubdag_0', dag=dag, subdag=subdag_0())
+ opSubdag_1 = SubDagOperator(
+ task_id='opSubdag_1', dag=dag, subdag=subdag_1())
+
+ opA = DummyOperator(task_id='A')
+ opA.set_downstream(opSubdag_0)
+ opA.set_downstream(opSubdag_1)
+
+ return dag
+
+ testDag = nested_subdag_cycle()
+ # sanity check to make sure DAG.subdag is still functioning properly
+ self.assertEqual(len(testDag.subdags), 6)
+
+ # Perform processing dag
+ dagbag, found_dags, file_path = self.process_dag(nested_subdag_cycle)
+
+ # Validate correctness
+ # None of the dags should be found
+ self.validate_dags(testDag, found_dags, dagbag, should_be_found=False)
+ self.assertIn(file_path, dagbag.import_errors)
+
def test_process_file_with_none(self):
"""
test that process_file can handle Nones