You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/01/22 08:50:25 UTC

incubator-airflow git commit: [AIRFLOW-2019] Update DataflowHook for updating Streaming type job

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 97ca9791c -> 279481968


[AIRFLOW-2019] Update DataflowHook for updating Streaming type job

Closes #2965 from ivanwirawan/AIRFLOW-2019


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/27948196
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/27948196
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/27948196

Branch: refs/heads/master
Commit: 2794819687952328c4c86a02dffe45aef21f36de
Parents: 97ca979
Author: Ivan Wirawan <iv...@gmail.com>
Authored: Mon Jan 22 09:50:11 2018 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon Jan 22 09:50:11 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataflow_hook.py | 26 +++++++++++++++++++------
 1 file changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/27948196/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index 72a2225..dc03f05 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -159,8 +159,12 @@ class DataFlowHook(GoogleCloudBaseHook):
         _DataflowJob(self.get_conn(), variables['project'],
                      name, self.poll_sleep).wait_for_done()
 
-    def start_java_dataflow(self, task_id, variables, dataflow, job_class=None):
-        name = task_id + "-" + str(uuid.uuid1())[:8]
+    def start_java_dataflow(self, task_id, variables, dataflow, job_class=None,
+                            append_job_name=True):
+        if append_job_name:
+            name = task_id + "-" + str(uuid.uuid1())[:8]
+        else:
+            name = task_id
         variables['jobName'] = name
 
         def label_formatter(labels_dict):
@@ -171,13 +175,21 @@ class DataFlowHook(GoogleCloudBaseHook):
         self._start_dataflow(task_id, variables, name,
                              command_prefix, label_formatter)
 
-    def start_template_dataflow(self, task_id, variables, parameters, dataflow_template):
-        name = task_id + "-" + str(uuid.uuid1())[:8]
+    def start_template_dataflow(self, task_id, variables, parameters, dataflow_template,
+                                append_job_name=True):
+        if append_job_name:
+            name = task_id + "-" + str(uuid.uuid1())[:8]
+        else:
+            name = task_id
         self._start_template_dataflow(
             name, variables, parameters, dataflow_template)
 
-    def start_python_dataflow(self, task_id, variables, dataflow, py_options):
-        name = task_id + "-" + str(uuid.uuid1())[:8]
+    def start_python_dataflow(self, task_id, variables, dataflow, py_options,
+                              append_job_name=True):
+        if append_job_name:
+            name = task_id + "-" + str(uuid.uuid1())[:8]
+        else:
+            name = task_id
         variables["job_name"] = name
 
         def label_formatter(labels_dict):
@@ -193,6 +205,8 @@ class DataFlowHook(GoogleCloudBaseHook):
             for attr, value in variables.items():
                 if attr == 'labels':
                     command += label_formatter(value)
+                elif value is None or value.__len__() < 1:
+                    command.append("--" + attr)
                 else:
                     command.append("--" + attr + "=" + value)
         return command