Posted to commits@airflow.apache.org by sa...@apache.org on 2016/08/19 23:02:50 UTC

incubator-airflow git commit: [AIRFLOW-446][AIRFLOW-445] Adds missing dataproc submit options

Repository: incubator-airflow
Updated Branches:
  refs/heads/master d20299177 -> 7a5e1d832


[AIRFLOW-446][AIRFLOW-445] Adds missing dataproc submit options

Adds equivalent support for the --files and
--py-files CLI options.

Signed-off-by: Corentin Kerisit <c...@42.am>

Dear Airflow Maintainers,

Please accept this PR that addresses the following
issues:
- [AIRFLOW-445 - Missing dataproc operator submit options](https://issues.apache.org/jira/browse/AIRFLOW-445)

Changes Done:
- Adds equivalent support for the --files and
--py-files CLI options.

Testing Done:
- No automated tests exist for that operator yet;
tested manually by submitting a job with extra
files and py-files
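
For reference, a minimal usage sketch of the two new parameters (the DAG
wiring, bucket paths, and task id below are illustrative assumptions, not
part of this commit):

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator

    # Hypothetical DAG; only `files` and `pyfiles` are the new parameters.
    dag = DAG('dataproc_example', start_date=datetime(2016, 8, 19),
              schedule_interval=None)

    submit_pyspark = DataProcPySparkOperator(
        task_id='submit_pyspark',
        main='gs://my-bucket/jobs/main.py',
        files=['gs://my-bucket/conf/app.conf'],       # mirrors --files
        pyfiles=['gs://my-bucket/libs/helpers.zip'],  # mirrors --py-files
        dataproc_cluster='cluster-1',
        dag=dag,
    )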

Signed-off-by: Corentin Kerisit <c...@42.am>

Closes #1748 from
cerisier/dataproc_additional_options


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7a5e1d83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7a5e1d83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7a5e1d83

Branch: refs/heads/master
Commit: 7a5e1d832376af5e048a20ab718c41a20ff6cdf8
Parents: d202991
Author: Corentin Kerisit <c...@42.am>
Authored: Fri Aug 19 16:02:35 2016 -0700
Committer: Siddharth Anand <si...@yahoo.com>
Committed: Fri Aug 19 16:02:35 2016 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataproc_hook.py     |  8 ++++++++
 airflow/contrib/operators/dataproc_operator.py | 21 +++++++++++++++++++++
 2 files changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7a5e1d83/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index 3dd1f64..fab71c5 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -107,6 +107,14 @@ class _DataProcJobBuilder:
         if archives is not None:
             self.job["job"][self.job_type]["archiveUris"] = archives
 
+    def add_file_uris(self, files):
+        if files is not None:
+            self.job["job"][self.job_type]["fileUris"] = files
+
+    def add_python_file_uris(self, pyfiles):
+        if pyfiles is not None:
+            self.job["job"][self.job_type]["pythonFileUris"] = pyfiles
+
     def set_main(self, main_jar, main_class):
         if main_class is not None and main_jar is not None:
             raise Exception("Set either main_jar or main_class")
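
As a rough sketch of what the two new builder methods produce (this
stand-in dict mirrors the hunk above; the surrounding _DataProcJobBuilder
state is a simplified assumption):

    # Simplified stand-in for the builder's internal job state.
    job = {"job": {"pysparkJob": {}}}
    job_type = "pysparkJob"

    files = ["gs://my-bucket/conf/app.conf"]
    pyfiles = ["gs://my-bucket/libs/helpers.zip"]

    # Same guards and assignments as add_file_uris / add_python_file_uris.
    if files is not None:
        job["job"][job_type]["fileUris"] = files
    if pyfiles is not None:
        job["job"][job_type]["pythonFileUris"] = pyfiles

    # The dict now carries the fields the Dataproc jobs.submit API expects:
    # {'job': {'pysparkJob': {'fileUris': [...], 'pythonFileUris': [...]}}}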

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7a5e1d83/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 4d3e1e7..33e5f79 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -256,6 +256,7 @@ class DataProcSparkOperator(BaseOperator):
             main_class=None,
             arguments=None,
             archives=None,
+            files=None,
             dataproc_cluster='cluster-1',
             dataproc_spark_properties=None,
             dataproc_spark_jars=None,
@@ -277,6 +278,8 @@ class DataProcSparkOperator(BaseOperator):
         :param archives: List of archived files that will be unpacked in the work
             directory. Should be stored in Cloud Storage.
         :type archives: list
+        :param files: List of files to be copied to the working directory
+        :type files: list
         :param dataproc_cluster: The id of the DataProc cluster.
         :type dataproc_cluster: string
         :param dataproc_spark_properties: Map for the Spark properties. Ideal to put in
@@ -299,6 +302,7 @@ class DataProcSparkOperator(BaseOperator):
         self.main_class = main_class
         self.arguments = arguments
         self.archives = archives
+        self.files = files
         self.dataproc_cluster = dataproc_cluster
         self.dataproc_properties = dataproc_spark_properties
         self.dataproc_jars = dataproc_spark_jars
@@ -313,6 +317,7 @@ class DataProcSparkOperator(BaseOperator):
         job.add_args(self.arguments)
         job.add_jar_file_uris(self.dataproc_jars)
         job.add_archive_uris(self.archives)
+        job.add_file_uris(self.files)
 
         hook.submit(hook.project_id, job.build())
 
@@ -332,6 +337,7 @@ class DataProcHadoopOperator(BaseOperator):
             main_class=None,
             arguments=None,
             archives=None,
+            files=None,
             dataproc_cluster='cluster-1',
             dataproc_hadoop_properties=None,
             dataproc_hadoop_jars=None,
@@ -353,6 +359,8 @@ class DataProcHadoopOperator(BaseOperator):
         :param archives: List of archived files that will be unpacked in the work
             directory. Should be stored in Cloud Storage.
         :type archives: list
+        :param files: List of files to be copied to the working directory
+        :type files: list
         :param dataproc_cluster: The id of the DataProc cluster.
         :type dataproc_cluster: string
         :param dataproc_hadoop_properties: Map for the Hadoop properties. Ideal to put in
@@ -375,6 +383,7 @@ class DataProcHadoopOperator(BaseOperator):
         self.main_class = main_class
         self.arguments = arguments
         self.archives = archives
+        self.files = files
         self.dataproc_cluster = dataproc_cluster
         self.dataproc_properties = dataproc_hadoop_properties
         self.dataproc_jars = dataproc_hadoop_jars
@@ -389,6 +398,7 @@ class DataProcHadoopOperator(BaseOperator):
         job.add_args(self.arguments)
         job.add_jar_file_uris(self.dataproc_jars)
         job.add_archive_uris(self.archives)
+        job.add_file_uris(self.files)
 
         hook.submit(hook.project_id, job.build())
 
@@ -407,6 +417,8 @@ class DataProcPySparkOperator(BaseOperator):
             main,
             arguments=None,
             archives=None,
+            pyfiles=None,
+            files=None,
             dataproc_cluster='cluster-1',
             dataproc_pyspark_properties=None,
             dataproc_pyspark_jars=None,
@@ -425,6 +437,11 @@ class DataProcPySparkOperator(BaseOperator):
          :param archives: List of archived files that will be unpacked in the work
             directory. Should be stored in Cloud Storage.
          :type archives: list
+         :param files: List of files to be copied to the working directory
+         :type files: list
+         :param pyfiles: List of Python files to pass to the PySpark framework.
+            Supported file types: .py, .egg, and .zip
+         :type pyfiles: list
          :param dataproc_cluster: The id of the DataProc cluster.
          :type dataproc_cluster: string
          :param dataproc_pyspark_properties: Map for the PySpark properties. Ideal to put in
@@ -446,6 +463,8 @@ class DataProcPySparkOperator(BaseOperator):
         self.main = main
         self.arguments = arguments
         self.archives = archives
+        self.files = files
+        self.pyfiles = pyfiles
         self.dataproc_cluster = dataproc_cluster
         self.dataproc_properties = dataproc_pyspark_properties
         self.dataproc_jars = dataproc_pyspark_jars
@@ -460,5 +479,7 @@ class DataProcPySparkOperator(BaseOperator):
         job.add_args(self.arguments)
         job.add_jar_file_uris(self.dataproc_jars)
         job.add_archive_uris(self.archives)
+        job.add_file_uris(self.files)
+        job.add_python_file_uris(self.pyfiles)
 
         hook.submit(hook.project_id, job.build())
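
Taken together, a hedged end-to-end sketch for the Spark operator (paths,
task id, and the DAG object are illustrative; `files` is the new
parameter):

    from airflow.contrib.operators.dataproc_operator import DataProcSparkOperator

    submit_spark = DataProcSparkOperator(
        task_id='submit_spark',
        main_jar='gs://my-bucket/jars/job.jar',
        files=['gs://my-bucket/conf/app.conf'],  # copied to the working directory
        dataproc_cluster='cluster-1',
        dag=dag,  # assumes a DAG object as in the earlier sketch
    )

At execute time the operator forwards self.files to
job.add_file_uris(self.files), so the list lands in the job's fileUris
field before hook.submit() is called.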