You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by sa...@apache.org on 2016/08/19 23:02:50 UTC
incubator-airflow git commit: [AIRFLOW-446][AIRFLOW-445] Adds missing
dataproc submit options
Repository: incubator-airflow
Updated Branches:
refs/heads/master d20299177 -> 7a5e1d832
[AIRFLOW-446][AIRFLOW-445] Adds missing dataproc submit options
Adds equivalent support for the --files and
--py-files CLI options.
Signed-off-by: Corentin Kerisit <c...@42.am>
Dear Airflow Maintainers,
Please accept this PR that addresses the following
issues:
- [AIRFLOW-445 - Missing dataproc operator submit
options](https://issues.apache.org/jira/browse/AIRFLOW-445)
Changes Done:
- Adds equivalent support for the --files and
--py-files CLI options.
Testing Done:
- Non existent for that operator yet, tested
manually by submitting a job with extra files and
py-files
Signed-off-by: Corentin Kerisit <c...@42.am>
Closes #1748 from
cerisier/dataproc_additional_options
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7a5e1d83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7a5e1d83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7a5e1d83
Branch: refs/heads/master
Commit: 7a5e1d832376af5e048a20ab718c41a20ff6cdf8
Parents: d202991
Author: Corentin Kerisit <c...@42.am>
Authored: Fri Aug 19 16:02:35 2016 -0700
Committer: Siddharth Anand <si...@yahoo.com>
Committed: Fri Aug 19 16:02:35 2016 -0700
----------------------------------------------------------------------
airflow/contrib/hooks/gcp_dataproc_hook.py | 8 ++++++++
airflow/contrib/operators/dataproc_operator.py | 21 +++++++++++++++++++++
2 files changed, 29 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7a5e1d83/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index 3dd1f64..fab71c5 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -107,6 +107,14 @@ class _DataProcJobBuilder:
if archives is not None:
self.job["job"][self.job_type]["archiveUris"] = archives
+ def add_file_uris(self, files):
+ if files is not None:
+ self.job["job"][self.job_type]["fileUris"] = files
+
+ def add_python_file_uris(self, pyfiles):
+ if pyfiles is not None:
+ self.job["job"][self.job_type]["pythonFileUris"] = pyfiles
+
def set_main(self, main_jar, main_class):
if main_class is not None and main_jar is not None:
raise Exception("Set either main_jar or main_class")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7a5e1d83/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 4d3e1e7..33e5f79 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -256,6 +256,7 @@ class DataProcSparkOperator(BaseOperator):
main_class=None,
arguments=None,
archives=None,
+ files=None,
dataproc_cluster='cluster-1',
dataproc_spark_properties=None,
dataproc_spark_jars=None,
@@ -277,6 +278,8 @@ class DataProcSparkOperator(BaseOperator):
:param archives: List of archived files that will be unpacked in the work
directory. Should be stored in Cloud Storage.
:type archives: list
+ :param files: List of files to be copied to the working directory
+ :type files: list
:param dataproc_cluster: The id of the DataProc cluster.
:type dataproc_cluster: string
:param dataproc_spark_properties: Map for the Pig properties. Ideal to put in
@@ -299,6 +302,7 @@ class DataProcSparkOperator(BaseOperator):
self.main_class = main_class
self.arguments = arguments
self.archives = archives
+ self.files = files
self.dataproc_cluster = dataproc_cluster
self.dataproc_properties = dataproc_spark_properties
self.dataproc_jars = dataproc_spark_jars
@@ -313,6 +317,7 @@ class DataProcSparkOperator(BaseOperator):
job.add_args(self.arguments)
job.add_jar_file_uris(self.dataproc_jars)
job.add_archive_uris(self.archives)
+ job.add_file_uris(self.files)
hook.submit(hook.project_id, job.build())
@@ -332,6 +337,7 @@ class DataProcHadoopOperator(BaseOperator):
main_class=None,
arguments=None,
archives=None,
+ files=None,
dataproc_cluster='cluster-1',
dataproc_hadoop_properties=None,
dataproc_hadoop_jars=None,
@@ -353,6 +359,8 @@ class DataProcHadoopOperator(BaseOperator):
:param archives: List of archived files that will be unpacked in the work
directory. Should be stored in Cloud Storage.
:type archives: list
+ :param files: List of files to be copied to the working directory
+ :type files: list
:param dataproc_cluster: The id of the DataProc cluster.
:type dataproc_cluster: string
:param dataproc_hadoop_properties: Map for the Pig properties. Ideal to put in
@@ -375,6 +383,7 @@ class DataProcHadoopOperator(BaseOperator):
self.main_class = main_class
self.arguments = arguments
self.archives = archives
+ self.files = files
self.dataproc_cluster = dataproc_cluster
self.dataproc_properties = dataproc_hadoop_properties
self.dataproc_jars = dataproc_hadoop_jars
@@ -389,6 +398,7 @@ class DataProcHadoopOperator(BaseOperator):
job.add_args(self.arguments)
job.add_jar_file_uris(self.dataproc_jars)
job.add_archive_uris(self.archives)
+ job.add_file_uris(self.files)
hook.submit(hook.project_id, job.build())
@@ -407,6 +417,8 @@ class DataProcPySparkOperator(BaseOperator):
main,
arguments=None,
archives=None,
+ pyfiles=None,
+ files=None,
dataproc_cluster='cluster-1',
dataproc_pyspark_properties=None,
dataproc_pyspark_jars=None,
@@ -425,6 +437,11 @@ class DataProcPySparkOperator(BaseOperator):
:param archives: List of archived files that will be unpacked in the work
directory. Should be stored in Cloud Storage.
:type archives: list
+ :param files: List of files to be copied to the working directory
+ :type files: list
+ :param pyfiles: List of Python files to pass to the PySpark framework.
+ Supported file types: .py, .egg, and .zip
+ :type pyfiles: list
:param dataproc_cluster: The id of the DataProc cluster.
:type dataproc_cluster: string
:param dataproc_pyspark_properties: Map for the Pig properties. Ideal to put in
@@ -446,6 +463,8 @@ class DataProcPySparkOperator(BaseOperator):
self.main = main
self.arguments = arguments
self.archives = archives
+ self.files = files
+ self.pyfiles = pyfiles
self.dataproc_cluster = dataproc_cluster
self.dataproc_properties = dataproc_pyspark_properties
self.dataproc_jars = dataproc_pyspark_jars
@@ -460,5 +479,7 @@ class DataProcPySparkOperator(BaseOperator):
job.add_args(self.arguments)
job.add_jar_file_uris(self.dataproc_jars)
job.add_archive_uris(self.archives)
+ job.add_file_uris(self.files)
+ job.add_python_file_uris(self.pyfiles)
hook.submit(hook.project_id, job.build())