You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2020/05/31 13:30:01 UTC
[airflow] branch master updated: Create guide for Dataproc
Operators (#9037)
This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 29eb68b Create guide for Dataproc Operators (#9037)
29eb68b is described below
commit 29eb68b90b5df692ac322be0939af5e7fa9b71bc
Author: Joppe Vos <44...@users.noreply.github.com>
AuthorDate: Sun May 31 15:29:09 2020 +0200
Create guide for Dataproc Operators (#9037)
* added documentation for dataproc
* added more update information for updateMask
* Added link to information about cluster config api request
* Apply naming convention
* Set all dedents from 4 to 0
* Adjust dedent to 4, for operators
* removed dataproc guide from test_missing_guides
---
.../google/cloud/example_dags/example_dataproc.py | 29 +++-
docs/howto/operator/gcp/dataproc.rst | 188 +++++++++++++++++++++
docs/operators-and-hooks-ref.rst | 2 +-
tests/test_project_structure.py | 1 -
4 files changed, 216 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/google/cloud/example_dags/example_dataproc.py b/airflow/providers/google/cloud/example_dags/example_dataproc.py
index b6e1070..55dbff5 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataproc.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataproc.py
@@ -42,6 +42,8 @@ SPARKR_MAIN = os.environ.get("SPARKR_MAIN", "hello_world.R")
SPARKR_URI = "gs://{}/{}".format(BUCKET, SPARKR_MAIN)
# Cluster definition
+# [START how_to_cloud_dataproc_create_cluster]
+
CLUSTER = {
"project_id": PROJECT_ID,
"cluster_name": CLUSTER_NAME,
@@ -59,8 +61,10 @@ CLUSTER = {
},
}
+# [END how_to_cloud_dataproc_create_cluster]
# Update options
+# [START how_to_cloud_dataproc_updatemask_cluster_operator]
CLUSTER_UPDATE = {
"config": {
"worker_config": {"num_instances": 3},
@@ -73,23 +77,28 @@ UPDATE_MASK = {
"config.secondary_worker_config.num_instances",
]
}
+# [END how_to_cloud_dataproc_updatemask_cluster_operator]
TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
# Jobs definitions
+# [START how_to_cloud_dataproc_pig_config]
PIG_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
}
+# [END how_to_cloud_dataproc_pig_config]
+# [START how_to_cloud_dataproc_sparksql_config]
SPARK_SQL_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
+# [END how_to_cloud_dataproc_sparksql_config]
+# [START how_to_cloud_dataproc_spark_config]
SPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
@@ -98,25 +107,33 @@ SPARK_JOB = {
"main_class": "org.apache.spark.examples.SparkPi",
},
}
+# [END how_to_cloud_dataproc_spark_config]
+# [START how_to_cloud_dataproc_pyspark_config]
PYSPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"pyspark_job": {"main_python_file_uri": PYSPARK_URI},
}
+# [END how_to_cloud_dataproc_pyspark_config]
+# [START how_to_cloud_dataproc_sparkr_config]
SPARKR_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_r_job": {"main_r_file_uri": SPARKR_URI},
}
+# [END how_to_cloud_dataproc_sparkr_config]
+# [START how_to_cloud_dataproc_hive_config]
HIVE_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
+# [END how_to_cloud_dataproc_hive_config]
+# [START how_to_cloud_dataproc_hadoop_config]
HADOOP_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
@@ -125,16 +142,20 @@ HADOOP_JOB = {
"args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
},
}
+# [END how_to_cloud_dataproc_hadoop_config]
with models.DAG(
"example_gcp_dataproc",
default_args={"start_date": days_ago(1)},
schedule_interval=None,
) as dag:
+ # [START how_to_cloud_dataproc_create_cluster_operator]
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster", project_id=PROJECT_ID, cluster=CLUSTER, region=REGION
)
+ # [END how_to_cloud_dataproc_create_cluster_operator]
+ # [START how_to_cloud_dataproc_update_cluster_operator]
scale_cluster = DataprocUpdateClusterOperator(
task_id="scale_cluster",
cluster_name=CLUSTER_NAME,
@@ -144,11 +165,11 @@ with models.DAG(
project_id=PROJECT_ID,
location=REGION,
)
+ # [END how_to_cloud_dataproc_update_cluster_operator]
pig_task = DataprocSubmitJobOperator(
task_id="pig_task", job=PIG_JOB, location=REGION, project_id=PROJECT_ID
)
-
spark_sql_task = DataprocSubmitJobOperator(
task_id="spark_sql_task",
job=SPARK_SQL_JOB,
@@ -160,9 +181,11 @@ with models.DAG(
task_id="spark_task", job=SPARK_JOB, location=REGION, project_id=PROJECT_ID
)
+ # [START how_to_cloud_dataproc_submit_job_to_cluster_operator]
pyspark_task = DataprocSubmitJobOperator(
task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
)
+ # [END how_to_cloud_dataproc_submit_job_to_cluster_operator]
sparkr_task = DataprocSubmitJobOperator(
task_id="sparkr_task", job=SPARKR_JOB, location=REGION, project_id=PROJECT_ID
@@ -176,12 +199,14 @@ with models.DAG(
task_id="hadoop_task", job=HADOOP_JOB, location=REGION, project_id=PROJECT_ID
)
+ # [START how_to_cloud_dataproc_delete_cluster_operator]
delete_cluster = DataprocDeleteClusterOperator(
task_id="delete_cluster",
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
region=REGION,
)
+ # [END how_to_cloud_dataproc_delete_cluster_operator]
create_cluster >> scale_cluster
scale_cluster >> hive_task >> delete_cluster
diff --git a/docs/howto/operator/gcp/dataproc.rst b/docs/howto/operator/gcp/dataproc.rst
new file mode 100644
index 0000000..01e1f4a
--- /dev/null
+++ b/docs/howto/operator/gcp/dataproc.rst
@@ -0,0 +1,188 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Google Cloud Dataproc Operators
+===============================
+
+Dataproc is a managed Apache Spark and Apache Hadoop service that lets you
+take advantage of open source data tools for batch processing, querying, streaming and machine learning.
+Dataproc automation helps you create clusters quickly, manage them easily, and
+save money by turning clusters off when you don't need them.
+
+For more information about the service visit the `Dataproc product documentation <https://cloud.google.com/dataproc/docs/reference>`__
+
+.. contents::
+ :depth: 1
+ :local:
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+
+.. _howto/operator:DataprocCreateClusterOperator:
+
+Create a Cluster
+----------------
+
+Before you create a dataproc cluster you need to define the cluster.
+It describes the identifying information, config, and status of a cluster of Compute Engine instances.
+For more information about the available fields to pass when creating a cluster, visit `Dataproc create cluster API. <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#Cluster>`__
+
+A cluster configuration can look as follows:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_cloud_dataproc_create_cluster]
+ :end-before: [END how_to_cloud_dataproc_create_cluster]
+
+With this configuration we can create the cluster:
+:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateClusterOperator`
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_cloud_dataproc_create_cluster_operator]
+ :end-before: [END how_to_cloud_dataproc_create_cluster_operator]
+
+Update a cluster
+----------------
+You can scale the cluster up or down by providing a cluster config and an updateMask.
+In the updateMask argument you specify the path, relative to Cluster, of the field to update.
+For more information on updateMask and other parameters take a look at `Dataproc update cluster API. <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters/patch>`__
+
+An example of a new cluster config and the updateMask:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_cloud_dataproc_updatemask_cluster_operator]
+ :end-before: [END how_to_cloud_dataproc_updatemask_cluster_operator]
+
+To update a cluster you can use:
+:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocUpdateClusterOperator`
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_cloud_dataproc_update_cluster_operator]
+ :end-before: [END how_to_cloud_dataproc_update_cluster_operator]
+
+Deleting a cluster
+------------------
+
+To delete a cluster you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocDeleteClusterOperator`.
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_cloud_dataproc_delete_cluster_operator]
+ :end-before: [END how_to_cloud_dataproc_delete_cluster_operator]
+
+Submit a job to a cluster
+-------------------------
+
+Dataproc supports submitting jobs of different big data components.
+The list currently includes Spark, Hadoop, Pig and Hive.
+For more information on versions and images take a look at `Cloud Dataproc Image version list <https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-versions>`__
+
+To submit a job to the cluster you need to provide a job source file. The job source file can be on GCS, the cluster or on your local
+file system. You can specify a file:/// path to refer to a local file on a cluster's master node.
+
+The job configuration can be submitted by using:
+:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`.
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_cloud_dataproc_submit_job_to_cluster_operator]
+ :end-before: [END how_to_cloud_dataproc_submit_job_to_cluster_operator]
+
+Examples of job configurations to submit
+----------------------------------------
+
+We have provided an example for every framework below.
+There are more arguments to provide in the jobs than the examples show. For the complete list of arguments take a look at
+`DataProc Job arguments <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs>`__
+
+Example of the configuration for a PySpark Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_cloud_dataproc_pyspark_config]
+ :end-before: [END how_to_cloud_dataproc_pyspark_config]
+
+Example of the configuration for a SparkSQL Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_cloud_dataproc_sparksql_config]
+ :end-before: [END how_to_cloud_dataproc_sparksql_config]
+
+Example of the configuration for a Spark Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_cloud_dataproc_spark_config]
+ :end-before: [END how_to_cloud_dataproc_spark_config]
+
+Example of the configuration for a Hive Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_cloud_dataproc_hive_config]
+ :end-before: [END how_to_cloud_dataproc_hive_config]
+
+Example of the configuration for a Hadoop Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_cloud_dataproc_hadoop_config]
+ :end-before: [END how_to_cloud_dataproc_hadoop_config]
+
+Example of the configuration for a Pig Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_cloud_dataproc_pig_config]
+ :end-before: [END how_to_cloud_dataproc_pig_config]
+
+
+Example of the configuration for a SparkR Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_cloud_dataproc_sparkr_config]
+ :end-before: [END how_to_cloud_dataproc_sparkr_config]
+
+References
+^^^^^^^^^^
+For further information, take a look at:
+
+* `DataProc API documentation <https://cloud.google.com/dataproc/docs/reference>`__
+* `Product documentation <https://cloud.google.com/dataproc/docs/reference>`__
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 3eaf1a0..48aeea2 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -697,7 +697,7 @@ These integrations allow you to perform various operations within the Google Clo
-
* - `Dataproc <https://cloud.google.com/dataproc/>`__
- -
+ - :doc:`How to use <howto/operator/gcp/dataproc>`
- :mod:`airflow.providers.google.cloud.hooks.dataproc`
- :mod:`airflow.providers.google.cloud.operators.dataproc`
-
diff --git a/tests/test_project_structure.py b/tests/test_project_structure.py
index 8f834d5..bef70b9 100644
--- a/tests/test_project_structure.py
+++ b/tests/test_project_structure.py
@@ -144,7 +144,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
'bigquery_to_mysql',
'cassandra_to_gcs',
'dataflow',
- 'dataproc',
'datastore',
'dlp',
'gcs_to_bigquery',