Posted to commits@airflow.apache.org by fo...@apache.org on 2018/03/14 08:15:40 UTC

[1/6] incubator-airflow git commit: [AIRFLOW-2203] Store task ids as sets not lists

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4cf2fba19 -> c3730650c


[AIRFLOW-2203] Store task ids as sets not lists

Massively improve performance by using sets to represent a task's
upstream and downstream task ids.
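
A minimal sketch (not the committed code) of why the container type matters:
membership tests on a list scan every element, while a set hashes the key,
so registering n dependencies drops from quadratic to roughly linear work.

    upstream_task_ids = set()

    def add_only_new(item_set, item):
        # set membership is O(1) on average; the old list version scanned
        # every element on each registration
        if item in item_set:
            raise ValueError('Dependency %s already registered' % item)
        item_set.add(item)

    add_only_new(upstream_task_ids, 'task_a')
    add_only_new(upstream_task_ids, 'task_b')
    assert upstream_task_ids == {'task_a', 'task_b'}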


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/781c5bf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/781c5bf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/781c5bf6

Branch: refs/heads/master
Commit: 781c5bf6967473701e4812d72ace4f675f5b5c94
Parents: 4cf2fba
Author: wongwill86 <wo...@gmail.com>
Authored: Mon Mar 12 17:33:33 2018 -0400
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed Mar 14 09:11:32 2018 +0100

----------------------------------------------------------------------
 airflow/models.py                           | 29 +++++++++++-------------
 tests/ti_deps/deps/test_trigger_rule_dep.py |  2 +-
 2 files changed, 14 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/781c5bf6/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index cf31b07..74a54bb 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2286,8 +2286,8 @@ class BaseOperator(LoggingMixin):
         self.task_concurrency = task_concurrency
 
         # Private attributes
-        self._upstream_task_ids = []
-        self._downstream_task_ids = []
+        self._upstream_task_ids = set()
+        self._downstream_task_ids = set()
 
         if not dag and _CONTEXT_MANAGER_DAG:
             dag = _CONTEXT_MANAGER_DAG
@@ -2771,13 +2771,13 @@ class BaseOperator(LoggingMixin):
     def task_type(self):
         return self.__class__.__name__
 
-    def append_only_new(self, l, item):
-        if any([item is t for t in l]):
+    def add_only_new(self, item_set, item):
+        if item in item_set:
             raise AirflowException(
                 'Dependency {self}, {item} already registered'
                 ''.format(**locals()))
         else:
-            l.append(item)
+            item_set.add(item)
 
     def _set_relatives(self, task_or_task_list, upstream=False):
         try:
@@ -2793,7 +2793,7 @@ class BaseOperator(LoggingMixin):
 
         # relationships can only be set if the tasks share a single DAG. Tasks
         # without a DAG are assigned to that DAG.
-        dags = {t._dag.dag_id: t.dag for t in [self] + task_list if t.has_dag()}
+        dags = {t._dag.dag_id: t._dag for t in [self] + task_list if t.has_dag()}
 
         if len(dags) > 1:
             raise AirflowException(
@@ -2814,13 +2814,11 @@ class BaseOperator(LoggingMixin):
             if dag and not task.has_dag():
                 task.dag = dag
             if upstream:
-                task.append_only_new(task._downstream_task_ids, self.task_id)
-                self.append_only_new(self._upstream_task_ids, task.task_id)
+                task.add_only_new(task._downstream_task_ids, self.task_id)
+                self.add_only_new(self._upstream_task_ids, task.task_id)
             else:
-                self.append_only_new(self._downstream_task_ids, task.task_id)
-                task.append_only_new(task._upstream_task_ids, self.task_id)
-
-        self.detect_downstream_cycle()
+                self.add_only_new(self._downstream_task_ids, task.task_id)
+                task.add_only_new(task._upstream_task_ids, self.task_id)
 
     def set_downstream(self, task_or_task_list):
         """
@@ -3729,10 +3727,9 @@ class DAG(BaseDag, LoggingMixin):
         for t in dag.tasks:
             # Remove upstream/downstream references to tasks that did not
             # make the cut
-            t._upstream_task_ids = [
-                tid for tid in t._upstream_task_ids if tid in dag.task_ids]
-            t._downstream_task_ids = [
-                tid for tid in t._downstream_task_ids if tid in dag.task_ids]
+            t._upstream_task_ids = t._upstream_task_ids.intersection(dag.task_dict.keys())
+            t._downstream_task_ids = t._downstream_task_ids.intersection(
+                dag.task_dict.keys())
 
         if len(dag.tasks) < len(self.tasks):
             dag.partial = True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/781c5bf6/tests/ti_deps/deps/test_trigger_rule_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py b/tests/ti_deps/deps/test_trigger_rule_dep.py
index a61ff0d..18595bb 100644
--- a/tests/ti_deps/deps/test_trigger_rule_dep.py
+++ b/tests/ti_deps/deps/test_trigger_rule_dep.py
@@ -28,7 +28,7 @@ class TriggerRuleDepTest(unittest.TestCase):
         task = BaseOperator(task_id='test_task', trigger_rule=trigger_rule,
                             start_date=datetime(2015, 1, 1))
         if upstream_task_ids:
-            task._upstream_task_ids.extend(upstream_task_ids)
+            task._upstream_task_ids.update(upstream_task_ids)
         return TaskInstance(task=task, state=state, execution_date=None)
 
     def test_no_upstream_tasks(self):


[2/6] incubator-airflow git commit: [AIRFLOW-2203] Cache static rules (trigger/weight)

Posted by fo...@apache.org.
[AIRFLOW-2203] Cache static rules (trigger/weight)

No need to recalculate them every time just to see if they are valid.
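
The pattern, sketched with an illustrative Rule class (the names here are
made up; only the shape follows the diff): compute the set of valid values
on the first call, store it on the class, and answer is_valid with a set
lookup from then on.

    class Rule(object):
        FIRST = 'first'
        SECOND = 'second'

        _ALL_RULES = {}  # falsy until the first call populates it

        @classmethod
        def all_rules(cls):
            if not cls._ALL_RULES:
                # gather the public constants once; later calls reuse the set
                cls._ALL_RULES = {
                    getattr(cls, attr) for attr in dir(cls)
                    if not attr.startswith('_') and not callable(getattr(cls, attr))
                }
            return cls._ALL_RULES

        @classmethod
        def is_valid(cls, rule):
            return rule in cls.all_rules()

    assert Rule.is_valid('first')
    assert not Rule.is_valid('third')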


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/16ab314c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/16ab314c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/16ab314c

Branch: refs/heads/master
Commit: 16ab314c21d131136fb00adcc395d71a6f8b31b2
Parents: 781c5bf
Author: wongwill86 <wo...@gmail.com>
Authored: Mon Mar 12 17:32:24 2018 -0400
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed Mar 14 09:11:38 2018 +0100

----------------------------------------------------------------------
 airflow/utils/trigger_rule.py    |  9 +++++++--
 airflow/utils/weight_rule.py     |  9 +++++++--
 tests/utils/test_trigger_rule.py | 28 ++++++++++++++++++++++++++++
 tests/utils/test_weight_rule.py  | 25 +++++++++++++++++++++++++
 4 files changed, 67 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16ab314c/airflow/utils/trigger_rule.py
----------------------------------------------------------------------
diff --git a/airflow/utils/trigger_rule.py b/airflow/utils/trigger_rule.py
index f830909..5daaeac 100644
--- a/airflow/utils/trigger_rule.py
+++ b/airflow/utils/trigger_rule.py
@@ -25,12 +25,17 @@ class TriggerRule(object):
     ONE_FAILED = 'one_failed'
     DUMMY = 'dummy'
 
+    _ALL_TRIGGER_RULES = {}
     @classmethod
     def is_valid(cls, trigger_rule):
         return trigger_rule in cls.all_triggers()
 
     @classmethod
     def all_triggers(cls):
-        return [getattr(cls, attr)
+        if not cls._ALL_TRIGGER_RULES:
+            cls._ALL_TRIGGER_RULES = {
+                getattr(cls, attr)
                 for attr in dir(cls)
-                if not attr.startswith("__") and not callable(getattr(cls, attr))]
+                if not attr.startswith("_") and not callable(getattr(cls, attr))
+            }
+        return cls._ALL_TRIGGER_RULES

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16ab314c/airflow/utils/weight_rule.py
----------------------------------------------------------------------
diff --git a/airflow/utils/weight_rule.py b/airflow/utils/weight_rule.py
index fde0d90..51f45a6 100644
--- a/airflow/utils/weight_rule.py
+++ b/airflow/utils/weight_rule.py
@@ -22,12 +22,17 @@ class WeightRule(object):
     UPSTREAM = 'upstream'
     ABSOLUTE = 'absolute'
 
+    _ALL_WEIGHT_RULES = {}
     @classmethod
     def is_valid(cls, weight_rule):
         return weight_rule in cls.all_weight_rules()
 
     @classmethod
     def all_weight_rules(cls):
-        return [getattr(cls, attr)
+        if not cls._ALL_WEIGHT_RULES:
+            cls._ALL_WEIGHT_RULES = {
+                getattr(cls, attr)
                 for attr in dir(cls)
-                if not attr.startswith("__") and not callable(getattr(cls, attr))]
+                if not attr.startswith("_") and not callable(getattr(cls, attr))
+            }
+        return cls._ALL_WEIGHT_RULES

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16ab314c/tests/utils/test_trigger_rule.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_trigger_rule.py b/tests/utils/test_trigger_rule.py
new file mode 100644
index 0000000..f89a56e
--- /dev/null
+++ b/tests/utils/test_trigger_rule.py
@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from airflow.utils.trigger_rule import TriggerRule
+
+
+class TestTriggerRule(unittest.TestCase):
+
+    def test_valid_trigger_rules(self):
+        self.assertTrue(TriggerRule.is_valid(TriggerRule.ALL_SUCCESS))
+        self.assertTrue(TriggerRule.is_valid(TriggerRule.ALL_FAILED))
+        self.assertTrue(TriggerRule.is_valid(TriggerRule.ALL_DONE))
+        self.assertTrue(TriggerRule.is_valid(TriggerRule.ONE_SUCCESS))
+        self.assertTrue(TriggerRule.is_valid(TriggerRule.ONE_FAILED))
+        self.assertTrue(TriggerRule.is_valid(TriggerRule.DUMMY))
+        self.assertEqual(len(TriggerRule.all_triggers()), 6)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16ab314c/tests/utils/test_weight_rule.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_weight_rule.py b/tests/utils/test_weight_rule.py
new file mode 100644
index 0000000..8ca618d
--- /dev/null
+++ b/tests/utils/test_weight_rule.py
@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from airflow.utils.weight_rule import WeightRule
+
+
+class TestWeightRule(unittest.TestCase):
+
+    def test_valid_weight_rules(self):
+        self.assertTrue(WeightRule.is_valid(WeightRule.DOWNSTREAM))
+        self.assertTrue(WeightRule.is_valid(WeightRule.UPSTREAM))
+        self.assertTrue(WeightRule.is_valid(WeightRule.ABSOLUTE))
+        self.assertEqual(len(WeightRule.all_weight_rules()), 3)


[4/6] incubator-airflow git commit: [AIRFLOW-2203] Cache signature in apply_defaults

Posted by fo...@apache.org.
[AIRFLOW-2203] Cache signature in apply_defaults

Cache inspect.signature in the wrapper closure to avoid recomputing it on
every decorated invocation. A separate sig_cache is created per
decoration, i.e. each function decorated using apply_defaults gets its
own sig_cache.
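
A simplified sketch of the pattern; the real decorator also merges dag and
default args, so this shows only the signature caching (sig_cache and
non_optional_args follow the diff, the rest is illustrative).

    from functools import wraps
    from inspect import signature

    def apply_defaults_sketch(func):
        # evaluated once, at decoration time, instead of on every call
        sig_cache = signature(func)
        non_optional_args = {
            name for (name, param) in sig_cache.parameters.items()
            if param.default == param.empty and
            param.name != 'self' and
            param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)}

        @wraps(func)
        def wrapper(*args, **kwargs):
            if args:
                raise TypeError('Use keyword arguments')
            # only a cheap set difference remains on the hot path
            missing_args = non_optional_args - set(kwargs)
            if missing_args:
                raise TypeError('Argument %s is required' % list(missing_args))
            return func(**kwargs)
        return wrapper

    @apply_defaults_sketch
    def make_task(task_id, owner=None):
        return task_id

    make_task(task_id='t1')     # fine
    # make_task(owner='me')     # would raise: task_id is required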


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/81ec595b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/81ec595b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/81ec595b

Branch: refs/heads/master
Commit: 81ec595b6c1ac05bc7f42e2c92c0dd79409953a4
Parents: 92357d5
Author: wongwill86 <wo...@gmail.com>
Authored: Mon Mar 12 17:08:44 2018 -0400
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed Mar 14 09:11:50 2018 +0100

----------------------------------------------------------------------
 airflow/utils/decorators.py    | 31 ++++++++++-------
 tests/utils/test_decorators.py | 69 +++++++++++++++++++++++++++++++++++++
 2 files changed, 88 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/81ec595b/airflow/utils/decorators.py
----------------------------------------------------------------------
diff --git a/airflow/utils/decorators.py b/airflow/utils/decorators.py
index 995e60f..4e9cb05 100644
--- a/airflow/utils/decorators.py
+++ b/airflow/utils/decorators.py
@@ -39,6 +39,19 @@ def apply_defaults(func):
     inheritance and argument defaults, this decorator also alerts with
     specific information about the missing arguments.
     """
+
+    import airflow.models
+    # Cache inspect.signature for the wrapper closure to avoid calling it
+    # at every decorated invocation. A separate sig_cache is created
+    # per decoration, i.e. each function decorated using apply_defaults will
+    # have a different sig_cache.
+    sig_cache = signature(func)
+    non_optional_args = {
+        name for (name, param) in sig_cache.parameters.items()
+        if param.default == param.empty and
+        param.name != 'self' and
+        param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)}
+
     @wraps(func)
     def wrapper(*args, **kwargs):
         if len(args) > 1:
@@ -46,9 +59,9 @@ def apply_defaults(func):
                 "Use keyword arguments when initializing operators")
         dag_args = {}
         dag_params = {}
-        import airflow.models
-        if kwargs.get('dag', None) or airflow.models._CONTEXT_MANAGER_DAG:
-            dag = kwargs.get('dag', None) or airflow.models._CONTEXT_MANAGER_DAG
+
+        dag = kwargs.get('dag', None) or airflow.models._CONTEXT_MANAGER_DAG
+        if dag:
             dag_args = copy(dag.default_args) or {}
             dag_params = copy(dag.params) or {}
 
@@ -67,16 +80,10 @@ def apply_defaults(func):
         dag_args.update(default_args)
         default_args = dag_args
 
-        sig = signature(func)
-        non_optional_args = [
-            name for (name, param) in sig.parameters.items()
-            if param.default == param.empty and
-            param.name != 'self' and
-            param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)]
-        for arg in sig.parameters:
-            if arg in default_args and arg not in kwargs:
+        for arg in sig_cache.parameters:
+            if arg not in kwargs and arg in default_args:
                 kwargs[arg] = default_args[arg]
-        missing_args = list(set(non_optional_args) - set(kwargs))
+        missing_args = list(non_optional_args - set(kwargs))
         if missing_args:
             msg = "Argument {0} is required".format(missing_args)
             raise AirflowException(msg)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/81ec595b/tests/utils/test_decorators.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_decorators.py b/tests/utils/test_decorators.py
new file mode 100644
index 0000000..29dada7
--- /dev/null
+++ b/tests/utils/test_decorators.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from airflow.utils.decorators import apply_defaults
+from airflow.exceptions import AirflowException
+
+
+# Essentially similar to airflow.models.BaseOperator
+class DummyClass(object):
+    @apply_defaults
+    def __init__(self, test_param, params=None, default_args=None):
+        self.test_param = test_param
+
+
+class DummySubClass(DummyClass):
+    @apply_defaults
+    def __init__(self, test_sub_param, *args, **kwargs):
+        super(DummySubClass, self).__init__(*args, **kwargs)
+        self.test_sub_param = test_sub_param
+
+
+class ApplyDefaultTest(unittest.TestCase):
+
+    def test_apply(self):
+        dc = DummyClass(test_param=True)
+        self.assertTrue(dc.test_param)
+
+        with self.assertRaisesRegexp(AirflowException, 'Argument.*test_param.*required'):
+            DummySubClass(test_sub_param=True)
+
+    def test_default_args(self):
+        default_args = {'test_param': True}
+        dc = DummyClass(default_args=default_args)
+        self.assertTrue(dc.test_param)
+
+        default_args = {'test_param': True, 'test_sub_param': True}
+        dsc = DummySubClass(default_args=default_args)
+        self.assertTrue(dc.test_param)
+        self.assertTrue(dsc.test_sub_param)
+
+        default_args = {'test_param': True}
+        dsc = DummySubClass(default_args=default_args, test_sub_param=True)
+        self.assertTrue(dc.test_param)
+        self.assertTrue(dsc.test_sub_param)
+
+        with self.assertRaisesRegexp(AirflowException,
+                                     'Argument.*test_sub_param.*required'):
+            DummySubClass(default_args=default_args)
+
+    def test_incorrect_default_args(self):
+        default_args = {'test_param': True, 'extra_param': True}
+        dc = DummyClass(default_args=default_args)
+        self.assertTrue(dc.test_param)
+
+        default_args = {'random_params': True}
+        with self.assertRaisesRegexp(AirflowException, 'Argument.*test_param.*required'):
+            DummyClass(default_args=default_args)


[5/6] incubator-airflow git commit: [AIRFLOW-2203] Remove Useless Commands.

Posted by fo...@apache.org.
[AIRFLOW-2203] Remove Useless Commands.

self.tasks is a temporary list generated from self.task_dict; there is
no reason to append to it.
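
Sketched with a toy class (not the real DAG), the point is that tasks is
rebuilt from task_dict on every access, so an append to it is silently
lost:

    class ToyDag(object):
        def __init__(self):
            self.task_dict = {}
            self.task_count = 0

        @property
        def tasks(self):
            # a fresh list derived from task_dict each time
            return list(self.task_dict.values())

        def add_task(self, task_id, task):
            self.task_dict[task_id] = task  # the one mutation that matters
            self.task_count = len(self.task_dict)

    dag = ToyDag()
    dag.tasks.append('ghost')   # mutates a throwaway list
    assert dag.tasks == []      # the append had no effect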


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6f0a0d2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6f0a0d2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6f0a0d2f

Branch: refs/heads/master
Commit: 6f0a0d2f6f8203d2c6542a8eaefe2cbb2640a8d3
Parents: 81ec595
Author: wongwill86 <wo...@gmail.com>
Authored: Mon Mar 12 17:01:35 2018 -0400
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed Mar 14 09:11:56 2018 +0100

----------------------------------------------------------------------
 airflow/models.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6f0a0d2f/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 74a54bb..8931ac6 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3823,11 +3823,10 @@ class DAG(BaseDag, LoggingMixin):
                 'exception.'.format(task.task_id),
                 category=PendingDeprecationWarning)
         else:
-            self.tasks.append(task)
             self.task_dict[task.task_id] = task
             task.dag = self
 
-        self.task_count = len(self.tasks)
+        self.task_count = len(self.task_dict)
 
     def add_tasks(self, tasks):
         """


[3/6] incubator-airflow git commit: [AIRFLOW-2203] Speed up Operator Resources

Posted by fo...@apache.org.
[AIRFLOW-2203] Speed up Operator Resources

Set the default values of Resources from the configuration at import time,
to prevent four config lookups for every task created.
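
A sketch of the mechanism being relied on, with expensive_lookup standing
in for configuration.getint: Python evaluates default argument expressions
once, when the def statement runs, so the lookups move from
per-instantiation to import time. The flip side is that the defaults are
frozen at import and will not reflect later configuration changes.

    CALLS = []

    def expensive_lookup(key):
        # stand-in for configuration.getint('operators', key)
        CALLS.append(key)
        return 1

    def make_resources(cpus=expensive_lookup('default_cpus'),
                       ram=expensive_lookup('default_ram')):
        return (cpus, ram)

    make_resources()
    make_resources()
    make_resources()
    assert CALLS == ['default_cpus', 'default_ram']  # evaluated only at definition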


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/92357d53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/92357d53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/92357d53

Branch: refs/heads/master
Commit: 92357d53e60a6c88afe6792923ff68070ec81df3
Parents: 16ab314
Author: wongwill86 <wo...@gmail.com>
Authored: Mon Mar 12 17:14:00 2018 -0400
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed Mar 14 09:11:44 2018 +0100

----------------------------------------------------------------------
 airflow/utils/operator_resources.py | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92357d53/airflow/utils/operator_resources.py
----------------------------------------------------------------------
diff --git a/airflow/utils/operator_resources.py b/airflow/utils/operator_resources.py
index d304170..b786e8f 100644
--- a/airflow/utils/operator_resources.py
+++ b/airflow/utils/operator_resources.py
@@ -99,16 +99,12 @@ class Resources(object):
     :param gpus: The number of gpu units that are required
     :type gpus: long
     """
-    def __init__(self, cpus=None, ram=None, disk=None, gpus=None):
-        if cpus is None:
-            cpus = configuration.getint('operators', 'default_cpus')
-        if ram is None:
-            ram = configuration.getint('operators', 'default_ram')
-        if disk is None:
-            disk = configuration.getint('operators', 'default_disk')
-        if gpus is None:
-            gpus = configuration.getint('operators', 'default_gpus')
-
+    def __init__(self,
+                 cpus=configuration.getint('operators', 'default_cpus'),
+                 ram=configuration.getint('operators', 'default_ram'),
+                 disk=configuration.getint('operators', 'default_disk'),
+                 gpus=configuration.getint('operators', 'default_gpus')
+                 ):
         self.cpus = CpuResource(cpus)
         self.ram = RamResource(ram)
         self.disk = DiskResource(disk)


[6/6] incubator-airflow git commit: [AIRFLOW-2203] Defer cycle detection

Posted by fo...@apache.org.
[AIRFLOW-2203] Defer cycle detection

Moved cycle detection from the time tasks are added to when the dag is
being bagged. This changes DAG import runtime from polynomial to roughly
linear.

Closes #3116 from wongwill86:dag_import_speed
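
A condensed sketch of the three-state DFS that the new DAG.test_cycle uses
(see the diff below), over a plain adjacency dict instead of task objects;
every task is visited at most once, which is where the roughly linear
import time comes from.

    from collections import defaultdict

    CYCLE_NEW, CYCLE_IN_PROGRESS, CYCLE_DONE = 0, 1, 2

    def has_cycle(graph):
        """graph maps task_id -> list of downstream task_ids."""
        visit_map = defaultdict(int)  # default 0 corresponds to CYCLE_NEW

        def visit(node):
            if visit_map[node] == CYCLE_DONE:
                return
            visit_map[node] = CYCLE_IN_PROGRESS
            for child in graph.get(node, ()):
                if visit_map[child] == CYCLE_IN_PROGRESS:
                    raise ValueError('Cycle detected: %s -> %s' % (node, child))
                visit(child)
            visit_map[node] = CYCLE_DONE

        for node in list(graph):
            if visit_map[node] == CYCLE_NEW:
                visit(node)
        return False

    has_cycle({'A': ['B'], 'B': ['C'], 'C': []})   # no cycle, returns False
    # has_cycle({'A': ['B'], 'B': ['A']})          # would raise ValueError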


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c3730650
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c3730650
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c3730650

Branch: refs/heads/master
Commit: c3730650c852cd7a5e06a5933f5064bbb04e0e88
Parents: 6f0a0d2
Author: wongwill86 <wo...@gmail.com>
Authored: Mon Mar 12 16:53:01 2018 -0400
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed Mar 14 09:13:59 2018 +0100

----------------------------------------------------------------------
 airflow/exceptions.py |   4 +
 airflow/models.py     | 111 ++++++++---
 tests/core.py         |  16 --
 tests/models.py       | 450 ++++++++++++++++++++++++++++++++++++++++++++-
 4 files changed, 533 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3730650/airflow/exceptions.py
----------------------------------------------------------------------
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 90d3e22..c1b728c 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -34,3 +34,7 @@ class AirflowTaskTimeout(AirflowException):
 
 class AirflowSkipException(AirflowException):
     pass
+
+
+class AirflowDagCycleException(AirflowException):
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3730650/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 8931ac6..c1b608a 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -22,7 +22,7 @@ from future.standard_library import install_aliases
 from builtins import str
 from builtins import object, bytes
 import copy
-from collections import namedtuple
+from collections import namedtuple, defaultdict
 from datetime import timedelta
 
 import dill
@@ -63,7 +63,9 @@ import six
 from airflow import settings, utils
 from airflow.executors import GetDefaultExecutor, LocalExecutor
 from airflow import configuration
-from airflow.exceptions import AirflowException, AirflowSkipException, AirflowTaskTimeout
+from airflow.exceptions import (
+    AirflowDagCycleException, AirflowException, AirflowSkipException, AirflowTaskTimeout
+)
 from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
 from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
@@ -122,7 +124,6 @@ else:
 # Used by DAG context_managers
 _CONTEXT_MANAGER_DAG = None
 
-
 def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
     """
     Clears a set of task instances, but makes sure the running ones
@@ -183,6 +184,11 @@ class DagBag(BaseDagBag, LoggingMixin):
     :type include_examples: bool
     """
 
+    # static class variables to detect dag cycles
+    CYCLE_NEW = 0
+    CYCLE_IN_PROGRESS = 1
+    CYCLE_DONE = 2
+
     def __init__(
             self,
             dag_folder=None,
@@ -335,10 +341,17 @@ class DagBag(BaseDagBag, LoggingMixin):
                         dag.full_filepath = filepath
                         if dag.fileloc != filepath:
                             dag.fileloc = filepath
-                    dag.is_subdag = False
-                    self.bag_dag(dag, parent_dag=dag, root_dag=dag)
-                    found_dags.append(dag)
-                    found_dags += dag.subdags
+                    try:
+                        dag.is_subdag = False
+                        self.bag_dag(dag, parent_dag=dag, root_dag=dag)
+                        found_dags.append(dag)
+                        found_dags += dag.subdags
+                    except AirflowDagCycleException as cycle_exception:
+                        self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
+                        self.import_errors[dag.full_filepath] = str(cycle_exception)
+                        self.file_last_changed[dag.full_filepath] = \
+                            file_last_changed_on_disk
+
 
         self.file_last_changed[filepath] = file_last_changed_on_disk
         return found_dags
@@ -381,20 +394,39 @@ class DagBag(BaseDagBag, LoggingMixin):
     def bag_dag(self, dag, parent_dag, root_dag):
         """
         Adds the DAG into the bag, recurses into sub dags.
+        Throws AirflowDagCycleException if a cycle is detected in this dag or its subdags
         """
-        self.dags[dag.dag_id] = dag
+
+        dag.test_cycle()  # throws if a task cycle is found
+
         dag.resolve_template_files()
         dag.last_loaded = timezone.utcnow()
 
         for task in dag.tasks:
             settings.policy(task)
 
-        for subdag in dag.subdags:
-            subdag.full_filepath = dag.full_filepath
-            subdag.parent_dag = dag
-            subdag.is_subdag = True
-            self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
-        self.log.debug('Loaded DAG {dag}'.format(**locals()))
+        subdags = dag.subdags
+
+        try:
+            for subdag in subdags:
+                subdag.full_filepath = dag.full_filepath
+                subdag.parent_dag = dag
+                subdag.is_subdag = True
+                self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
+
+            self.dags[dag.dag_id] = dag
+            self.log.debug('Loaded DAG {dag}'.format(**locals()))
+        except AirflowDagCycleException as cycle_exception:
+            # There was an error in bagging the dag. Remove it from the list of dags
+            self.log.exception('Exception bagging dag: {dag.dag_id}'.format(**locals()))
+            # Only necessary at the root level since DAG.subdags automatically
+            # performs DFS to search through all subdags
+            if dag == root_dag:
+                for subdag in subdags:
+                    if subdag.dag_id in self.dags:
+                        del self.dags[subdag.dag_id]
+            raise cycle_exception
+
 
     def collect_dags(
             self,
@@ -2699,21 +2731,6 @@ class BaseOperator(LoggingMixin):
         return list(map(lambda task_id: self._dag.task_dict[task_id],
                         self.get_flat_relative_ids(upstream)))
 
-    def detect_downstream_cycle(self, task=None):
-        """
-        When invoked, this routine will raise an exception if a cycle is
-        detected downstream from self. It is invoked when tasks are added to
-        the DAG to detect cycles.
-        """
-        if not task:
-            task = self
-        for t in self.get_direct_relatives():
-            if task is t:
-                msg = "Cycle detected in DAG. Faulty task: {0}".format(task)
-                raise AirflowException(msg)
-            else:
-                t.detect_downstream_cycle(task=task)
-        return False
 
     def run(
             self,
@@ -4074,6 +4091,42 @@ class DAG(BaseDag, LoggingMixin):
                 qry = qry.filter(TaskInstance.state.in_(states))
         return qry.scalar()
 
+    def test_cycle(self):
+        '''
+        Check to see if there are any cycles in the DAG. Returns False if no cycle found,
+        otherwise raises exception.
+        '''
+
+        # default of int is 0 which corresponds to CYCLE_NEW
+        visit_map = defaultdict(int)
+        for task_id in self.task_dict.keys():
+            # print('starting %s' % task_id)
+            if visit_map[task_id] == DagBag.CYCLE_NEW:
+                self._test_cycle_helper(visit_map, task_id)
+        return False
+
+    def _test_cycle_helper(self, visit_map, task_id):
+        '''
+        Checks if a cycle exists from the input task using DFS traversal
+        '''
+
+        # print('Inspecting %s' % task_id)
+        if visit_map[task_id] == DagBag.CYCLE_DONE:
+            return False
+
+        visit_map[task_id] = DagBag.CYCLE_IN_PROGRESS
+
+        task = self.task_dict[task_id]
+        for descendant_id in task.get_direct_relative_ids():
+            if visit_map[descendant_id] == DagBag.CYCLE_IN_PROGRESS:
+                msg = "Cycle detected in DAG. Faulty task: {0} to {1}".format(
+                    task_id, descendant_id)
+                raise AirflowDagCycleException(msg)
+            else:
+                self._test_cycle_helper(visit_map, descendant_id)
+
+        visit_map[task_id] = DagBag.CYCLE_DONE
+
 
 class Chart(Base):
     __tablename__ = "chart"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3730650/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 03372db..ce5fb7a 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -781,22 +781,6 @@ class CoreTest(unittest.TestCase):
         with self.assertRaisesRegexp(AirflowException, regexp):
             self.run_after_loop.set_upstream(self.runme_0)
 
-    def test_cyclic_dependencies_1(self):
-
-        regexp = "Cycle detected in DAG. (.*)runme_0(.*)"
-        with self.assertRaisesRegexp(AirflowException, regexp):
-            self.runme_0.set_upstream(self.run_after_loop)
-
-    def test_cyclic_dependencies_2(self):
-        regexp = "Cycle detected in DAG. (.*)run_after_loop(.*)"
-        with self.assertRaisesRegexp(AirflowException, regexp):
-            self.run_after_loop.set_downstream(self.runme_0)
-
-    def test_cyclic_dependencies_3(self):
-        regexp = "Cycle detected in DAG. (.*)run_this_last(.*)"
-        with self.assertRaisesRegexp(AirflowException, regexp):
-            self.run_this_last.set_downstream(self.runme_0)
-
     def test_bad_trigger_rule(self):
         with self.assertRaises(AirflowException):
             DummyOperator(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3730650/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index c8ee037..5d8184c 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -26,9 +26,11 @@ import time
 import six
 import re
 import urllib
+import textwrap
+import inspect
 
 from airflow import configuration, models, settings, AirflowException
-from airflow.exceptions import AirflowSkipException
+from airflow.exceptions import AirflowDagCycleException, AirflowSkipException
 from airflow.jobs import BackfillJob
 from airflow.models import DAG, TaskInstance as TI
 from airflow.models import State as ST
@@ -47,7 +49,7 @@ from airflow.utils.state import State
 from airflow.utils.trigger_rule import TriggerRule
 from mock import patch
 from parameterized import parameterized
-
+from tempfile import NamedTemporaryFile
 
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 TEST_DAGS_FOLDER = os.path.join(
@@ -388,6 +390,132 @@ class DagTest(unittest.TestCase):
         result = task.render_template('', "{{ 'world' | hello}}", dict())
         self.assertEqual(result, 'Hello world')
 
+    def test_cycle(self):
+        # test empty
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        self.assertFalse(dag.test_cycle())
+
+        # test single task
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        with dag:
+            opA = DummyOperator(task_id='A')
+
+        self.assertFalse(dag.test_cycle())
+
+        # test no cycle
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # A -> B -> C
+        #      B -> D
+        # E -> F
+        with dag:
+            opA = DummyOperator(task_id='A')
+            opB = DummyOperator(task_id='B')
+            opC = DummyOperator(task_id='C')
+            opD = DummyOperator(task_id='D')
+            opE = DummyOperator(task_id='E')
+            opF = DummyOperator(task_id='F')
+            opA.set_downstream(opB)
+            opB.set_downstream(opC)
+            opB.set_downstream(opD)
+            opE.set_downstream(opF)
+
+        self.assertFalse(dag.test_cycle())
+
+        # test self loop
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # A -> A
+        with dag:
+            opA = DummyOperator(task_id='A')
+            opA.set_downstream(opA)
+
+        with self.assertRaises(AirflowDagCycleException):
+            dag.test_cycle()
+
+        # test downstream self loop
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # A -> B -> C -> D -> E -> E
+        with dag:
+            opA = DummyOperator(task_id='A')
+            opB = DummyOperator(task_id='B')
+            opC = DummyOperator(task_id='C')
+            opD = DummyOperator(task_id='D')
+            opE = DummyOperator(task_id='E')
+            opA.set_downstream(opB)
+            opB.set_downstream(opC)
+            opC.set_downstream(opD)
+            opD.set_downstream(opE)
+            opE.set_downstream(opE)
+
+        with self.assertRaises(AirflowDagCycleException):
+            dag.test_cycle()
+
+        # large loop
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # A -> B -> C -> D -> E -> A
+        with dag:
+            opA = DummyOperator(task_id='A')
+            opB = DummyOperator(task_id='B')
+            opC = DummyOperator(task_id='C')
+            opD = DummyOperator(task_id='D')
+            opE = DummyOperator(task_id='E')
+            opA.set_downstream(opB)
+            opB.set_downstream(opC)
+            opC.set_downstream(opD)
+            opD.set_downstream(opE)
+            opE.set_downstream(opA)
+
+        with self.assertRaises(AirflowDagCycleException):
+            dag.test_cycle()
+
+        # test arbitrary loop
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # E-> A -> B -> F -> A
+        #       -> C -> F
+        with dag:
+            opA = DummyOperator(task_id='A')
+            opB = DummyOperator(task_id='B')
+            opC = DummyOperator(task_id='C')
+            opD = DummyOperator(task_id='D')
+            opE = DummyOperator(task_id='E')
+            opF = DummyOperator(task_id='F')
+            opA.set_downstream(opB)
+            opA.set_downstream(opC)
+            opE.set_downstream(opA)
+            opC.set_downstream(opF)
+            opB.set_downstream(opF)
+            opF.set_downstream(opA)
+
+        with self.assertRaises(AirflowDagCycleException):
+            dag.test_cycle()
+
 
 class DagStatTest(unittest.TestCase):
     def test_dagstats_crud(self):
@@ -796,7 +924,6 @@ class DagBagTest(unittest.TestCase):
         """
         test that we're able to parse file that contains multi-byte char
         """
-        from tempfile import NamedTemporaryFile
         f = NamedTemporaryFile()
         f.write('\u3042'.encode('utf8'))  # write multi-byte char (hiragana)
         f.flush()
@@ -857,6 +984,323 @@ class DagBagTest(unittest.TestCase):
             self.assertTrue(
                 dag.fileloc.endswith('airflow/example_dags/' + path))
 
+    def process_dag(self, create_dag):
+        """
+        Helper method to process a file generated from the input create_dag function.
+        """
+        # write source to file
+        source = textwrap.dedent(''.join(
+            inspect.getsource(create_dag).splitlines(True)[1:-1]))
+        f = NamedTemporaryFile()
+        f.write(source.encode('utf8'))
+        f.flush()
+
+        dagbag = models.DagBag(include_examples=False)
+        found_dags = dagbag.process_file(f.name)
+        return (dagbag, found_dags, f.name)
+
+    def validate_dags(self, expected_parent_dag, actual_found_dags, actual_dagbag,
+                      should_be_found=True):
+        expected_dag_ids = list(map(lambda dag: dag.dag_id, expected_parent_dag.subdags))
+        expected_dag_ids.append(expected_parent_dag.dag_id)
+
+        actual_found_dag_ids = list(map(lambda dag: dag.dag_id, actual_found_dags))
+
+        for dag_id in expected_dag_ids:
+            actual_dagbag.log.info('validating %s' % dag_id)
+            self.assertEquals(
+                dag_id in actual_found_dag_ids, should_be_found,
+                'dag "%s" should %shave been found after processing dag "%s"' %
+                (dag_id, '' if should_be_found else 'not ', expected_parent_dag.dag_id)
+            )
+            self.assertEquals(
+                dag_id in actual_dagbag.dags, should_be_found,
+                'dag "%s" should %sbe in dagbag.dags after processing dag "%s"' %
+                (dag_id, '' if should_be_found else 'not ', expected_parent_dag.dag_id)
+            )
+
+    def test_load_subdags(self):
+        # Define Dag to load
+        def standard_subdag():
+            from airflow.models import DAG
+            from airflow.operators.dummy_operator import DummyOperator
+            from airflow.operators.subdag_operator import SubDagOperator
+            import datetime
+            DAG_NAME = 'master'
+            DEFAULT_ARGS = {
+                'owner': 'owner1',
+                'start_date': datetime.datetime(2016, 1, 1)
+            }
+            dag = DAG(
+                DAG_NAME,
+                default_args=DEFAULT_ARGS)
+
+            # master:
+            #     A -> opSubDag_0
+            #          master.opsubdag_0:
+            #              -> subdag_0.task
+            #     A -> opSubDag_1
+            #          master.opsubdag_1:
+            #              -> subdag_1.task
+
+            with dag:
+                def subdag_0():
+                    subdag_0 = DAG('master.opSubdag_0', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_0.task', dag=subdag_0)
+                    return subdag_0
+
+                def subdag_1():
+                    subdag_1 = DAG('master.opSubdag_1', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_1.task', dag=subdag_1)
+                    return subdag_1
+
+                opSubdag_0 = SubDagOperator(
+                    task_id='opSubdag_0', dag=dag, subdag=subdag_0())
+                opSubdag_1 = SubDagOperator(
+                    task_id='opSubdag_1', dag=dag, subdag=subdag_1())
+
+                opA = DummyOperator(task_id='A')
+                opA.set_downstream(opSubdag_0)
+                opA.set_downstream(opSubdag_1)
+            return dag
+
+        testDag = standard_subdag()
+        # sanity check to make sure DAG.subdag is still functioning properly
+        self.assertEqual(len(testDag.subdags), 2)
+
+        # Perform processing dag
+        dagbag, found_dags, _ = self.process_dag(standard_subdag)
+
+        # Validate correctness
+        # all dags from testDag should be listed
+        self.validate_dags(testDag, found_dags, dagbag)
+
+        # Define Dag to load
+        def nested_subdags():
+            from airflow.models import DAG
+            from airflow.operators.dummy_operator import DummyOperator
+            from airflow.operators.subdag_operator import SubDagOperator
+            import datetime
+            DAG_NAME = 'master'
+            DEFAULT_ARGS = {
+                'owner': 'owner1',
+                'start_date': datetime.datetime(2016, 1, 1)
+            }
+            dag = DAG(
+                DAG_NAME,
+                default_args=DEFAULT_ARGS)
+
+            # master:
+            #     A -> opSubdag_0
+            #          master.opSubdag_0:
+            #              -> opSubDag_A
+            #                 master.opSubdag_0.opSubdag_A:
+            #                     -> subdag_A.task
+            #              -> opSubdag_B
+            #                 master.opSubdag_0.opSubdag_B:
+            #                     -> subdag_B.task
+            #     A -> opSubdag_1
+            #          master.opSubdag_1:
+            #              -> opSubdag_C
+            #                 master.opSubdag_1.opSubdag_C:
+            #                     -> subdag_C.task
+            #              -> opSubDag_D
+            #                 master.opSubdag_1.opSubdag_D:
+            #                     -> subdag_D.task
+
+            with dag:
+                def subdag_A():
+                    subdag_A = DAG(
+                        'master.opSubdag_0.opSubdag_A', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_A.task', dag=subdag_A)
+                    return subdag_A
+
+                def subdag_B():
+                    subdag_B = DAG(
+                        'master.opSubdag_0.opSubdag_B', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_B.task', dag=subdag_B)
+                    return subdag_B
+
+                def subdag_C():
+                    subdag_C = DAG(
+                        'master.opSubdag_1.opSubdag_C', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_C.task', dag=subdag_C)
+                    return subdag_C
+
+                def subdag_D():
+                    subdag_D = DAG(
+                        'master.opSubdag_1.opSubdag_D', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_D.task', dag=subdag_D)
+                    return subdag_D
+
+                def subdag_0():
+                    subdag_0 = DAG('master.opSubdag_0', default_args=DEFAULT_ARGS)
+                    SubDagOperator(task_id='opSubdag_A', dag=subdag_0, subdag=subdag_A())
+                    SubDagOperator(task_id='opSubdag_B', dag=subdag_0, subdag=subdag_B())
+                    return subdag_0
+
+                def subdag_1():
+                    subdag_1 = DAG('master.opSubdag_1', default_args=DEFAULT_ARGS)
+                    SubDagOperator(task_id='opSubdag_C', dag=subdag_1, subdag=subdag_C())
+                    SubDagOperator(task_id='opSubdag_D', dag=subdag_1, subdag=subdag_D())
+                    return subdag_1
+
+                opSubdag_0 = SubDagOperator(
+                    task_id='opSubdag_0', dag=dag, subdag=subdag_0())
+                opSubdag_1 = SubDagOperator(
+                    task_id='opSubdag_1', dag=dag, subdag=subdag_1())
+
+                opA = DummyOperator(task_id='A')
+                opA.set_downstream(opSubdag_0)
+                opA.set_downstream(opSubdag_1)
+
+            return dag
+
+        testDag = nested_subdags()
+        # sanity check to make sure DAG.subdag is still functioning properly
+        self.assertEqual(len(testDag.subdags), 6)
+
+        # Perform processing dag
+        dagbag, found_dags, _ = self.process_dag(nested_subdags)
+
+        # Validate correctness
+        # all dags from testDag should be listed
+        self.validate_dags(testDag, found_dags, dagbag)
+
+    def test_skip_cycle_dags(self):
+        """
+        Don't crash when loading an invalid (contains a cycle) DAG file.
+        Don't load the dag into the DagBag either
+        """
+        # Define Dag to load
+        def basic_cycle():
+            from airflow.models import DAG
+            from airflow.operators.dummy_operator import DummyOperator
+            import datetime
+            DAG_NAME = 'cycle_dag'
+            DEFAULT_ARGS = {
+                'owner': 'owner1',
+                'start_date': datetime.datetime(2016, 1, 1)
+            }
+            dag = DAG(
+                DAG_NAME,
+                default_args=DEFAULT_ARGS)
+
+            # A -> A
+            with dag:
+                opA = DummyOperator(task_id='A')
+                opA.set_downstream(opA)
+
+            return dag
+
+        testDag = basic_cycle()
+        # sanity check to make sure DAG.subdag is still functioning properly
+        self.assertEqual(len(testDag.subdags), 0)
+
+        # Perform processing dag
+        dagbag, found_dags, file_path = self.process_dag(basic_cycle)
+
+        # Validate correctness
+        # None of the dags should be found
+        self.validate_dags(testDag, found_dags, dagbag, should_be_found=False)
+        self.assertIn(file_path, dagbag.import_errors)
+
+        # Define Dag to load
+        def nested_subdag_cycle():
+            from airflow.models import DAG
+            from airflow.operators.dummy_operator import DummyOperator
+            from airflow.operators.subdag_operator import SubDagOperator
+            import datetime
+            DAG_NAME = 'nested_cycle'
+            DEFAULT_ARGS = {
+                'owner': 'owner1',
+                'start_date': datetime.datetime(2016, 1, 1)
+            }
+            dag = DAG(
+                DAG_NAME,
+                default_args=DEFAULT_ARGS)
+
+            # cycle:
+            #     A -> opSubdag_0
+            #          cycle.opSubdag_0:
+            #              -> opSubDag_A
+            #                 cycle.opSubdag_0.opSubdag_A:
+            #                     -> subdag_A.task
+            #              -> opSubdag_B
+            #                 cycle.opSubdag_0.opSubdag_B:
+            #                     -> subdag_B.task
+            #     A -> opSubdag_1
+            #          cycle.opSubdag_1:
+            #              -> opSubdag_C
+            #                 cycle.opSubdag_1.opSubdag_C:
+            #                     -> subdag_C.task -> subdag_C.task  >Invalid Loop<
+            #              -> opSubDag_D
+            #                 cycle.opSubdag_1.opSubdag_D:
+            #                     -> subdag_D.task
+
+            with dag:
+                def subdag_A():
+                    subdag_A = DAG(
+                        'nested_cycle.opSubdag_0.opSubdag_A', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_A.task', dag=subdag_A)
+                    return subdag_A
+
+                def subdag_B():
+                    subdag_B = DAG(
+                        'nested_cycle.opSubdag_0.opSubdag_B', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_B.task', dag=subdag_B)
+                    return subdag_B
+
+                def subdag_C():
+                    subdag_C = DAG(
+                        'nested_cycle.opSubdag_1.opSubdag_C', default_args=DEFAULT_ARGS)
+                    opSubdag_C_task = DummyOperator(
+                        task_id='subdag_C.task', dag=subdag_C)
+                    # introduce a loop in opSubdag_C
+                    opSubdag_C_task.set_downstream(opSubdag_C_task)
+                    return subdag_C
+
+                def subdag_D():
+                    subdag_D = DAG(
+                        'nested_cycle.opSubdag_1.opSubdag_D', default_args=DEFAULT_ARGS)
+                    DummyOperator(task_id='subdag_D.task', dag=subdag_D)
+                    return subdag_D
+
+                def subdag_0():
+                    subdag_0 = DAG('nested_cycle.opSubdag_0', default_args=DEFAULT_ARGS)
+                    SubDagOperator(task_id='opSubdag_A', dag=subdag_0, subdag=subdag_A())
+                    SubDagOperator(task_id='opSubdag_B', dag=subdag_0, subdag=subdag_B())
+                    return subdag_0
+
+                def subdag_1():
+                    subdag_1 = DAG('nested_cycle.opSubdag_1', default_args=DEFAULT_ARGS)
+                    SubDagOperator(task_id='opSubdag_C', dag=subdag_1, subdag=subdag_C())
+                    SubDagOperator(task_id='opSubdag_D', dag=subdag_1, subdag=subdag_D())
+                    return subdag_1
+
+                opSubdag_0 = SubDagOperator(
+                    task_id='opSubdag_0', dag=dag, subdag=subdag_0())
+                opSubdag_1 = SubDagOperator(
+                    task_id='opSubdag_1', dag=dag, subdag=subdag_1())
+
+                opA = DummyOperator(task_id='A')
+                opA.set_downstream(opSubdag_0)
+                opA.set_downstream(opSubdag_1)
+
+            return dag
+
+        testDag = nested_subdag_cycle()
+        # sanity check to make sure DAG.subdag is still functioning properly
+        self.assertEqual(len(testDag.subdags), 6)
+
+        # Perform processing dag
+        dagbag, found_dags, file_path = self.process_dag(nested_subdag_cycle)
+
+        # Validate correctness
+        # None of the dags should be found
+        self.validate_dags(testDag, found_dags, dagbag, should_be_found=False)
+        self.assertIn(file_path, dagbag.import_errors)
+
     def test_process_file_with_none(self):
         """
         test that process_file can handle Nones