You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/09/17 19:25:25 UTC

[GitHub] seelmann closed pull request #3540: [AIRFLOW-2639] Dagrun of subdags is set to RUNNING immediately

seelmann closed pull request #3540: [AIRFLOW-2639] Dagrun of subdags is set to RUNNING immediately
URL: https://github.com/apache/incubator-airflow/pull/3540
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index 86be6aa544..256a642ec6 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -61,22 +61,14 @@ def _trigger_dag(
     if conf:
         run_conf = json.loads(conf)
 
-    triggers = list()
-    dags_to_trigger = list()
-    dags_to_trigger.append(dag)
-    while dags_to_trigger:
-        dag = dags_to_trigger.pop()
-        trigger = dag.create_dagrun(
-            run_id=run_id,
-            execution_date=execution_date,
-            state=State.RUNNING,
-            conf=run_conf,
-            external_trigger=True,
-        )
-        triggers.append(trigger)
-        if dag.subdags:
-            dags_to_trigger.extend(dag.subdags)
-    return triggers
+    trigger = dag.create_dagrun(
+        run_id=run_id,
+        execution_date=execution_date,
+        state=State.RUNNING,
+        conf=run_conf,
+        external_trigger=True,
+    )
+    return trigger
 
 
 def trigger_dag(
@@ -88,7 +80,7 @@ def trigger_dag(
 ):
     dagbag = DagBag()
     dag_run = DagRun()
-    triggers = _trigger_dag(
+    trigger = _trigger_dag(
         dag_id=dag_id,
         dag_run=dag_run,
         dag_bag=dagbag,
@@ -98,4 +90,4 @@ def trigger_dag(
         replace_microseconds=replace_microseconds,
     )
 
-    return triggers[0] if triggers else None
+    return trigger
diff --git a/airflow/operators/subdag_operator.py b/airflow/operators/subdag_operator.py
index 052095e2a6..0bffc961a8 100644
--- a/airflow/operators/subdag_operator.py
+++ b/airflow/operators/subdag_operator.py
@@ -98,6 +98,7 @@ def __init__(
 
     def execute(self, context):
         ed = context['execution_date']
+        conf = context['dag_run'].conf
         self.subdag.run(
             start_date=ed, end_date=ed, donot_pickle=True,
-            executor=self.executor)
+            executor=self.executor, conf=conf)
diff --git a/tests/api/common/experimental/trigger_dag_tests.py b/tests/api/common/experimental/trigger_dag_tests.py
index d6354840e2..0517599e4c 100644
--- a/tests/api/common/experimental/trigger_dag_tests.py
+++ b/tests/api/common/experimental/trigger_dag_tests.py
@@ -77,7 +77,7 @@ def test_trigger_dag_include_subdags(self, dag_bag_mock, dag_run_mock, dag_mock)
         dag2.subdags = []
         dag_mock.subdags = [dag1, dag2]
 
-        triggers = _trigger_dag(
+        trigger = _trigger_dag(
             dag_id,
             dag_bag_mock,
             dag_run_mock,
@@ -86,7 +86,7 @@ def test_trigger_dag_include_subdags(self, dag_bag_mock, dag_run_mock, dag_mock)
             execution_date=None,
             replace_microseconds=True)
 
-        self.assertEqual(3, len(triggers))
+        self.assertTrue(trigger)
 
 
 if __name__ == '__main__':
diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py
index af47c5cfd5..8436588053 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -19,12 +19,12 @@
 
 import unittest
 
-from mock import Mock
+from mock import Mock, MagicMock
 
 import airflow
 from airflow.exceptions import AirflowException
 from airflow.executors.sequential_executor import SequentialExecutor
-from airflow.models import DAG, DagBag
+from airflow.models import DAG, DagBag, DagRun
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.subdag_operator import SubDagOperator
 from airflow.utils.timezone import datetime
@@ -150,3 +150,25 @@ def test_subdag_executor(self):
         subdag_good = DAG('parent.test', default_args=default_args)
         subdag = SubDagOperator(task_id='test', dag=dag, subdag=subdag_good)
         self.assertEqual(type(subdag.executor), SequentialExecutor)
+
+    def test_forwards_dag_run_conf(self):
+        """
+        Tests that the parent's dag run conf is forwarded to the subdag
+        """
+        dag = DAG('parent', default_args=default_args)
+        subdag = DAG('parent.test', default_args=default_args)
+        subdag.run = MagicMock()
+
+        subdag_op = SubDagOperator(task_id='test', dag=dag, subdag=subdag)
+        ed = DEFAULT_DATE
+        dr = DagRun()
+        dr.conf = {'a': 'foo', 'b': 1, 'c': True, 'd': [1, 2, 3]}
+        context = dict(
+            execution_date=ed,
+            dag_run=dr
+        )
+        subdag_op.execute(context)
+
+        # expect sudgag.run was called with the parent's conf
+        subdag.run.assert_called_once_with(start_date=ed, end_date=ed, donot_pickle=True,
+                                           executor=subdag_op.executor, conf=dr.conf)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services