You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/03/14 08:15:40 UTC
[1/6] incubator-airflow git commit: [AIRFLOW-2203] Store task ids as
sets not lists
Repository: incubator-airflow
Updated Branches:
refs/heads/master 4cf2fba19 -> c3730650c
[AIRFLOW-2203] Store task ids as sets not lists
Massively improve performance by using sets to represent a task's
upstream and downstream task ids.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/781c5bf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/781c5bf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/781c5bf6
Branch: refs/heads/master
Commit: 781c5bf6967473701e4812d72ace4f675f5b5c94
Parents: 4cf2fba
Author: wongwill86 <wo...@gmail.com>
Authored: Mon Mar 12 17:33:33 2018 -0400
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed Mar 14 09:11:32 2018 +0100
----------------------------------------------------------------------
airflow/models.py | 29 +++++++++++-------------
tests/ti_deps/deps/test_trigger_rule_dep.py | 2 +-
2 files changed, 14 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/781c5bf6/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index cf31b07..74a54bb 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2286,8 +2286,8 @@ class BaseOperator(LoggingMixin):
self.task_concurrency = task_concurrency
# Private attributes
- self._upstream_task_ids = []
- self._downstream_task_ids = []
+ self._upstream_task_ids = set()
+ self._downstream_task_ids = set()
if not dag and _CONTEXT_MANAGER_DAG:
dag = _CONTEXT_MANAGER_DAG
@@ -2771,13 +2771,13 @@ class BaseOperator(LoggingMixin):
def task_type(self):
return self.__class__.__name__
- def append_only_new(self, l, item):
- if any([item is t for t in l]):
+ def add_only_new(self, item_set, item):
+ if item in item_set:
raise AirflowException(
'Dependency {self}, {item} already registered'
''.format(**locals()))
else:
- l.append(item)
+ item_set.add(item)
def _set_relatives(self, task_or_task_list, upstream=False):
try:
@@ -2793,7 +2793,7 @@ class BaseOperator(LoggingMixin):
# relationships can only be set if the tasks share a single DAG. Tasks
# without a DAG are assigned to that DAG.
- dags = {t._dag.dag_id: t.dag for t in [self] + task_list if t.has_dag()}
+ dags = {t._dag.dag_id: t._dag for t in [self] + task_list if t.has_dag()}
if len(dags) > 1:
raise AirflowException(
@@ -2814,13 +2814,11 @@ class BaseOperator(LoggingMixin):
if dag and not task.has_dag():
task.dag = dag
if upstream:
- task.append_only_new(task._downstream_task_ids, self.task_id)
- self.append_only_new(self._upstream_task_ids, task.task_id)
+ task.add_only_new(task._downstream_task_ids, self.task_id)
+ self.add_only_new(self._upstream_task_ids, task.task_id)
else:
- self.append_only_new(self._downstream_task_ids, task.task_id)
- task.append_only_new(task._upstream_task_ids, self.task_id)
-
- self.detect_downstream_cycle()
+ self.add_only_new(self._downstream_task_ids, task.task_id)
+ task.add_only_new(task._upstream_task_ids, self.task_id)
def set_downstream(self, task_or_task_list):
"""
@@ -3729,10 +3727,9 @@ class DAG(BaseDag, LoggingMixin):
for t in dag.tasks:
# Removing upstream/downstream references to tasks that did not
# made the cut
- t._upstream_task_ids = [
- tid for tid in t._upstream_task_ids if tid in dag.task_ids]
- t._downstream_task_ids = [
- tid for tid in t._downstream_task_ids if tid in dag.task_ids]
+ t._upstream_task_ids = t._upstream_task_ids.intersection(dag.task_dict.keys())
+ t._downstream_task_ids = t._downstream_task_ids.intersection(
+ dag.task_dict.keys())
if len(dag.tasks) < len(self.tasks):
dag.partial = True
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/781c5bf6/tests/ti_deps/deps/test_trigger_rule_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py b/tests/ti_deps/deps/test_trigger_rule_dep.py
index a61ff0d..18595bb 100644
--- a/tests/ti_deps/deps/test_trigger_rule_dep.py
+++ b/tests/ti_deps/deps/test_trigger_rule_dep.py
@@ -28,7 +28,7 @@ class TriggerRuleDepTest(unittest.TestCase):
task = BaseOperator(task_id='test_task', trigger_rule=trigger_rule,
start_date=datetime(2015, 1, 1))
if upstream_task_ids:
- task._upstream_task_ids.extend(upstream_task_ids)
+ task._upstream_task_ids.update(upstream_task_ids)
return TaskInstance(task=task, state=state, execution_date=None)
def test_no_upstream_tasks(self):
[2/6] incubator-airflow git commit: [AIRFLOW-2203] Cache static rules
(trigger/weight)
Posted by fo...@apache.org.
[AIRFLOW-2203] Cache static rules (trigger/weight)
There is no need to recalculate them every time just to check whether a rule is valid.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/16ab314c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/16ab314c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/16ab314c
Branch: refs/heads/master
Commit: 16ab314c21d131136fb00adcc395d71a6f8b31b2
Parents: 781c5bf
Author: wongwill86 <wo...@gmail.com>
Authored: Mon Mar 12 17:32:24 2018 -0400
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed Mar 14 09:11:38 2018 +0100
----------------------------------------------------------------------
airflow/utils/trigger_rule.py | 9 +++++++--
airflow/utils/weight_rule.py | 9 +++++++--
tests/utils/test_trigger_rule.py | 28 ++++++++++++++++++++++++++++
tests/utils/test_weight_rule.py | 25 +++++++++++++++++++++++++
4 files changed, 67 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16ab314c/airflow/utils/trigger_rule.py
----------------------------------------------------------------------
diff --git a/airflow/utils/trigger_rule.py b/airflow/utils/trigger_rule.py
index f830909..5daaeac 100644
--- a/airflow/utils/trigger_rule.py
+++ b/airflow/utils/trigger_rule.py
@@ -25,12 +25,17 @@ class TriggerRule(object):
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'
+ _ALL_TRIGGER_RULES = {}
@classmethod
def is_valid(cls, trigger_rule):
return trigger_rule in cls.all_triggers()
@classmethod
def all_triggers(cls):
- return [getattr(cls, attr)
+ if not cls._ALL_TRIGGER_RULES:
+ cls._ALL_TRIGGER_RULES = {
+ getattr(cls, attr)
for attr in dir(cls)
- if not attr.startswith("__") and not callable(getattr(cls, attr))]
+ if not attr.startswith("_") and not callable(getattr(cls, attr))
+ }
+ return cls._ALL_TRIGGER_RULES
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16ab314c/airflow/utils/weight_rule.py
----------------------------------------------------------------------
diff --git a/airflow/utils/weight_rule.py b/airflow/utils/weight_rule.py
index fde0d90..51f45a6 100644
--- a/airflow/utils/weight_rule.py
+++ b/airflow/utils/weight_rule.py
@@ -22,12 +22,17 @@ class WeightRule(object):
UPSTREAM = 'upstream'
ABSOLUTE = 'absolute'
+ _ALL_WEIGHT_RULES = {}
@classmethod
def is_valid(cls, weight_rule):
return weight_rule in cls.all_weight_rules()
@classmethod
def all_weight_rules(cls):
- return [getattr(cls, attr)
+ if not cls._ALL_WEIGHT_RULES:
+ cls._ALL_WEIGHT_RULES = {
+ getattr(cls, attr)
for attr in dir(cls)
- if not attr.startswith("__") and not callable(getattr(cls, attr))]
+ if not attr.startswith("_") and not callable(getattr(cls, attr))
+ }
+ return cls._ALL_WEIGHT_RULES
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16ab314c/tests/utils/test_trigger_rule.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_trigger_rule.py b/tests/utils/test_trigger_rule.py
new file mode 100644
index 0000000..f89a56e
--- /dev/null
+++ b/tests/utils/test_trigger_rule.py
@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from airflow.utils.trigger_rule import TriggerRule
+
+
+class TestTriggerRule(unittest.TestCase):
+
+ def test_valid_trigger_rules(self):
+ self.assertTrue(TriggerRule.is_valid(TriggerRule.ALL_SUCCESS))
+ self.assertTrue(TriggerRule.is_valid(TriggerRule.ALL_FAILED))
+ self.assertTrue(TriggerRule.is_valid(TriggerRule.ALL_DONE))
+ self.assertTrue(TriggerRule.is_valid(TriggerRule.ONE_SUCCESS))
+ self.assertTrue(TriggerRule.is_valid(TriggerRule.ONE_FAILED))
+ self.assertTrue(TriggerRule.is_valid(TriggerRule.DUMMY))
+ self.assertEqual(len(TriggerRule.all_triggers()), 6)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16ab314c/tests/utils/test_weight_rule.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_weight_rule.py b/tests/utils/test_weight_rule.py
new file mode 100644
index 0000000..8ca618d
--- /dev/null
+++ b/tests/utils/test_weight_rule.py
@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from airflow.utils.weight_rule import WeightRule
+
+
+class TestWeightRule(unittest.TestCase):
+
+ def test_valid_weight_rules(self):
+ self.assertTrue(WeightRule.is_valid(WeightRule.DOWNSTREAM))
+ self.assertTrue(WeightRule.is_valid(WeightRule.UPSTREAM))
+ self.assertTrue(WeightRule.is_valid(WeightRule.ABSOLUTE))
+ self.assertEqual(len(WeightRule.all_weight_rules()), 3)
[4/6] incubator-airflow git commit: [AIRFLOW-2203] Cache signature in
apply_defaults
Posted by fo...@apache.org.
[AIRFLOW-2203] Cache signature in apply_defaults
Cache inspect.signature for the wrapper closure to avoid calling it on
every invocation of the decorated function. A separate sig_cache is created
per decoration, i.e. each function decorated with apply_defaults has
its own sig_cache.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/81ec595b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/81ec595b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/81ec595b
Branch: refs/heads/master
Commit: 81ec595b6c1ac05bc7f42e2c92c0dd79409953a4
Parents: 92357d5
Author: wongwill86 <wo...@gmail.com>
Authored: Mon Mar 12 17:08:44 2018 -0400
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed Mar 14 09:11:50 2018 +0100
----------------------------------------------------------------------
airflow/utils/decorators.py | 31 ++++++++++-------
tests/utils/test_decorators.py | 69 +++++++++++++++++++++++++++++++++++++
2 files changed, 88 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/81ec595b/airflow/utils/decorators.py
----------------------------------------------------------------------
diff --git a/airflow/utils/decorators.py b/airflow/utils/decorators.py
index 995e60f..4e9cb05 100644
--- a/airflow/utils/decorators.py
+++ b/airflow/utils/decorators.py
@@ -39,6 +39,19 @@ def apply_defaults(func):
inheritance and argument defaults, this decorator also alerts with
specific information about the missing arguments.
"""
+
+ import airflow.models
+ # Cache inspect.signature for the wrapper closure to avoid calling it
+ # at every decorated invocation. This is separate sig_cache created
+ # per decoration, i.e. each function decorated using apply_defaults will
+ # have a different sig_cache.
+ sig_cache = signature(func)
+ non_optional_args = {
+ name for (name, param) in sig_cache.parameters.items()
+ if param.default == param.empty and
+ param.name != 'self' and
+ param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)}
+
@wraps(func)
def wrapper(*args, **kwargs):
if len(args) > 1:
@@ -46,9 +59,9 @@ def apply_defaults(func):
"Use keyword arguments when initializing operators")
dag_args = {}
dag_params = {}
- import airflow.models
- if kwargs.get('dag', None) or airflow.models._CONTEXT_MANAGER_DAG:
- dag = kwargs.get('dag', None) or airflow.models._CONTEXT_MANAGER_DAG
+
+ dag = kwargs.get('dag', None) or airflow.models._CONTEXT_MANAGER_DAG
+ if dag:
dag_args = copy(dag.default_args) or {}
dag_params = copy(dag.params) or {}
@@ -67,16 +80,10 @@ def apply_defaults(func):
dag_args.update(default_args)
default_args = dag_args
- sig = signature(func)
- non_optional_args = [
- name for (name, param) in sig.parameters.items()
- if param.default == param.empty and
- param.name != 'self' and
- param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)]
- for arg in sig.parameters:
- if arg in default_args and arg not in kwargs:
+ for arg in sig_cache.parameters:
+ if arg not in kwargs and arg in default_args:
kwargs[arg] = default_args[arg]
- missing_args = list(set(non_optional_args) - set(kwargs))
+ missing_args = list(non_optional_args - set(kwargs))
if missing_args:
msg = "Argument {0} is required".format(missing_args)
raise AirflowException(msg)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/81ec595b/tests/utils/test_decorators.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_decorators.py b/tests/utils/test_decorators.py
new file mode 100644
index 0000000..29dada7
--- /dev/null
+++ b/tests/utils/test_decorators.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from airflow.utils.decorators import apply_defaults
+from airflow.exceptions import AirflowException
+
+
+# Essentially similar to airflow.models.BaseOperator
+class DummyClass(object):
+ @apply_defaults
+ def __init__(self, test_param, params=None, default_args=None):
+ self.test_param = test_param
+
+
+class DummySubClass(DummyClass):
+ @apply_defaults
+ def __init__(self, test_sub_param, *args, **kwargs):
+ super(DummySubClass, self).__init__(*args, **kwargs)
+ self.test_sub_param = test_sub_param
+
+
+class ApplyDefaultTest(unittest.TestCase):
+
+ def test_apply(self):
+ dc = DummyClass(test_param=True)
+ self.assertTrue(dc.test_param)
+
+ with self.assertRaisesRegexp(AirflowException, 'Argument.*test_param.*required'):
+ DummySubClass(test_sub_param=True)
+
+ def test_default_args(self):
+ default_args = {'test_param': True}
+ dc = DummyClass(default_args=default_args)
+ self.assertTrue(dc.test_param)
+
+ default_args = {'test_param': True, 'test_sub_param': True}
+ dsc = DummySubClass(default_args=default_args)
+ self.assertTrue(dc.test_param)
+ self.assertTrue(dsc.test_sub_param)
+
+ default_args = {'test_param': True}
+ dsc = DummySubClass(default_args=default_args, test_sub_param=True)
+ self.assertTrue(dc.test_param)
+ self.assertTrue(dsc.test_sub_param)
+
+ with self.assertRaisesRegexp(AirflowException,
+ 'Argument.*test_sub_param.*required'):
+ DummySubClass(default_args=default_args)
+
+ def test_incorrect_default_args(self):
+ default_args = {'test_param': True, 'extra_param': True}
+ dc = DummyClass(default_args=default_args)
+ self.assertTrue(dc.test_param)
+
+ default_args = {'random_params': True}
+ with self.assertRaisesRegexp(AirflowException, 'Argument.*test_param.*required'):
+ DummyClass(default_args=default_args)
[5/6] incubator-airflow git commit: [AIRFLOW-2203] Remove Useless
Commands.
Posted by fo...@apache.org.
[AIRFLOW-2203] Remove Useless Commands.
self.tasks is a temporary list generated from self.task_dict, so there is
no reason to append to it.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6f0a0d2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6f0a0d2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6f0a0d2f
Branch: refs/heads/master
Commit: 6f0a0d2f6f8203d2c6542a8eaefe2cbb2640a8d3
Parents: 81ec595
Author: wongwill86 <wo...@gmail.com>
Authored: Mon Mar 12 17:01:35 2018 -0400
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed Mar 14 09:11:56 2018 +0100
----------------------------------------------------------------------
airflow/models.py | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6f0a0d2f/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 74a54bb..8931ac6 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3823,11 +3823,10 @@ class DAG(BaseDag, LoggingMixin):
'exception.'.format(task.task_id),
category=PendingDeprecationWarning)
else:
- self.tasks.append(task)
self.task_dict[task.task_id] = task
task.dag = self
- self.task_count = len(self.tasks)
+ self.task_count = len(self.task_dict)
def add_tasks(self, tasks):
"""
[3/6] incubator-airflow git commit: [AIRFLOW-2203] Speed up Operator
Resources
Posted by fo...@apache.org.
[AIRFLOW-2203] Speed up Operator Resources
Use values loaded from the configuration as the default arguments of Resources
(evaluated once, when the module is imported) to avoid four configuration
lookups for every task created.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/92357d53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/92357d53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/92357d53
Branch: refs/heads/master
Commit: 92357d53e60a6c88afe6792923ff68070ec81df3
Parents: 16ab314
Author: wongwill86 <wo...@gmail.com>
Authored: Mon Mar 12 17:14:00 2018 -0400
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed Mar 14 09:11:44 2018 +0100
----------------------------------------------------------------------
airflow/utils/operator_resources.py | 16 ++++++----------
1 file changed, 6 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92357d53/airflow/utils/operator_resources.py
----------------------------------------------------------------------
diff --git a/airflow/utils/operator_resources.py b/airflow/utils/operator_resources.py
index d304170..b786e8f 100644
--- a/airflow/utils/operator_resources.py
+++ b/airflow/utils/operator_resources.py
@@ -99,16 +99,12 @@ class Resources(object):
:param gpus: The number of gpu units that are required
:type gpus: long
"""
- def __init__(self, cpus=None, ram=None, disk=None, gpus=None):
- if cpus is None:
- cpus = configuration.getint('operators', 'default_cpus')
- if ram is None:
- ram = configuration.getint('operators', 'default_ram')
- if disk is None:
- disk = configuration.getint('operators', 'default_disk')
- if gpus is None:
- gpus = configuration.getint('operators', 'default_gpus')
-
+ def __init__(self,
+ cpus=configuration.getint('operators', 'default_cpus'),
+ ram=configuration.getint('operators', 'default_ram'),
+ disk=configuration.getint('operators', 'default_disk'),
+ gpus=configuration.getint('operators', 'default_gpus')
+ ):
self.cpus = CpuResource(cpus)
self.ram = RamResource(ram)
self.disk = DiskResource(disk)
[6/6] incubator-airflow git commit: [AIRFLOW-2203] Defer cycle
detection
Posted by fo...@apache.org.
[AIRFLOW-2203] Defer cycle detection
Move cycle detection from the point where task relationships are set to the
point where the DAG is bagged. This changes DAG import runtime from polynomial
to roughly linear.
Closes #3116 from wongwill86:dag_import_speed
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c3730650
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c3730650
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c3730650
Branch: refs/heads/master
Commit: c3730650c852cd7a5e06a5933f5064bbb04e0e88
Parents: 6f0a0d2
Author: wongwill86 <wo...@gmail.com>
Authored: Mon Mar 12 16:53:01 2018 -0400
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed Mar 14 09:13:59 2018 +0100
----------------------------------------------------------------------
airflow/exceptions.py | 4 +
airflow/models.py | 111 ++++++++---
tests/core.py | 16 --
tests/models.py | 450 ++++++++++++++++++++++++++++++++++++++++++++-
4 files changed, 533 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3730650/airflow/exceptions.py
----------------------------------------------------------------------
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 90d3e22..c1b728c 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -34,3 +34,7 @@ class AirflowTaskTimeout(AirflowException):
class AirflowSkipException(AirflowException):
pass
+
+
+class AirflowDagCycleException(AirflowException):
+ pass
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3730650/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 8931ac6..c1b608a 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -22,7 +22,7 @@ from future.standard_library import install_aliases
from builtins import str
from builtins import object, bytes
import copy
-from collections import namedtuple
+from collections import namedtuple, defaultdict
from datetime import timedelta
import dill
@@ -63,7 +63,9 @@ import six
from airflow import settings, utils
from airflow.executors import GetDefaultExecutor, LocalExecutor
from airflow import configuration
-from airflow.exceptions import AirflowException, AirflowSkipException, AirflowTaskTimeout
+from airflow.exceptions import (
+ AirflowDagCycleException, AirflowException, AirflowSkipException, AirflowTaskTimeout
+)
from airflow.dag.base_dag import BaseDag, BaseDagBag
from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
@@ -122,7 +124,6 @@ else:
# Used by DAG context_managers
_CONTEXT_MANAGER_DAG = None
-
def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
"""
Clears a set of task instances, but makes sure the running ones
@@ -183,6 +184,11 @@ class DagBag(BaseDagBag, LoggingMixin):
:type include_examples: bool
"""
+ # static class variables to detetct dag cycle
+ CYCLE_NEW = 0
+ CYCLE_IN_PROGRESS = 1
+ CYCLE_DONE = 2
+
def __init__(
self,
dag_folder=None,
@@ -335,10 +341,17 @@ class DagBag(BaseDagBag, LoggingMixin):
dag.full_filepath = filepath
if dag.fileloc != filepath:
dag.fileloc = filepath
- dag.is_subdag = False
- self.bag_dag(dag, parent_dag=dag, root_dag=dag)
- found_dags.append(dag)
- found_dags += dag.subdags
+ try:
+ dag.is_subdag = False
+ self.bag_dag(dag, parent_dag=dag, root_dag=dag)
+ found_dags.append(dag)
+ found_dags += dag.subdags
+ except AirflowDagCycleException as cycle_exception:
+ self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
+ self.import_errors[dag.full_filepath] = str(cycle_exception)
+ self.file_last_changed[dag.full_filepath] = \
+ file_last_changed_on_disk
+
self.file_last_changed[filepath] = file_last_changed_on_disk
return found_dags
@@ -381,20 +394,39 @@ class DagBag(BaseDagBag, LoggingMixin):
def bag_dag(self, dag, parent_dag, root_dag):
"""
Adds the DAG into the bag, recurses into sub dags.
+ Throws AirflowDagCycleException if a cycle is detected in this dag or its subdags
"""
- self.dags[dag.dag_id] = dag
+
+ dag.test_cycle() # throws if a task cycle is found
+
dag.resolve_template_files()
dag.last_loaded = timezone.utcnow()
for task in dag.tasks:
settings.policy(task)
- for subdag in dag.subdags:
- subdag.full_filepath = dag.full_filepath
- subdag.parent_dag = dag
- subdag.is_subdag = True
- self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
- self.log.debug('Loaded DAG {dag}'.format(**locals()))
+ subdags = dag.subdags
+
+ try:
+ for subdag in subdags:
+ subdag.full_filepath = dag.full_filepath
+ subdag.parent_dag = dag
+ subdag.is_subdag = True
+ self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
+
+ self.dags[dag.dag_id] = dag
+ self.log.debug('Loaded DAG {dag}'.format(**locals()))
+ except AirflowDagCycleException as cycle_exception:
+ # There was an error in bagging the dag. Remove it from the list of dags
+ self.log.exception('Exception bagging dag: {dag.dag_id}'.format(**locals()))
+ # Only necessary at the root level since DAG.subdags automatically
+ # performs DFS to search through all subdags
+ if dag == root_dag:
+ for subdag in subdags:
+ if subdag.dag_id in self.dags:
+ del self.dags[subdag.dag_id]
+ raise cycle_exception
+
def collect_dags(
self,
@@ -2699,21 +2731,6 @@ class BaseOperator(LoggingMixin):
return list(map(lambda task_id: self._dag.task_dict[task_id],
self.get_flat_relative_ids(upstream)))
- def detect_downstream_cycle(self, task=None):
- """
- When invoked, this routine will raise an exception if a cycle is
- detected downstream from self. It is invoked when tasks are added to
- the DAG to detect cycles.
- """
- if not task:
- task = self
- for t in self.get_direct_relatives():
- if task is t:
- msg = "Cycle detected in DAG. Faulty task: {0}".format(task)
- raise AirflowException(msg)
- else:
- t.detect_downstream_cycle(task=task)
- return False
def run(
self,
@@ -4074,6 +4091,42 @@ class DAG(BaseDag, LoggingMixin):
qry = qry.filter(TaskInstance.state.in_(states))
return qry.scalar()
+ def test_cycle(self):
+ '''
+ Check to see if there are any cycles in the DAG. Returns False if no cycle found,
+ otherwise raises exception.
+ '''
+
+ # default of int is 0 which corresponds to CYCLE_NEW
+ visit_map = defaultdict(int)
+ for task_id in self.task_dict.keys():
+ # print('starting %s' % task_id)
+ if visit_map[task_id] == DagBag.CYCLE_NEW:
+ self._test_cycle_helper(visit_map, task_id)
+ return False
+
+ def _test_cycle_helper(self, visit_map, task_id):
+ '''
+ Checks if a cycle exists from the input task using DFS traversal
+ '''
+
+ # print('Inspecting %s' % task_id)
+ if visit_map[task_id] == DagBag.CYCLE_DONE:
+ return False
+
+ visit_map[task_id] = DagBag.CYCLE_IN_PROGRESS
+
+ task = self.task_dict[task_id]
+ for descendant_id in task.get_direct_relative_ids():
+ if visit_map[descendant_id] == DagBag.CYCLE_IN_PROGRESS:
+ msg = "Cycle detected in DAG. Faulty task: {0} to {1}".format(
+ task_id, descendant_id)
+ raise AirflowDagCycleException(msg)
+ else:
+ self._test_cycle_helper(visit_map, descendant_id)
+
+ visit_map[task_id] = DagBag.CYCLE_DONE
+
class Chart(Base):
__tablename__ = "chart"
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3730650/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 03372db..ce5fb7a 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -781,22 +781,6 @@ class CoreTest(unittest.TestCase):
with self.assertRaisesRegexp(AirflowException, regexp):
self.run_after_loop.set_upstream(self.runme_0)
- def test_cyclic_dependencies_1(self):
-
- regexp = "Cycle detected in DAG. (.*)runme_0(.*)"
- with self.assertRaisesRegexp(AirflowException, regexp):
- self.runme_0.set_upstream(self.run_after_loop)
-
- def test_cyclic_dependencies_2(self):
- regexp = "Cycle detected in DAG. (.*)run_after_loop(.*)"
- with self.assertRaisesRegexp(AirflowException, regexp):
- self.run_after_loop.set_downstream(self.runme_0)
-
- def test_cyclic_dependencies_3(self):
- regexp = "Cycle detected in DAG. (.*)run_this_last(.*)"
- with self.assertRaisesRegexp(AirflowException, regexp):
- self.run_this_last.set_downstream(self.runme_0)
-
def test_bad_trigger_rule(self):
with self.assertRaises(AirflowException):
DummyOperator(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3730650/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index c8ee037..5d8184c 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -26,9 +26,11 @@ import time
import six
import re
import urllib
+import textwrap
+import inspect
from airflow import configuration, models, settings, AirflowException
-from airflow.exceptions import AirflowSkipException
+from airflow.exceptions import AirflowDagCycleException, AirflowSkipException
from airflow.jobs import BackfillJob
from airflow.models import DAG, TaskInstance as TI
from airflow.models import State as ST
@@ -47,7 +49,7 @@ from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule
from mock import patch
from parameterized import parameterized
-
+from tempfile import NamedTemporaryFile
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
TEST_DAGS_FOLDER = os.path.join(
@@ -388,6 +390,132 @@ class DagTest(unittest.TestCase):
result = task.render_template('', "{{ 'world' | hello}}", dict())
self.assertEqual(result, 'Hello world')
+ def test_cycle(self):
+ # test empty
+ dag = DAG(
+ 'dag',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ self.assertFalse(dag.test_cycle())
+
+ # test single task
+ dag = DAG(
+ 'dag',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ with dag:
+ opA = DummyOperator(task_id='A')
+
+ self.assertFalse(dag.test_cycle())
+
+ # test no cycle
+ dag = DAG(
+ 'dag',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ # A -> B -> C
+ # B -> D
+ # E -> F
+ with dag:
+ opA = DummyOperator(task_id='A')
+ opB = DummyOperator(task_id='B')
+ opC = DummyOperator(task_id='C')
+ opD = DummyOperator(task_id='D')
+ opE = DummyOperator(task_id='E')
+ opF = DummyOperator(task_id='F')
+ opA.set_downstream(opB)
+ opB.set_downstream(opC)
+ opB.set_downstream(opD)
+ opE.set_downstream(opF)
+
+ self.assertFalse(dag.test_cycle())
+
+ # test self loop
+ dag = DAG(
+ 'dag',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ # A -> A
+ with dag:
+ opA = DummyOperator(task_id='A')
+ opA.set_downstream(opA)
+
+ with self.assertRaises(AirflowDagCycleException):
+ dag.test_cycle()
+
+ # test downstream self loop
+ dag = DAG(
+ 'dag',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ # A -> B -> C -> D -> E -> E
+ with dag:
+ opA = DummyOperator(task_id='A')
+ opB = DummyOperator(task_id='B')
+ opC = DummyOperator(task_id='C')
+ opD = DummyOperator(task_id='D')
+ opE = DummyOperator(task_id='E')
+ opA.set_downstream(opB)
+ opB.set_downstream(opC)
+ opC.set_downstream(opD)
+ opD.set_downstream(opE)
+ opE.set_downstream(opE)
+
+ with self.assertRaises(AirflowDagCycleException):
+ dag.test_cycle()
+
+ # large loop
+ dag = DAG(
+ 'dag',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ # A -> B -> C -> D -> E -> A
+ with dag:
+ opA = DummyOperator(task_id='A')
+ opB = DummyOperator(task_id='B')
+ opC = DummyOperator(task_id='C')
+ opD = DummyOperator(task_id='D')
+ opE = DummyOperator(task_id='E')
+ opA.set_downstream(opB)
+ opB.set_downstream(opC)
+ opC.set_downstream(opD)
+ opD.set_downstream(opE)
+ opE.set_downstream(opA)
+
+ with self.assertRaises(AirflowDagCycleException):
+ dag.test_cycle()
+
+ # test arbitrary loop
+ dag = DAG(
+ 'dag',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ # E-> A -> B -> F -> A
+ # -> C -> F
+ with dag:
+ opA = DummyOperator(task_id='A')
+ opB = DummyOperator(task_id='B')
+ opC = DummyOperator(task_id='C')
+ opD = DummyOperator(task_id='D')
+ opE = DummyOperator(task_id='E')
+ opF = DummyOperator(task_id='F')
+ opA.set_downstream(opB)
+ opA.set_downstream(opC)
+ opE.set_downstream(opA)
+ opC.set_downstream(opF)
+ opB.set_downstream(opF)
+ opF.set_downstream(opA)
+
+ with self.assertRaises(AirflowDagCycleException):
+ dag.test_cycle()
+
class DagStatTest(unittest.TestCase):
def test_dagstats_crud(self):
@@ -796,7 +924,6 @@ class DagBagTest(unittest.TestCase):
"""
test that we're able to parse file that contains multi-byte char
"""
- from tempfile import NamedTemporaryFile
f = NamedTemporaryFile()
f.write('\u3042'.encode('utf8')) # write multi-byte char (hiragana)
f.flush()
@@ -857,6 +984,323 @@ class DagBagTest(unittest.TestCase):
self.assertTrue(
dag.fileloc.endswith('airflow/example_dags/' + path))
+    def process_dag(self, create_dag):
+        """
+        Helper method to process a file generated from the input create_dag function.
+
+        The source of ``create_dag`` is taken without its first line (the
+        ``def`` statement) and without its last line (expected to be
+        ``return dag``, which would be invalid at module level).
+        It is dedented and written to a temporary file, which is then parsed
+        with ``DagBag.process_file``.
+
+        :param create_dag: a function whose body builds a DAG and whose last
+            line is ``return dag``.
+        :return: tuple ``(dagbag, found_dags, file_path)``. The temporary file
+            is deleted once this method returns; ``file_path`` is only useful
+            as an identifier, e.g. as a key into ``dagbag.import_errors``.
+        """
+        # Strip the ``def`` line and the trailing ``return dag`` so that the
+        # remaining statements form a valid top-level module.
+        source = textwrap.dedent(''.join(
+            inspect.getsource(create_dag).splitlines(True)[1:-1]))
+
+        # Context manager: close (and delete) the temp file deterministically
+        # rather than whenever the file object happens to be garbage collected.
+        with NamedTemporaryFile() as f:
+            f.write(source.encode('utf8'))
+            f.flush()
+
+            dagbag = models.DagBag(include_examples=False)
+            found_dags = dagbag.process_file(f.name)
+            return (dagbag, found_dags, f.name)
+
+    def validate_dags(self, expected_parent_dag, actual_found_dags, actual_dagbag,
+                      should_be_found=True):
+        """
+        Assert whether ``expected_parent_dag`` and all of its subdags were loaded.
+
+        For every expected dag id this checks both that it is (or is not)
+        among ``actual_found_dags`` and that it is (or is not) a key of
+        ``actual_dagbag.dags``.
+
+        :param expected_parent_dag: the DAG built directly by the test; its
+            ``dag_id`` plus the ids of its ``subdags`` are the ids checked.
+        :param actual_found_dags: DAGs returned by ``DagBag.process_file``.
+        :param actual_dagbag: the DagBag the file was processed into.
+        :param should_be_found: when True every id must be present, when
+            False every id must be absent.
+        """
+        expected_dag_ids = [dag.dag_id for dag in expected_parent_dag.subdags]
+        expected_dag_ids.append(expected_parent_dag.dag_id)
+
+        # Built once as a set so each membership test below is O(1).
+        actual_found_dag_ids = {dag.dag_id for dag in actual_found_dags}
+
+        for dag_id in expected_dag_ids:
+            # Let the logger do the interpolation lazily.
+            actual_dagbag.log.info('validating %s', dag_id)
+            # assertEqual rather than the deprecated assertEquals alias.
+            self.assertEqual(
+                dag_id in actual_found_dag_ids, should_be_found,
+                'dag "%s" should %shave been found after processing dag "%s"' %
+                (dag_id, '' if should_be_found else 'not ', expected_parent_dag.dag_id)
+            )
+            self.assertEqual(
+                dag_id in actual_dagbag.dags, should_be_found,
+                'dag "%s" should %sbe in dagbag.dags after processing dag "%s"' %
+                (dag_id, '' if should_be_found else 'not ', expected_parent_dag.dag_id)
+            )
+
+ def test_load_subdags(self):
+ """
+ Processing a DAG file must load the parent DAG and every subdag it
+ defines, including subdags nested inside other subdags, into both the
+ list returned by DagBag.process_file and DagBag.dags.
+ """
+ # Define Dag to load
+ # NOTE: process_dag() writes this function's source to a temp file after
+ # dropping its first line (the def) and its last line, so `return dag`
+ # must remain the final line of the function body.
+ def standard_subdag():
+ from airflow.models import DAG
+ from airflow.operators.dummy_operator import DummyOperator
+ from airflow.operators.subdag_operator import SubDagOperator
+ import datetime
+ DAG_NAME = 'master'
+ DEFAULT_ARGS = {
+ 'owner': 'owner1',
+ 'start_date': datetime.datetime(2016, 1, 1)
+ }
+ dag = DAG(
+ DAG_NAME,
+ default_args=DEFAULT_ARGS)
+
+ # master:
+ # A -> opSubDag_0
+ # master.opsubdag_0:
+ # -> subdag_0.task
+ # A -> opSubDag_1
+ # master.opsubdag_1:
+ # -> subdag_1.task
+
+ with dag:
+ def subdag_0():
+ subdag_0 = DAG('master.opSubdag_0', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_0.task', dag=subdag_0)
+ return subdag_0
+
+ def subdag_1():
+ subdag_1 = DAG('master.opSubdag_1', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_1.task', dag=subdag_1)
+ return subdag_1
+
+ opSubdag_0 = SubDagOperator(
+ task_id='opSubdag_0', dag=dag, subdag=subdag_0())
+ opSubdag_1 = SubDagOperator(
+ task_id='opSubdag_1', dag=dag, subdag=subdag_1())
+
+ opA = DummyOperator(task_id='A')
+ opA.set_downstream(opSubdag_0)
+ opA.set_downstream(opSubdag_1)
+ return dag
+
+ testDag = standard_subdag()
+ # sanity check to make sure DAG.subdags is still functioning properly:
+ # the two SubDagOperators above yield two subdags
+ self.assertEqual(len(testDag.subdags), 2)
+
+ # Perform processing dag (writes standard_subdag's body to a file and parses it)
+ dagbag, found_dags, _ = self.process_dag(standard_subdag)
+
+ # Validate correctness
+ # all dags from testDag should be listed
+ self.validate_dags(testDag, found_dags, dagbag)
+
+ # Define Dag to load
+ # (same constraint as above: `return dag` must stay the last line)
+ def nested_subdags():
+ from airflow.models import DAG
+ from airflow.operators.dummy_operator import DummyOperator
+ from airflow.operators.subdag_operator import SubDagOperator
+ import datetime
+ DAG_NAME = 'master'
+ DEFAULT_ARGS = {
+ 'owner': 'owner1',
+ 'start_date': datetime.datetime(2016, 1, 1)
+ }
+ dag = DAG(
+ DAG_NAME,
+ default_args=DEFAULT_ARGS)
+
+ # master:
+ # A -> opSubdag_0
+ # master.opSubdag_0:
+ # -> opSubDag_A
+ # master.opSubdag_0.opSubdag_A:
+ # -> subdag_A.task
+ # -> opSubdag_B
+ # master.opSubdag_0.opSubdag_B:
+ # -> subdag_B.task
+ # A -> opSubdag_1
+ # master.opSubdag_1:
+ # -> opSubdag_C
+ # master.opSubdag_1.opSubdag_C:
+ # -> subdag_C.task
+ # -> opSubDag_D
+ # master.opSubdag_1.opSubdag_D:
+ # -> subdag_D.task
+
+ with dag:
+ def subdag_A():
+ subdag_A = DAG(
+ 'master.opSubdag_0.opSubdag_A', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_A.task', dag=subdag_A)
+ return subdag_A
+
+ def subdag_B():
+ subdag_B = DAG(
+ 'master.opSubdag_0.opSubdag_B', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_B.task', dag=subdag_B)
+ return subdag_B
+
+ def subdag_C():
+ subdag_C = DAG(
+ 'master.opSubdag_1.opSubdag_C', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_C.task', dag=subdag_C)
+ return subdag_C
+
+ def subdag_D():
+ subdag_D = DAG(
+ 'master.opSubdag_1.opSubdag_D', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_D.task', dag=subdag_D)
+ return subdag_D
+
+ def subdag_0():
+ subdag_0 = DAG('master.opSubdag_0', default_args=DEFAULT_ARGS)
+ SubDagOperator(task_id='opSubdag_A', dag=subdag_0, subdag=subdag_A())
+ SubDagOperator(task_id='opSubdag_B', dag=subdag_0, subdag=subdag_B())
+ return subdag_0
+
+ def subdag_1():
+ subdag_1 = DAG('master.opSubdag_1', default_args=DEFAULT_ARGS)
+ SubDagOperator(task_id='opSubdag_C', dag=subdag_1, subdag=subdag_C())
+ SubDagOperator(task_id='opSubdag_D', dag=subdag_1, subdag=subdag_D())
+ return subdag_1
+
+ opSubdag_0 = SubDagOperator(
+ task_id='opSubdag_0', dag=dag, subdag=subdag_0())
+ opSubdag_1 = SubDagOperator(
+ task_id='opSubdag_1', dag=dag, subdag=subdag_1())
+
+ opA = DummyOperator(task_id='A')
+ opA.set_downstream(opSubdag_0)
+ opA.set_downstream(opSubdag_1)
+
+ return dag
+
+ testDag = nested_subdags()
+ # sanity check to make sure DAG.subdags is still functioning properly:
+ # 6 = opSubdag_0 and opSubdag_1 plus the four nested opSubdag_A..D,
+ # i.e. nested subdags are counted as well
+ self.assertEqual(len(testDag.subdags), 6)
+
+ # Perform processing dag
+ dagbag, found_dags, _ = self.process_dag(nested_subdags)
+
+ # Validate correctness
+ # all dags from testDag should be listed
+ self.validate_dags(testDag, found_dags, dagbag)
+
+ def test_skip_cycle_dags(self):
+ """
+ Don't crash when loading an invalid (contains a cycle) DAG file.
+ Don't load the dag into the DagBag either.
+
+ Covers a cycle in the top-level DAG and a cycle inside a nested subdag.
+ In both cases none of the file's dag ids may be loaded, and the file
+ must be recorded in dagbag.import_errors.
+ """
+ # Define Dag to load
+ # NOTE: process_dag() writes this function's source to a temp file after
+ # dropping its first line (the def) and its last line, so `return dag`
+ # must remain the final line of the function body.
+ def basic_cycle():
+ from airflow.models import DAG
+ from airflow.operators.dummy_operator import DummyOperator
+ import datetime
+ DAG_NAME = 'cycle_dag'
+ DEFAULT_ARGS = {
+ 'owner': 'owner1',
+ 'start_date': datetime.datetime(2016, 1, 1)
+ }
+ dag = DAG(
+ DAG_NAME,
+ default_args=DEFAULT_ARGS)
+
+ # A -> A
+ with dag:
+ opA = DummyOperator(task_id='A')
+ opA.set_downstream(opA)
+
+ return dag
+
+ testDag = basic_cycle()
+ # sanity check to make sure DAG.subdags is still functioning properly
+ # (this DAG defines no subdags)
+ self.assertEqual(len(testDag.subdags), 0)
+
+ # Perform processing dag
+ dagbag, found_dags, file_path = self.process_dag(basic_cycle)
+
+ # Validate correctness
+ # None of the dags should be found
+ self.validate_dags(testDag, found_dags, dagbag, should_be_found=False)
+ # the cycle is reported as an import error for the processed file
+ self.assertIn(file_path, dagbag.import_errors)
+
+ # Define Dag to load
+ # (same constraint as above: `return dag` must stay the last line)
+ def nested_subdag_cycle():
+ from airflow.models import DAG
+ from airflow.operators.dummy_operator import DummyOperator
+ from airflow.operators.subdag_operator import SubDagOperator
+ import datetime
+ DAG_NAME = 'nested_cycle'
+ DEFAULT_ARGS = {
+ 'owner': 'owner1',
+ 'start_date': datetime.datetime(2016, 1, 1)
+ }
+ dag = DAG(
+ DAG_NAME,
+ default_args=DEFAULT_ARGS)
+
+ # cycle:
+ # A -> opSubdag_0
+ # cycle.opSubdag_0:
+ # -> opSubDag_A
+ # cycle.opSubdag_0.opSubdag_A:
+ # -> subdag_A.task
+ # -> opSubdag_B
+ # cycle.opSubdag_0.opSubdag_B:
+ # -> subdag_B.task
+ # A -> opSubdag_1
+ # cycle.opSubdag_1:
+ # -> opSubdag_C
+ # cycle.opSubdag_1.opSubdag_C:
+ # -> subdag_C.task -> subdag_C.task >Invalid Loop<
+ # -> opSubDag_D
+ # cycle.opSubdag_1.opSubdag_D:
+ # -> subdag_D.task
+
+ with dag:
+ def subdag_A():
+ subdag_A = DAG(
+ 'nested_cycle.opSubdag_0.opSubdag_A', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_A.task', dag=subdag_A)
+ return subdag_A
+
+ def subdag_B():
+ subdag_B = DAG(
+ 'nested_cycle.opSubdag_0.opSubdag_B', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_B.task', dag=subdag_B)
+ return subdag_B
+
+ def subdag_C():
+ subdag_C = DAG(
+ 'nested_cycle.opSubdag_1.opSubdag_C', default_args=DEFAULT_ARGS)
+ opSubdag_C_task = DummyOperator(
+ task_id='subdag_C.task', dag=subdag_C)
+ # introduce a loop in opSubdag_C
+ opSubdag_C_task.set_downstream(opSubdag_C_task)
+ return subdag_C
+
+ def subdag_D():
+ subdag_D = DAG(
+ 'nested_cycle.opSubdag_1.opSubdag_D', default_args=DEFAULT_ARGS)
+ DummyOperator(task_id='subdag_D.task', dag=subdag_D)
+ return subdag_D
+
+ def subdag_0():
+ subdag_0 = DAG('nested_cycle.opSubdag_0', default_args=DEFAULT_ARGS)
+ SubDagOperator(task_id='opSubdag_A', dag=subdag_0, subdag=subdag_A())
+ SubDagOperator(task_id='opSubdag_B', dag=subdag_0, subdag=subdag_B())
+ return subdag_0
+
+ def subdag_1():
+ subdag_1 = DAG('nested_cycle.opSubdag_1', default_args=DEFAULT_ARGS)
+ SubDagOperator(task_id='opSubdag_C', dag=subdag_1, subdag=subdag_C())
+ SubDagOperator(task_id='opSubdag_D', dag=subdag_1, subdag=subdag_D())
+ return subdag_1
+
+ opSubdag_0 = SubDagOperator(
+ task_id='opSubdag_0', dag=dag, subdag=subdag_0())
+ opSubdag_1 = SubDagOperator(
+ task_id='opSubdag_1', dag=dag, subdag=subdag_1())
+
+ opA = DummyOperator(task_id='A')
+ opA.set_downstream(opSubdag_0)
+ opA.set_downstream(opSubdag_1)
+
+ return dag
+
+ testDag = nested_subdag_cycle()
+ # sanity check to make sure DAG.subdags is still functioning properly:
+ # same 6-subdag layout as in test_load_subdags
+ self.assertEqual(len(testDag.subdags), 6)
+
+ # Perform processing dag
+ dagbag, found_dags, file_path = self.process_dag(nested_subdag_cycle)
+
+ # Validate correctness
+ # None of the dags should be found: a cycle in one nested subdag
+ # rejects every dag defined in the file
+ self.validate_dags(testDag, found_dags, dagbag, should_be_found=False)
+ self.assertIn(file_path, dagbag.import_errors)
+
def test_process_file_with_none(self):
"""
test that process_file can handle Nones