You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2016/05/31 16:58:42 UTC
incubator-airflow git commit: [AIRFLOW-23] Support for Google Cloud
DataProc
Repository: incubator-airflow
Updated Branches:
refs/heads/master 03e8e5d49 -> 0a5364cd4
[AIRFLOW-23] Support for Google Cloud DataProc
Dear Airflow Maintainers,
Please accept this PR that addresses the following issues:
- AIRFLOW-23
Spark, Hadoop, PySpark, SparkSQL, Pig and Hive support for DataProc clusters
Author: Alex Van Boxel <al...@vanboxel.be>
Closes #1532 from alexvanboxel/AIRFLOW-23.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0a5364cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0a5364cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0a5364cd
Branch: refs/heads/master
Commit: 0a5364cd430a024ed9be4e5b2b5d5174c1b98497
Parents: 03e8e5d
Author: Alex Van Boxel <al...@vanboxel.be>
Authored: Tue May 31 09:58:32 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Tue May 31 09:58:32 2016 -0700
----------------------------------------------------------------------
airflow/contrib/hooks/__init__.py | 1 +
airflow/contrib/hooks/gcp_dataproc_hook.py | 145 ++++++
airflow/contrib/operators/dataproc_operator.py | 464 ++++++++++++++++++++
3 files changed, 610 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a5364cd/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index b4627f5..17a9f29 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -26,6 +26,7 @@ _hooks = {
'qubole_hook': ['QuboleHook'],
'gcs_hook': ['GoogleCloudStorageHook'],
'datastore_hook': ['DatastoreHook'],
+ 'gcp_dataproc_hook': ['DataProcHook'],
'cloudant_hook': ['CloudantHook']
}
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a5364cd/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
new file mode 100644
index 0000000..3dd1f64
--- /dev/null
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -0,0 +1,145 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import time
+import uuid
+
+from apiclient.discovery import build
+
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+
+
class _DataProcJob:
    """
    Submit a single Cloud DataProc job and track it until it finishes.

    The job is submitted as soon as the object is constructed. Use
    ``wait_for_done`` to poll its status and ``raise_error`` to turn a run
    that ended in ERROR or CANCELLED state into an exception.
    """

    def __init__(self, dataproc_api, project_id, job):
        """
        :param dataproc_api: DataProc service object (as returned by
            ``DataProcHook.get_conn``).
        :param project_id: Google Cloud project the job is submitted under.
        :param job: Request body for ``projects.regions.jobs.submit``.
        """
        self.dataproc_api = dataproc_api
        self.project_id = project_id
        self.job = dataproc_api.projects().regions().jobs().submit(
            projectId=self.project_id,
            region='global',
            body=job).execute()
        self.job_id = self.job['reference']['jobId']
        logging.info('DataProc job %s is %s', self.job_id,
                     str(self.job['status']['state']))

    def wait_for_done(self):
        """
        Poll the job every 5 seconds until it reaches a terminal state.

        :return: True when the job finished in state DONE, False when it
            ended in state ERROR or CANCELLED.
        """
        while True:
            self.job = self.dataproc_api.projects().regions().jobs().get(
                projectId=self.project_id,
                region='global',
                jobId=self.job_id).execute()
            status = self.job['status']
            state = status['state']
            if state == 'ERROR':
                logging.error('DataProc job %s has errors', self.job_id)
                # 'details' is optional in the job status; don't crash the
                # error path with a KeyError when it is missing.
                if 'details' in status:
                    logging.error(status['details'])
                logging.debug(str(self.job))
                return False
            if state == 'CANCELLED':
                logging.warning('DataProc job %s is cancelled', self.job_id)
                if 'details' in status:
                    logging.warning(status['details'])
                logging.debug(str(self.job))
                return False
            if state == 'DONE':
                return True
            logging.debug('DataProc job %s is %s', self.job_id, str(state))
            time.sleep(5)

    def raise_error(self, message=None):
        """
        Raise if the last known job state is ERROR or CANCELLED.

        A cancelled job is treated as a failure too. Otherwise a caller that
        calls this after ``wait_for_done`` returned False would let a
        cancelled job pass as a success.

        :param message: Prefix for the exception text; defaults to
            "Google DataProc job has error".
        :raises Exception: with ``"<message>: <details>"``. When the status
            has no 'details', the state name is used instead.
        """
        status = self.job['status']
        state = status['state']
        if state in ('ERROR', 'CANCELLED'):
            if message is None:
                message = "Google DataProc job has error"
            raise Exception(message + ": " + str(status.get('details', state)))

    def get(self):
        """Return the most recently fetched job resource."""
        return self.job
+
+
class _DataProcJobBuilder:
    """
    Incrementally build the request body for a Cloud DataProc job submission.

    The body has the shape ``{"job": {"reference": ..., "placement": ...,
    <job_type>: {...}}}``, where ``job_type`` is the DataProc job field name
    (e.g. ``"pigJob"`` or ``"sparkJob"``). The ``add_*``/``set_*`` methods fill
    in the job-type specific section, and ``build`` returns the dict.
    """

    def __init__(self, project_id, task_id, dataproc_cluster, job_type, properties):
        """
        :param project_id: Google Cloud project the job reference belongs to.
        :param task_id: Airflow task id, used as the prefix of the job id.
            Characters DataProc does not accept in a job id are replaced by
            underscores.
        :param dataproc_cluster: Name of the cluster the job is placed on.
        :param job_type: DataProc job field name, e.g. ``"hiveJob"``.
        :param properties: Optional dict of job properties; omitted when None.
        """
        import re

        # DataProc job ids may only contain letters, digits, underscores and
        # hyphens, and are limited to 100 characters. Reserve 9 characters
        # for the "_xxxxxxxx" uniqueness suffix.
        prefix = re.sub(r'[^a-zA-Z0-9_-]', '_', task_id)[:91]
        name = prefix + "_" + str(uuid.uuid1())[:8]
        self.job_type = job_type
        self.job = {
            "job": {
                "reference": {
                    "projectId": project_id,
                    "jobId": name,
                },
                "placement": {
                    "clusterName": dataproc_cluster
                },
                job_type: {
                }
            }
        }
        if properties is not None:
            self.job["job"][job_type]["properties"] = properties

    def add_variables(self, variables):
        """Set ``scriptVariables`` when ``variables`` is not None."""
        if variables is not None:
            self.job["job"][self.job_type]["scriptVariables"] = variables

    def add_args(self, args):
        """Set ``args`` when ``args`` is not None."""
        if args is not None:
            self.job["job"][self.job_type]["args"] = args

    def add_query(self, query):
        """Set ``queryList`` to a single-query list containing ``query``."""
        self.job["job"][self.job_type]["queryList"] = {'queries': [query]}

    def add_jar_file_uris(self, jars):
        """Set ``jarFileUris`` when ``jars`` is not None."""
        if jars is not None:
            self.job["job"][self.job_type]["jarFileUris"] = jars

    def add_archive_uris(self, archives):
        """Set ``archiveUris`` when ``archives`` is not None."""
        if archives is not None:
            self.job["job"][self.job_type]["archiveUris"] = archives

    def set_main(self, main_jar, main_class):
        """
        Set the job entry point from exactly one of ``main_jar`` / ``main_class``.

        :raises Exception: when both or neither are given. Previously "neither"
            silently produced ``mainClass: None``, which the API cannot run.
        """
        if (main_jar is None) == (main_class is None):
            raise Exception("Set either main_jar or main_class")
        if main_jar is not None:
            self.job["job"][self.job_type]["mainJarFileUri"] = main_jar
        else:
            self.job["job"][self.job_type]["mainClass"] = main_class

    def set_python_main(self, main):
        """Set ``mainPythonFileUri`` (the driver script of a PySpark job)."""
        self.job["job"][self.job_type]["mainPythonFileUri"] = main

    def build(self):
        """Return the request body built so far (not a copy)."""
        return self.job
+
+
class DataProcHook(GoogleCloudBaseHook):
    """
    Hook for Google Cloud DataProc.

    It creates job request builders and submits the built jobs to the
    ``dataproc`` v1 API, blocking until each job reaches a terminal state.
    """

    def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None):
        """
        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
        :type gcp_conn_id: string
        :param delegate_to: The account to impersonate, if any.
        :type delegate_to: string
        """
        super(DataProcHook, self).__init__(gcp_conn_id, delegate_to)

    def get_conn(self):
        """
        Build and return a DataProc v1 service object using the hook's
        authorized HTTP client.
        """
        authorized_http = self._authorize()
        service = build('dataproc', 'v1', http=authorized_http)
        return service

    def submit(self, project_id, job):
        """
        Submit ``job`` under ``project_id`` and wait for it to finish.

        If polling reports the job did not end in state DONE, the tracked
        job's ``raise_error`` is called with the message
        "DataProcTask has errors".

        :param project_id: Google Cloud project to submit the job to.
        :param job: Request body, e.g. from ``_DataProcJobBuilder.build()``.
        """
        tracked = _DataProcJob(self.get_conn(), project_id, job)
        succeeded = tracked.wait_for_done()
        if succeeded:
            return
        tracked.raise_error("DataProcTask has errors")

    def create_job_template(self, task_id, dataproc_cluster, job_type, properties):
        """
        Return a new ``_DataProcJobBuilder`` for a job of ``job_type``.

        The builder uses this hook's ``project_id`` attribute. That attribute
        is presumably provided by GoogleCloudBaseHook; confirm there.
        """
        builder = _DataProcJobBuilder(
            self.project_id,
            task_id,
            dataproc_cluster,
            job_type,
            properties)
        return builder
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a5364cd/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
new file mode 100644
index 0000000..4d3e1e7
--- /dev/null
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -0,0 +1,464 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
class DataProcPigOperator(BaseOperator):
    """
    Start a Pig query Job on a Cloud DataProc cluster. The parameters of the operation
    will be passed to the cluster.

    It's a good practice to define dataproc_* parameters, like the cluster name and
    UDFs, in the default_args of the dag.

    ```
    default_args = {
        'dataproc_cluster': 'cluster-1',
        'dataproc_pig_jars': [
            'gs://example/udf/jar/datafu/1.2.0/datafu.jar',
            'gs://example/udf/jar/gpig/1.2/gpig.jar'
        ]
    }
    ```

    You can pass a pig script as a string or as a file reference. Use ``variables`` to
    pass variables that are resolved by Pig on the cluster. Both ``query`` and
    ``variables`` are also rendered as Airflow (Jinja) templates before submission, so
    use ``{{ ... }}`` for template parameters.

    ```
    t1 = DataProcPigOperator(
        task_id='dataproc_pig',
        query='a_pig_script.pig',
        variables={'out': 'gs://example/output/{{ds}}'},
        dag=dag)
    ```
    """
    template_fields = ['query', 'variables']
    template_ext = ('.pg', '.pig',)
    ui_color = '#0273d4'

    @apply_defaults
    def __init__(
            self,
            query,
            variables=None,
            dataproc_cluster='cluster-1',
            dataproc_pig_properties=None,
            dataproc_pig_jars=None,
            gcp_conn_id='google_cloud_default',
            delegate_to=None,
            *args,
            **kwargs):
        """
        Create a new DataProcPigOperator.

        For more detail about job submission have a look at the reference:

        https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs

        :param query: The query or reference to the query file (.pg or .pig extension).
        :type query: string
        :param variables: Map of named parameters for the query.
        :type variables: dict
        :param dataproc_cluster: The name of the DataProc cluster to run the job on.
        :type dataproc_cluster: string
        :param dataproc_pig_properties: Map for the Pig properties. Ideal to put in
            default arguments
        :type dataproc_pig_properties: dict
        :param dataproc_pig_jars: URIs to jars provisioned in Cloud Storage (example: for
            UDFs and libs) and are ideal to put in default arguments.
        :type dataproc_pig_jars: list
        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
        :type gcp_conn_id: string
        :param delegate_to: The account to impersonate, if any.
            For this to work, the service account making the request must have domain-wide
            delegation enabled.
        :type delegate_to: string
        """
        super(DataProcPigOperator, self).__init__(*args, **kwargs)
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.query = query
        self.variables = variables
        self.dataproc_cluster = dataproc_cluster
        self.dataproc_properties = dataproc_pig_properties
        self.dataproc_jars = dataproc_pig_jars

    def execute(self, context):
        """Build a ``pigJob`` request from this operator's fields and submit it
        through DataProcHook, blocking until the hook's submit returns."""
        hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                            delegate_to=self.delegate_to)
        job = hook.create_job_template(self.task_id, self.dataproc_cluster, "pigJob",
                                       self.dataproc_properties)

        job.add_query(self.query)
        job.add_variables(self.variables)
        job.add_jar_file_uris(self.dataproc_jars)

        hook.submit(hook.project_id, job.build())
+
+
class DataProcHiveOperator(BaseOperator):
    """
    Start a Hive query Job on a Cloud DataProc cluster.

    ``query`` and ``variables`` are Airflow template fields; a ``query`` ending in
    ``.q`` is treated as a reference to a template file.
    """
    template_fields = ['query', 'variables']
    template_ext = ('.q',)
    ui_color = '#0273d4'

    @apply_defaults
    def __init__(
            self,
            query,
            variables=None,
            dataproc_cluster='cluster-1',
            dataproc_hive_properties=None,
            dataproc_hive_jars=None,
            gcp_conn_id='google_cloud_default',
            delegate_to=None,
            *args,
            **kwargs):
        """
        Create a new DataProcHiveOperator.

        :param query: The query or reference to the query file (.q extension).
        :type query: string
        :param variables: Map of named parameters for the query.
        :type variables: dict
        :param dataproc_cluster: The name of the DataProc cluster to run the job on.
        :type dataproc_cluster: string
        :param dataproc_hive_properties: Map for the Hive properties. Ideal to put in
            default arguments
        :type dataproc_hive_properties: dict
        :param dataproc_hive_jars: URIs to jars provisioned in Cloud Storage (example: for
            UDFs and libs) and are ideal to put in default arguments.
        :type dataproc_hive_jars: list
        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
        :type gcp_conn_id: string
        :param delegate_to: The account to impersonate, if any.
            For this to work, the service account making the request must have domain-wide
            delegation enabled.
        :type delegate_to: string
        """
        super(DataProcHiveOperator, self).__init__(*args, **kwargs)
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.query = query
        self.variables = variables
        self.dataproc_cluster = dataproc_cluster
        self.dataproc_properties = dataproc_hive_properties
        self.dataproc_jars = dataproc_hive_jars

    def execute(self, context):
        """Build a ``hiveJob`` request from this operator's fields and submit it
        through DataProcHook, blocking until the hook's submit returns."""
        hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                            delegate_to=self.delegate_to)

        job = hook.create_job_template(self.task_id, self.dataproc_cluster, "hiveJob",
                                       self.dataproc_properties)

        job.add_query(self.query)
        job.add_variables(self.variables)
        job.add_jar_file_uris(self.dataproc_jars)

        hook.submit(hook.project_id, job.build())
+
+
class DataProcSparkSqlOperator(BaseOperator):
    """
    Start a Spark SQL query Job on a Cloud DataProc cluster.

    ``query`` and ``variables`` are Airflow template fields; a ``query`` ending in
    ``.q`` is treated as a reference to a template file.
    """
    template_fields = ['query', 'variables']
    template_ext = ('.q',)
    ui_color = '#0273d4'

    @apply_defaults
    def __init__(
            self,
            query,
            variables=None,
            dataproc_cluster='cluster-1',
            dataproc_spark_properties=None,
            dataproc_spark_jars=None,
            gcp_conn_id='google_cloud_default',
            delegate_to=None,
            *args,
            **kwargs):
        """
        Create a new DataProcSparkSqlOperator.

        :param query: The query or reference to the query file (.q extension).
        :type query: string
        :param variables: Map of named parameters for the query.
        :type variables: dict
        :param dataproc_cluster: The name of the DataProc cluster to run the job on.
        :type dataproc_cluster: string
        :param dataproc_spark_properties: Map for the Spark properties. Ideal to put in
            default arguments
        :type dataproc_spark_properties: dict
        :param dataproc_spark_jars: URIs to jars provisioned in Cloud Storage (example:
            for UDFs and libs) and are ideal to put in default arguments.
        :type dataproc_spark_jars: list
        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
        :type gcp_conn_id: string
        :param delegate_to: The account to impersonate, if any.
            For this to work, the service account making the request must have domain-wide
            delegation enabled.
        :type delegate_to: string
        """
        super(DataProcSparkSqlOperator, self).__init__(*args, **kwargs)
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.query = query
        self.variables = variables
        self.dataproc_cluster = dataproc_cluster
        self.dataproc_properties = dataproc_spark_properties
        self.dataproc_jars = dataproc_spark_jars

    def execute(self, context):
        """Build a ``sparkSqlJob`` request from this operator's fields and submit it
        through DataProcHook, blocking until the hook's submit returns."""
        hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                            delegate_to=self.delegate_to)

        job = hook.create_job_template(self.task_id, self.dataproc_cluster, "sparkSqlJob",
                                       self.dataproc_properties)

        job.add_query(self.query)
        job.add_variables(self.variables)
        job.add_jar_file_uris(self.dataproc_jars)

        hook.submit(hook.project_id, job.build())
+
+
class DataProcSparkOperator(BaseOperator):
    """
    Start a Spark Job on a Cloud DataProc cluster.

    The entry point is given by ``main_jar`` or ``main_class`` (not both).
    ``arguments`` is an Airflow template field.
    """

    template_fields = ['arguments']
    ui_color = '#0273d4'

    @apply_defaults
    def __init__(
            self,
            main_jar=None,
            main_class=None,
            arguments=None,
            archives=None,
            dataproc_cluster='cluster-1',
            dataproc_spark_properties=None,
            dataproc_spark_jars=None,
            gcp_conn_id='google_cloud_default',
            delegate_to=None,
            *args,
            **kwargs):
        """
        Create a new DataProcSparkOperator.

        :param main_jar: URI of the job jar provisioned on Cloud Storage. (use this or
            the main_class, not both together).
        :type main_jar: string
        :param main_class: Name of the job class. (use this or the main_jar, not both
            together).
        :type main_class: string
        :param arguments: Arguments for the job.
        :type arguments: list
        :param archives: List of archived files that will be unpacked in the work
            directory. Should be stored in Cloud Storage.
        :type archives: list
        :param dataproc_cluster: The name of the DataProc cluster to run the job on.
        :type dataproc_cluster: string
        :param dataproc_spark_properties: Map for the Spark properties. Ideal to put in
            default arguments
        :type dataproc_spark_properties: dict
        :param dataproc_spark_jars: URIs to jars provisioned in Cloud Storage (example:
            for UDFs and libs) and are ideal to put in default arguments.
        :type dataproc_spark_jars: list
        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
        :type gcp_conn_id: string
        :param delegate_to: The account to impersonate, if any.
            For this to work, the service account making the request must have domain-wide
            delegation enabled.
        :type delegate_to: string
        """
        super(DataProcSparkOperator, self).__init__(*args, **kwargs)
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.main_jar = main_jar
        self.main_class = main_class
        self.arguments = arguments
        self.archives = archives
        self.dataproc_cluster = dataproc_cluster
        self.dataproc_properties = dataproc_spark_properties
        self.dataproc_jars = dataproc_spark_jars

    def execute(self, context):
        """Build a ``sparkJob`` request from this operator's fields and submit it
        through DataProcHook, blocking until the hook's submit returns."""
        hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                            delegate_to=self.delegate_to)
        job = hook.create_job_template(self.task_id, self.dataproc_cluster, "sparkJob",
                                       self.dataproc_properties)

        job.set_main(self.main_jar, self.main_class)
        job.add_args(self.arguments)
        job.add_jar_file_uris(self.dataproc_jars)
        job.add_archive_uris(self.archives)

        hook.submit(hook.project_id, job.build())
+
+
class DataProcHadoopOperator(BaseOperator):
    """
    Start a Hadoop Job on a Cloud DataProc cluster.

    The entry point is given by ``main_jar`` or ``main_class`` (not both).
    ``arguments`` is an Airflow template field.
    """

    template_fields = ['arguments']
    ui_color = '#0273d4'

    @apply_defaults
    def __init__(
            self,
            main_jar=None,
            main_class=None,
            arguments=None,
            archives=None,
            dataproc_cluster='cluster-1',
            dataproc_hadoop_properties=None,
            dataproc_hadoop_jars=None,
            gcp_conn_id='google_cloud_default',
            delegate_to=None,
            *args,
            **kwargs):
        """
        Create a new DataProcHadoopOperator.

        :param main_jar: URI of the job jar provisioned on Cloud Storage. (use this or
            the main_class, not both together).
        :type main_jar: string
        :param main_class: Name of the job class. (use this or the main_jar, not both
            together).
        :type main_class: string
        :param arguments: Arguments for the job.
        :type arguments: list
        :param archives: List of archived files that will be unpacked in the work
            directory. Should be stored in Cloud Storage.
        :type archives: list
        :param dataproc_cluster: The name of the DataProc cluster to run the job on.
        :type dataproc_cluster: string
        :param dataproc_hadoop_properties: Map for the Hadoop properties. Ideal to put in
            default arguments
        :type dataproc_hadoop_properties: dict
        :param dataproc_hadoop_jars: URIs to jars provisioned in Cloud Storage (example:
            for UDFs and libs) and are ideal to put in default arguments.
        :type dataproc_hadoop_jars: list
        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
        :type gcp_conn_id: string
        :param delegate_to: The account to impersonate, if any.
            For this to work, the service account making the request must have domain-wide
            delegation enabled.
        :type delegate_to: string
        """
        super(DataProcHadoopOperator, self).__init__(*args, **kwargs)
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.main_jar = main_jar
        self.main_class = main_class
        self.arguments = arguments
        self.archives = archives
        self.dataproc_cluster = dataproc_cluster
        self.dataproc_properties = dataproc_hadoop_properties
        self.dataproc_jars = dataproc_hadoop_jars

    def execute(self, context):
        """Build a ``hadoopJob`` request from this operator's fields and submit it
        through DataProcHook, blocking until the hook's submit returns."""
        hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                            delegate_to=self.delegate_to)
        job = hook.create_job_template(self.task_id, self.dataproc_cluster, "hadoopJob",
                                       self.dataproc_properties)

        job.set_main(self.main_jar, self.main_class)
        job.add_args(self.arguments)
        job.add_jar_file_uris(self.dataproc_jars)
        job.add_archive_uris(self.archives)

        hook.submit(hook.project_id, job.build())
+
+
class DataProcPySparkOperator(BaseOperator):
    """
    Start a PySpark Job on a Cloud DataProc cluster.

    ``arguments`` is an Airflow template field.
    """

    template_fields = ['arguments']
    ui_color = '#0273d4'

    @apply_defaults
    def __init__(
            self,
            main,
            arguments=None,
            archives=None,
            dataproc_cluster='cluster-1',
            dataproc_pyspark_properties=None,
            dataproc_pyspark_jars=None,
            gcp_conn_id='google_cloud_default',
            delegate_to=None,
            *args,
            **kwargs):
        """
        Create a new DataProcPySparkOperator.

        :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main
            Python file to use as the driver. Must be a .py file.
        :type main: string
        :param arguments: Arguments for the job.
        :type arguments: list
        :param archives: List of archived files that will be unpacked in the work
            directory. Should be stored in Cloud Storage.
        :type archives: list
        :param dataproc_cluster: The name of the DataProc cluster to run the job on.
        :type dataproc_cluster: string
        :param dataproc_pyspark_properties: Map for the PySpark properties. Ideal to put
            in default arguments
        :type dataproc_pyspark_properties: dict
        :param dataproc_pyspark_jars: URIs to jars provisioned in Cloud Storage (example:
            for UDFs and libs) and are ideal to put in default arguments.
        :type dataproc_pyspark_jars: list
        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
        :type gcp_conn_id: string
        :param delegate_to: The account to impersonate, if any.
            For this to work, the service account making the request must have
            domain-wide delegation enabled.
        :type delegate_to: string
        """
        super(DataProcPySparkOperator, self).__init__(*args, **kwargs)
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.main = main
        self.arguments = arguments
        self.archives = archives
        self.dataproc_cluster = dataproc_cluster
        self.dataproc_properties = dataproc_pyspark_properties
        self.dataproc_jars = dataproc_pyspark_jars

    def execute(self, context):
        """Build a ``pysparkJob`` request from this operator's fields and submit it
        through DataProcHook, blocking until the hook's submit returns."""
        hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                            delegate_to=self.delegate_to)
        job = hook.create_job_template(self.task_id, self.dataproc_cluster, "pysparkJob",
                                       self.dataproc_properties)

        job.set_python_main(self.main)
        job.add_args(self.arguments)
        job.add_jar_file_uris(self.dataproc_jars)
        job.add_archive_uris(self.archives)

        hook.submit(hook.project_id, job.build())