Posted to commits@airflow.apache.org by tu...@apache.org on 2020/05/31 13:30:01 UTC

[airflow] branch master updated: Create guide for Dataproc Operators (#9037)

This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 29eb68b  Create guide for Dataproc Operators (#9037)
29eb68b is described below

commit 29eb68b90b5df692ac322be0939af5e7fa9b71bc
Author: Joppe Vos <44...@users.noreply.github.com>
AuthorDate: Sun May 31 15:29:09 2020 +0200

    Create guide for Dataproc Operators (#9037)
    
    * added documentation for dataproc
    
    * added more update information for updateMask
    
    * Added link to information about cluster config api request
    
    * Apply naming convention
    
    * Set all dedents from 4 to 0
    
    * Adjust dedent to 4, for operators
    
    * removed dataproc guide from test_missing_guides
---
 .../google/cloud/example_dags/example_dataproc.py  |  29 +++-
 docs/howto/operator/gcp/dataproc.rst               | 188 +++++++++++++++++++++
 docs/operators-and-hooks-ref.rst                   |   2 +-
 tests/test_project_structure.py                    |   1 -
 4 files changed, 216 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/google/cloud/example_dags/example_dataproc.py b/airflow/providers/google/cloud/example_dags/example_dataproc.py
index b6e1070..55dbff5 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataproc.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataproc.py
@@ -42,6 +42,8 @@ SPARKR_MAIN = os.environ.get("SPARKR_MAIN", "hello_world.R")
 SPARKR_URI = "gs://{}/{}".format(BUCKET, SPARKR_MAIN)
 
 # Cluster definition
+# [START how_to_cloud_dataproc_create_cluster]
+
 CLUSTER = {
     "project_id": PROJECT_ID,
     "cluster_name": CLUSTER_NAME,
@@ -59,8 +61,10 @@ CLUSTER = {
     },
 }
 
+# [END how_to_cloud_dataproc_create_cluster]
 
 # Update options
+# [START how_to_cloud_dataproc_updatemask_cluster_operator]
 CLUSTER_UPDATE = {
     "config": {
         "worker_config": {"num_instances": 3},
@@ -73,23 +77,28 @@ UPDATE_MASK = {
         "config.secondary_worker_config.num_instances",
     ]
 }
+# [END how_to_cloud_dataproc_updatemask_cluster_operator]
 
 TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
 
-
 # Jobs definitions
+# [START how_to_cloud_dataproc_pig_config]
 PIG_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
     "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
 }
+# [END how_to_cloud_dataproc_pig_config]
 
+# [START how_to_cloud_dataproc_sparksql_config]
 SPARK_SQL_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
     "spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
 }
+# [END how_to_cloud_dataproc_sparksql_config]
 
+# [START how_to_cloud_dataproc_spark_config]
 SPARK_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
@@ -98,25 +107,33 @@ SPARK_JOB = {
         "main_class": "org.apache.spark.examples.SparkPi",
     },
 }
+# [END how_to_cloud_dataproc_spark_config]
 
+# [START how_to_cloud_dataproc_pyspark_config]
 PYSPARK_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
     "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
 }
+# [END how_to_cloud_dataproc_pyspark_config]
 
+# [START how_to_cloud_dataproc_sparkr_config]
 SPARKR_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
     "spark_r_job": {"main_r_file_uri": SPARKR_URI},
 }
+# [END how_to_cloud_dataproc_sparkr_config]
 
+# [START how_to_cloud_dataproc_hive_config]
 HIVE_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
     "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
 }
+# [END how_to_cloud_dataproc_hive_config]
 
+# [START how_to_cloud_dataproc_hadoop_config]
 HADOOP_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
@@ -125,16 +142,20 @@ HADOOP_JOB = {
         "args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
     },
 }
+# [END how_to_cloud_dataproc_hadoop_config]
 
 with models.DAG(
     "example_gcp_dataproc",
     default_args={"start_date": days_ago(1)},
     schedule_interval=None,
 ) as dag:
+    # [START how_to_cloud_dataproc_create_cluster_operator]
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster", project_id=PROJECT_ID, cluster=CLUSTER, region=REGION
     )
+    # [END how_to_cloud_dataproc_create_cluster_operator]
 
+    # [START how_to_cloud_dataproc_update_cluster_operator]
     scale_cluster = DataprocUpdateClusterOperator(
         task_id="scale_cluster",
         cluster_name=CLUSTER_NAME,
@@ -144,11 +165,11 @@ with models.DAG(
         project_id=PROJECT_ID,
         location=REGION,
     )
+    # [END how_to_cloud_dataproc_update_cluster_operator]
 
     pig_task = DataprocSubmitJobOperator(
         task_id="pig_task", job=PIG_JOB, location=REGION, project_id=PROJECT_ID
     )
-
     spark_sql_task = DataprocSubmitJobOperator(
         task_id="spark_sql_task",
         job=SPARK_SQL_JOB,
@@ -160,9 +181,11 @@ with models.DAG(
         task_id="spark_task", job=SPARK_JOB, location=REGION, project_id=PROJECT_ID
     )
 
+    # [START how_to_cloud_dataproc_submit_job_to_cluster_operator]
     pyspark_task = DataprocSubmitJobOperator(
         task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
     )
+    # [END how_to_cloud_dataproc_submit_job_to_cluster_operator]
 
     sparkr_task = DataprocSubmitJobOperator(
         task_id="sparkr_task", job=SPARKR_JOB, location=REGION, project_id=PROJECT_ID
@@ -176,12 +199,14 @@ with models.DAG(
         task_id="hadoop_task", job=HADOOP_JOB, location=REGION, project_id=PROJECT_ID
     )
 
+    # [START how_to_cloud_dataproc_delete_cluster_operator]
     delete_cluster = DataprocDeleteClusterOperator(
         task_id="delete_cluster",
         project_id=PROJECT_ID,
         cluster_name=CLUSTER_NAME,
         region=REGION,
     )
+    # [END how_to_cloud_dataproc_delete_cluster_operator]
 
     create_cluster >> scale_cluster
     scale_cluster >> hive_task >> delete_cluster
diff --git a/docs/howto/operator/gcp/dataproc.rst b/docs/howto/operator/gcp/dataproc.rst
new file mode 100644
index 0000000..01e1f4a
--- /dev/null
+++ b/docs/howto/operator/gcp/dataproc.rst
@@ -0,0 +1,188 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Google Cloud Dataproc Operators
+===============================
+
+Dataproc is a managed Apache Spark and Apache Hadoop service that lets you
+take advantage of open source data tools for batch processing, querying, streaming and machine learning.
+Dataproc automation helps you create clusters quickly, manage them easily, and
+save money by turning clusters off when you don't need them.
+
+For more information about the service visit the `Dataproc product documentation <https://cloud.google.com/dataproc/docs/reference>`__.
+
+.. contents::
+  :depth: 1
+  :local:
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+
+.. _howto/operator:DataprocCreateClusterOperator:
+
+Create a Cluster
+----------------
+
+Before you create a Dataproc cluster you need to define the cluster configuration.
+It describes the identifying information, config, and status of a cluster of Compute Engine instances.
+For more information about the available fields to pass when creating a cluster, visit the `Dataproc create cluster API <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#Cluster>`__.
+
+A cluster configuration can look as follows:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_create_cluster]
+    :end-before: [END how_to_cloud_dataproc_create_cluster]
+
+With this configuration we can create the cluster:
+:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateClusterOperator`
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_create_cluster_operator]
+    :end-before: [END how_to_cloud_dataproc_create_cluster_operator]
+
+Update a cluster
+----------------
+
+You can scale the cluster up or down by providing a cluster config and an updateMask.
+In the updateMask argument you specify the path, relative to Cluster, of the field to update.
+For more information on updateMask and other parameters take a look at the `Dataproc update cluster API <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters/patch>`__.
+
+An example of a new cluster config and the updateMask:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_updatemask_cluster_operator]
+    :end-before: [END how_to_cloud_dataproc_updatemask_cluster_operator]
+
+To update a cluster you can use:
+:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocUpdateClusterOperator`
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_update_cluster_operator]
+    :end-before: [END how_to_cloud_dataproc_update_cluster_operator]
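+
+When scaling down, the operator also accepts a graceful decommission timeout, which controls how long
+Dataproc waits for running jobs to finish before removing workers. The example DAG defines it as a
+separate constant, shown here as a sketch (how it is wired into the operator call is not shown above,
+so treat that as an assumption):
+
+.. code-block:: python
+
+    # 24 hours, expressed as a protobuf Duration-style dict
+    TIMEOUT = {"seconds": 1 * 24 * 60 * 60}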
+
+Delete a cluster
+------------------
+
+To delete a cluster you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocDeleteClusterOperator`.
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_delete_cluster_operator]
+    :end-before: [END how_to_cloud_dataproc_delete_cluster_operator]
+
+Submit a job to a cluster
+-------------------------
+
+Dataproc supports submitting jobs for several big data components.
+The list currently includes Spark, Hadoop, Pig and Hive.
+For more information on versions and images take a look at the `Cloud Dataproc Image version list <https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-versions>`__.
+
+To submit a job to the cluster you need to provide a job source file. The job source file can be on GCS, on the cluster, or on your local
+file system. You can specify a file:/// path to refer to a local file on the cluster's master node.
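+
+For instance, a PySpark job that reads its main script from the master node's local file system might be
+configured like this (a minimal sketch; the file path is a placeholder and the script is assumed to already
+be staged on the master node):
+
+.. code-block:: python
+
+    # Hypothetical job config: the script is read from the master node's local
+    # file system instead of from GCS.
+    PYSPARK_LOCAL_JOB = {
+        "reference": {"project_id": PROJECT_ID},
+        "placement": {"cluster_name": CLUSTER_NAME},
+        "pyspark_job": {"main_python_file_uri": "file:///home/dataproc/hello_world.py"},
+    }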
+
+The job configuration can be submitted by using:
+:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`.
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_submit_job_to_cluster_operator]
+    :end-before: [END how_to_cloud_dataproc_submit_job_to_cluster_operator]
+
+Examples of job configurations to submit
+----------------------------------------
+
+We have provided an example for every framework below.
+There are more arguments to provide in the jobs than the examples show. For the complete list of arguments take a look at the
+`Dataproc Job arguments <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs>`__.
+A sketch of a few of these optional fields follows the PySpark example below.
+
+Example of the configuration for a PySpark Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_pyspark_config]
+    :end-before: [END how_to_cloud_dataproc_pyspark_config]
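+
+The configuration above sets only the required fields. A sketch of a few of the optional fields from the
+Job API (the field names come from the API reference linked above; the values are placeholders, not taken
+from the example DAG):
+
+.. code-block:: python
+
+    PYSPARK_JOB_WITH_OPTIONS = {
+        "reference": {"project_id": PROJECT_ID},
+        "placement": {"cluster_name": CLUSTER_NAME},
+        "labels": {"team": "data-eng"},  # free-form labels attached to the job resource
+        "pyspark_job": {
+            "main_python_file_uri": PYSPARK_URI,
+            "args": ["--verbose"],  # command line arguments passed to the main Python file
+            "properties": {"spark.executor.memory": "2g"},  # Spark properties for this job
+        },
+    }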
+
+Example of the configuration for a SparkSQL Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_sparksql_config]
+    :end-before: [END how_to_cloud_dataproc_sparksql_config]
+
+Example of the configuration for a Spark Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_spark_config]
+    :end-before: [END how_to_cloud_dataproc_spark_config]
+
+Example of the configuration for a Hive Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_hive_config]
+    :end-before: [END how_to_cloud_dataproc_hive_config]
+
+Example of the configuration for a Hadoop Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_hadoop_config]
+    :end-before: [END how_to_cloud_dataproc_hadoop_config]
+
+Example of the configuration for a Pig Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_pig_config]
+    :end-before: [END how_to_cloud_dataproc_pig_config]
+
+Example of the configuration for a SparkR Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_sparkr_config]
+    :end-before: [END how_to_cloud_dataproc_sparkr_config]
+
+References
+^^^^^^^^^^
+
+For further information, take a look at:
+
+* `Dataproc API documentation <https://cloud.google.com/dataproc/docs/reference>`__
+* `Product documentation <https://cloud.google.com/dataproc/docs/reference>`__
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 3eaf1a0..48aeea2 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -697,7 +697,7 @@ These integrations allow you to perform various operations within the Google Clo
      -
 
    * - `Dataproc <https://cloud.google.com/dataproc/>`__
-     -
+     - :doc:`How to use <howto/operator/gcp/dataproc>`
      - :mod:`airflow.providers.google.cloud.hooks.dataproc`
      - :mod:`airflow.providers.google.cloud.operators.dataproc`
      -
diff --git a/tests/test_project_structure.py b/tests/test_project_structure.py
index 8f834d5..bef70b9 100644
--- a/tests/test_project_structure.py
+++ b/tests/test_project_structure.py
@@ -144,7 +144,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
         'bigquery_to_mysql',
         'cassandra_to_gcs',
         'dataflow',
-        'dataproc',
         'datastore',
         'dlp',
         'gcs_to_bigquery',