You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/01/22 08:50:25 UTC
incubator-airflow git commit: [AIRFLOW-2019] Update DataflowHook for updating Streaming type job
Repository: incubator-airflow
Updated Branches:
refs/heads/master 97ca9791c -> 279481968
[AIRFLOW-2019] Update DataflowHook for updating Streaming type job
Closes #2965 from ivanwirawan/AIRFLOW-2019
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/27948196
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/27948196
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/27948196
Branch: refs/heads/master
Commit: 2794819687952328c4c86a02dffe45aef21f36de
Parents: 97ca979
Author: Ivan Wirawan <iv...@gmail.com>
Authored: Mon Jan 22 09:50:11 2018 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon Jan 22 09:50:11 2018 +0100
----------------------------------------------------------------------
airflow/contrib/hooks/gcp_dataflow_hook.py | 26 +++++++++++++++++++------
1 file changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/27948196/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index 72a2225..dc03f05 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -159,8 +159,12 @@ class DataFlowHook(GoogleCloudBaseHook):
_DataflowJob(self.get_conn(), variables['project'],
name, self.poll_sleep).wait_for_done()
- def start_java_dataflow(self, task_id, variables, dataflow, job_class=None):
- name = task_id + "-" + str(uuid.uuid1())[:8]
+ def start_java_dataflow(self, task_id, variables, dataflow, job_class=None,
+ append_job_name=True):
+ if append_job_name:
+ name = task_id + "-" + str(uuid.uuid1())[:8]
+ else:
+ name = task_id
variables['jobName'] = name
def label_formatter(labels_dict):
@@ -171,13 +175,21 @@ class DataFlowHook(GoogleCloudBaseHook):
self._start_dataflow(task_id, variables, name,
command_prefix, label_formatter)
- def start_template_dataflow(self, task_id, variables, parameters, dataflow_template):
- name = task_id + "-" + str(uuid.uuid1())[:8]
+ def start_template_dataflow(self, task_id, variables, parameters, dataflow_template,
+ append_job_name=True):
+ if append_job_name:
+ name = task_id + "-" + str(uuid.uuid1())[:8]
+ else:
+ name = task_id
self._start_template_dataflow(
name, variables, parameters, dataflow_template)
- def start_python_dataflow(self, task_id, variables, dataflow, py_options):
- name = task_id + "-" + str(uuid.uuid1())[:8]
+ def start_python_dataflow(self, task_id, variables, dataflow, py_options,
+ append_job_name=True):
+ if append_job_name:
+ name = task_id + "-" + str(uuid.uuid1())[:8]
+ else:
+ name = task_id
variables["job_name"] = name
def label_formatter(labels_dict):
@@ -193,6 +205,8 @@ class DataFlowHook(GoogleCloudBaseHook):
for attr, value in variables.items():
if attr == 'labels':
command += label_formatter(value)
+ elif value is None or value.__len__() < 1:
+ command.append("--" + attr)
else:
command.append("--" + attr + "=" + value)
return command