You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/02/19 08:48:29 UTC

incubator-airflow git commit: [AIRFLOW-881] Check if SubDagOperator is in DAG context manager

Repository: incubator-airflow
Updated Branches:
  refs/heads/master a279be65b -> 0ed608dca


[AIRFLOW-881] Check if SubDagOperator is in DAG context manager

When initializing a SubDagOperator, the `dag` param should not be
required if it is within a DAG context manager. So we check if that
is the case and use that as the parent DAG if found (and the `dag`
param is not specified).

Closes #2087 from dhuang/AIRFLOW-881


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0ed608dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0ed608dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0ed608dc

Branch: refs/heads/master
Commit: 0ed608dca7b465a4107e7910dfaa838fbd702fdd
Parents: a279be6
Author: Daniel Huang <dx...@gmail.com>
Authored: Sun Feb 19 09:48:21 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Feb 19 09:48:21 2017 +0100

----------------------------------------------------------------------
 airflow/operators/subdag_operator.py |  8 +++++---
 tests/operators/subdag_operator.py   | 11 +++++++++++
 2 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0ed608dc/airflow/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/subdag_operator.py b/airflow/operators/subdag_operator.py
index a44e210..569d0a0 100644
--- a/airflow/operators/subdag_operator.py
+++ b/airflow/operators/subdag_operator.py
@@ -41,9 +41,11 @@ class SubDagOperator(BaseOperator):
         :param dag: the parent DAG
         :type subdag: airflow.DAG
         """
-        if 'dag' not in kwargs:
-            raise AirflowException("Please pass in the `dag` param")
-        dag = kwargs['dag']
+        import airflow.models
+        dag = kwargs.get('dag') or airflow.models._CONTEXT_MANAGER_DAG
+        if not dag:
+            raise AirflowException('Please pass in the `dag` param or call '
+                                   'within a DAG context manager')
         session = kwargs.pop('session')
         super(SubDagOperator, self).__init__(*args, **kwargs)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0ed608dc/tests/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py
index 6a25ac3..eeb41ec 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -54,6 +54,17 @@ class SubDagOperatorTests(unittest.TestCase):
             AirflowException,
             SubDagOperator, task_id='test', dag=dag, subdag=subdag_bad3)
 
+    def test_subdag_in_context_manager(self):
+        """
+        Creating a sub DAG within a main DAG's context manager
+        """
+        with DAG('parent', default_args=default_args) as dag:
+            subdag = DAG('parent.test', default_args=default_args)
+            op = SubDagOperator(task_id='test', subdag=subdag)
+
+            self.assertEqual(op.dag, dag)
+            self.assertEqual(op.subdag, subdag)
+
     def test_subdag_pools(self):
         """
         Subdags and subdag tasks can't both have a pool with 1 slot