You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/06/27 19:44:07 UTC
incubator-airflow git commit: [AIRFLOW-1350] Add query_uri param to Hive/SparkSQL DataProc operator
Repository: incubator-airflow
Updated Branches:
refs/heads/master 5fe25d859 -> d32c72969
[AIRFLOW-1350] Add query_uri param to Hive/SparkSQL DataProc operator
Closes #2402 from lukeFalsina/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d32c7296
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d32c7296
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d32c7296
Branch: refs/heads/master
Commit: d32c7296908e6975c4dda7159c1a7a6b9e89f046
Parents: 5fe25d8
Author: Luca Falsina <lu...@booking.com>
Authored: Tue Jun 27 12:43:13 2017 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Tue Jun 27 12:43:17 2017 -0700
----------------------------------------------------------------------
airflow/contrib/operators/dataproc_operator.py | 22 +++++++++++++++++----
1 file changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d32c7296/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 14245c8..3e006ac 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -472,7 +472,8 @@ class DataProcHiveOperator(BaseOperator):
@apply_defaults
def __init__(
self,
- query,
+ query=None,
+ query_uri=None,
variables=None,
job_name='{{task.task_id}}_{{ds_nodash}}',
dataproc_cluster='cluster-1',
@@ -487,6 +488,8 @@ class DataProcHiveOperator(BaseOperator):
:param query: The query or reference to the query file (q extension).
:type query: string
+ :param query_uri: The uri of a hive script on Cloud Storage.
+ :type query_uri: string
:param variables: Map of named parameters for the query.
:type variables: dict
:param job_name: The job name used in the DataProc cluster. This name by default
@@ -512,6 +515,7 @@ class DataProcHiveOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.query = query
+ self.query_uri = query_uri
self.variables = variables
self.job_name = job_name
self.dataproc_cluster = dataproc_cluster
@@ -525,7 +529,10 @@ class DataProcHiveOperator(BaseOperator):
job = hook.create_job_template(self.task_id, self.dataproc_cluster, "hiveJob",
self.dataproc_properties)
- job.add_query(self.query)
+ if self.query is None:
+ job.add_query_uri(self.query_uri)
+ else:
+ job.add_query(self.query)
job.add_variables(self.variables)
job.add_jar_file_uris(self.dataproc_jars)
job.set_job_name(self.job_name)
@@ -544,7 +551,8 @@ class DataProcSparkSqlOperator(BaseOperator):
@apply_defaults
def __init__(
self,
- query,
+ query=None,
+ query_uri=None,
variables=None,
job_name='{{task.task_id}}_{{ds_nodash}}',
dataproc_cluster='cluster-1',
@@ -559,6 +567,8 @@ class DataProcSparkSqlOperator(BaseOperator):
:param query: The query or reference to the query file (q extension).
:type query: string
+ :param query_uri: The uri of a spark sql script on Cloud Storage.
+ :type query_uri: string
:param variables: Map of named parameters for the query.
:type variables: dict
:param job_name: The job name used in the DataProc cluster. This name by default
@@ -584,6 +594,7 @@ class DataProcSparkSqlOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.query = query
+ self.query_uri = query_uri
self.variables = variables
self.job_name = job_name
self.dataproc_cluster = dataproc_cluster
@@ -597,7 +608,10 @@ class DataProcSparkSqlOperator(BaseOperator):
job = hook.create_job_template(self.task_id, self.dataproc_cluster, "sparkSqlJob",
self.dataproc_properties)
- job.add_query(self.query)
+ if self.query is None:
+ job.add_query_uri(self.query_uri)
+ else:
+ job.add_query(self.query)
job.add_variables(self.variables)
job.add_jar_file_uris(self.dataproc_jars)
job.set_job_name(self.job_name)