You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/02/19 08:48:29 UTC
incubator-airflow git commit: [AIRFLOW-881] Check if SubDagOperator is in DAG context manager
Repository: incubator-airflow
Updated Branches:
refs/heads/master a279be65b -> 0ed608dca
[AIRFLOW-881] Check if SubDagOperator is in DAG context manager
When initializing a SubDagOperator, the `dag` param should not be required
if it is within a DAG context manager. So we check if that is the case and
use that as the parent DAG if found (and the `dag` param is not specified).
Closes #2087 from dhuang/AIRFLOW-881
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0ed608dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0ed608dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0ed608dc
Branch: refs/heads/master
Commit: 0ed608dca7b465a4107e7910dfaa838fbd702fdd
Parents: a279be6
Author: Daniel Huang <dx...@gmail.com>
Authored: Sun Feb 19 09:48:21 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Feb 19 09:48:21 2017 +0100
----------------------------------------------------------------------
airflow/operators/subdag_operator.py | 8 +++++---
tests/operators/subdag_operator.py | 11 +++++++++++
2 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0ed608dc/airflow/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/subdag_operator.py b/airflow/operators/subdag_operator.py
index a44e210..569d0a0 100644
--- a/airflow/operators/subdag_operator.py
+++ b/airflow/operators/subdag_operator.py
@@ -41,9 +41,11 @@ class SubDagOperator(BaseOperator):
:param dag: the parent DAG
:type subdag: airflow.DAG
"""
- if 'dag' not in kwargs:
- raise AirflowException("Please pass in the `dag` param")
- dag = kwargs['dag']
+ import airflow.models
+ dag = kwargs.get('dag') or airflow.models._CONTEXT_MANAGER_DAG
+ if not dag:
+ raise AirflowException('Please pass in the `dag` param or call '
+ 'within a DAG context manager')
session = kwargs.pop('session')
super(SubDagOperator, self).__init__(*args, **kwargs)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0ed608dc/tests/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py
index 6a25ac3..eeb41ec 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -54,6 +54,17 @@ class SubDagOperatorTests(unittest.TestCase):
AirflowException,
SubDagOperator, task_id='test', dag=dag, subdag=subdag_bad3)
+ def test_subdag_in_context_manager(self):
+ """
+ Creating a sub DAG within a main DAG's context manager
+ """
+ with DAG('parent', default_args=default_args) as dag:
+ subdag = DAG('parent.test', default_args=default_args)
+ op = SubDagOperator(task_id='test', subdag=subdag)
+
+ self.assertEqual(op.dag, dag)
+ self.assertEqual(op.subdag, subdag)
+
def test_subdag_pools(self):
"""
Subdags and subdag tasks can't both have a pool with 1 slot