You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/28 12:49:31 UTC

[airflow] branch v1-10-test updated (e0bdf95 -> 78fd2cb)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from e0bdf95  [AIRFLOW-5413] Allow K8S worker pod to be configured from JSON/YAML file (#6230)
     new 00e9cbf  Document default timeout value for SSHOperator (#8744)
     new 78fd2cb  [AIRFLOW-5500] Fix the trigger_dag api in the case of nested subdags

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/api/common/experimental/trigger_dag.py    | 12 ++++-------
 airflow/contrib/operators/ssh_operator.py         |  2 +-
 tests/api/common/experimental/test_trigger_dag.py | 25 +++++++++++++++++++++++
 3 files changed, 30 insertions(+), 9 deletions(-)


[airflow] 01/02: Document default timeout value for SSHOperator (#8744)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 00e9cbf0d6c2bbb63f6b210ef788d3818402826e
Author: Abhilash Kishore <ab...@gmail.com>
AuthorDate: Sat May 9 17:35:41 2020 -0700

    Document default timeout value for SSHOperator (#8744)
    
    (cherry picked from commit 21cc7d729827e9f3af0698bf647b2d41fc87b11c)
---
 airflow/contrib/operators/ssh_operator.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py
index 445f7b4..1322b4f 100644
--- a/airflow/contrib/operators/ssh_operator.py
+++ b/airflow/contrib/operators/ssh_operator.py
@@ -43,7 +43,7 @@ class SSHOperator(BaseOperator):
     :type remote_host: str
     :param command: command to execute on remote host. (templated)
     :type command: str
-    :param timeout: timeout (in seconds) for executing the command.
+    :param timeout: timeout (in seconds) for executing the command. The default is 10 seconds.
     :type timeout: int
     :param environment: a dict of shell environment variables. Note that the
         server will reject them silently if `AcceptEnv` is not set in SSH config.


[airflow] 02/02: [AIRFLOW-5500] Fix the trigger_dag api in the case of nested subdags

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 78fd2cb772830810d7e51a028c9de0cb751debba
Author: Charles Bournhonesque <ch...@gmail.com>
AuthorDate: Fri Jun 5 11:50:19 2020 -0400

    [AIRFLOW-5500] Fix the trigger_dag api in the case of nested subdags
    
    Co-authored-by: Charles Bournhonesque <ch...@benevolent.ai>
    (cherry picked from commit 16e06f802b30ce7c3972b1a8165e3d6327dee761)
---
 airflow/api/common/experimental/trigger_dag.py    | 12 ++++-------
 tests/api/common/experimental/test_trigger_dag.py | 25 +++++++++++++++++++++++
 2 files changed, 29 insertions(+), 8 deletions(-)

diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index e7aad06..7adfac6 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -85,12 +85,10 @@ def _trigger_dag(
         else:
             run_conf = json.loads(conf)
 
-    triggers = list()
-    dags_to_trigger = list()
-    dags_to_trigger.append(dag)
-    while dags_to_trigger:
-        dag = dags_to_trigger.pop()
-        trigger = dag.create_dagrun(
+    triggers = []
+    dags_to_trigger = [dag] + dag.subdags
+    for _dag in dags_to_trigger:
+        trigger = _dag.create_dagrun(
             run_id=run_id,
             execution_date=execution_date,
             state=State.RUNNING,
@@ -98,8 +96,6 @@ def _trigger_dag(
             external_trigger=True,
         )
         triggers.append(trigger)
-        if dag.subdags:
-            dags_to_trigger.extend(dag.subdags)
     return triggers
 
 
diff --git a/tests/api/common/experimental/test_trigger_dag.py b/tests/api/common/experimental/test_trigger_dag.py
index 396268d..66fc007 100644
--- a/tests/api/common/experimental/test_trigger_dag.py
+++ b/tests/api/common/experimental/test_trigger_dag.py
@@ -89,6 +89,31 @@ class TriggerDagTests(unittest.TestCase):
 
         self.assertEqual(3, len(triggers))
 
+    @mock.patch('airflow.models.DAG')
+    @mock.patch('airflow.models.DagRun')
+    @mock.patch('airflow.models.DagBag')
+    def test_trigger_dag_include_nested_subdags(self, dag_bag_mock, dag_run_mock, dag_mock):
+        dag_id = "trigger_dag"
+        dag_bag_mock.dags = [dag_id]
+        dag_bag_mock.get_dag.return_value = dag_mock
+        dag_run_mock.find.return_value = None
+        dag1 = mock.MagicMock()
+        dag1.subdags = []
+        dag2 = mock.MagicMock()
+        dag2.subdags = [dag1]
+        dag_mock.subdags = [dag1, dag2]
+
+        triggers = _trigger_dag(
+            dag_id,
+            dag_bag_mock,
+            dag_run_mock,
+            run_id=None,
+            conf=None,
+            execution_date=None,
+            replace_microseconds=True)
+
+        self.assertEqual(3, len(triggers))
+
     @mock.patch('airflow.models.DagBag')
     def test_trigger_dag_with_str_conf(self, dag_bag_mock):
         dag_id = "trigger_dag_with_str_conf"