You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2023/07/26 09:02:55 UTC

[airflow] branch main updated: Add Spot Instances support with Dataproc Operators (#31644)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c2ef99772 Add Spot Instances support with Dataproc Operators (#31644)
4c2ef99772 is described below

commit 4c2ef99772203936cdb6387f099a64ec9aa736f2
Author: VladaZakharova <80...@users.noreply.github.com>
AuthorDate: Wed Jul 26 11:02:48 2023 +0200

    Add Spot Instances support with Dataproc Operators (#31644)
---
 .../providers/google/cloud/operators/dataproc.py   | 25 ++++++++++++++++--
 .../operators/cloud/dataproc.rst                   | 30 +++++++++++++++++++---
 docs/spelling_wordlist.txt                         |  4 +++
 .../google/cloud/operators/test_dataproc.py        |  4 +++
 .../example_dataproc_cluster_deferrable.py         | 10 ++++++++
 .../dataproc/example_dataproc_cluster_generator.py |  2 ++
 .../google/cloud/dataproc/example_dataproc_gke.py  |  5 ++++
 .../google/cloud/dataproc/example_dataproc_hive.py | 10 ++++++++
 8 files changed, 85 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index 0d9cf592a1..69b82e34b9 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -26,6 +26,7 @@ import time
 import uuid
 import warnings
 from datetime import datetime, timedelta
+from enum import Enum
 from typing import TYPE_CHECKING, Any, Sequence
 
 from google.api_core import operation  # type: ignore
@@ -64,6 +65,14 @@ if TYPE_CHECKING:
     from airflow.utils.context import Context
 
 
+class PreemptibilityType(Enum):
+    """Contains possible Type values of Preemptibility applicable for every secondary worker of Cluster."""
+
+    PREEMPTIBLE = "PREEMPTIBLE"
+    SPOT = "SPOT"
+    PREEMPTIBILITY_UNSPECIFIED = "PREEMPTIBILITY_UNSPECIFIED"
+
+
 class ClusterGenerator:
     """Create a new Dataproc Cluster.
 
@@ -109,7 +118,13 @@ class ClusterGenerator:
         Valid values: ``pd-ssd`` (Persistent Disk Solid State Drive) or
         ``pd-standard`` (Persistent Disk Hard Disk Drive).
     :param worker_disk_size: Disk size for the worker nodes
-    :param num_preemptible_workers: The # of preemptible worker nodes to spin up
+    :param num_preemptible_workers: The # of VM instances in the instance group as secondary workers
+        inside the cluster with Preemptibility enabled by default.
+        Note, that it is not possible to mix non-preemptible and preemptible secondary workers in
+        one cluster.
+    :param preemptibility: The type of Preemptibility applicable for every secondary worker, see
+        https://cloud.google.com/dataproc/docs/reference/rpc/ \
+        google.cloud.dataproc.v1#google.cloud.dataproc.v1.InstanceGroupConfig.Preemptibility
     :param zone: The zone where the cluster will be located. Set to None to auto-zone. (templated)
     :param network_uri: The network uri to be used for machine communication, cannot be
         specified with subnetwork_uri
@@ -164,6 +179,7 @@ class ClusterGenerator:
         worker_disk_type: str = "pd-standard",
         worker_disk_size: int = 1024,
         num_preemptible_workers: int = 0,
+        preemptibility: str = PreemptibilityType.PREEMPTIBLE.value,
         service_account: str | None = None,
         service_account_scopes: list[str] | None = None,
         idle_delete_ttl: int | None = None,
@@ -177,6 +193,7 @@ class ClusterGenerator:
         self.num_masters = num_masters
         self.num_workers = num_workers
         self.num_preemptible_workers = num_preemptible_workers
+        self.preemptibility = self._set_preemptibility_type(preemptibility)
         self.storage_bucket = storage_bucket
         self.init_actions_uris = init_actions_uris
         self.init_action_timeout = init_action_timeout
@@ -220,6 +237,9 @@ class ClusterGenerator:
         if self.single_node and self.num_preemptible_workers > 0:
             raise ValueError("Single node cannot have preemptible workers.")
 
+    def _set_preemptibility_type(self, preemptibility: str):
+        return PreemptibilityType(preemptibility.upper())
+
     def _get_init_action_timeout(self) -> dict:
         match = re.match(r"^(\d+)([sm])$", self.init_action_timeout)
         if match:
@@ -328,6 +348,7 @@ class ClusterGenerator:
                     "boot_disk_size_gb": self.worker_disk_size,
                 },
                 "is_preemptible": True,
+                "preemptibility": self.preemptibility.value,
             }
 
         if self.storage_bucket:
@@ -2116,7 +2137,7 @@ class DataprocUpdateClusterOperator(GoogleCloudBaseOperator):
         allowed timeout is 1 day.
     :param request_id: Optional. A unique id used to identify the request. If the server receives two
         ``UpdateClusterRequest`` requests with the same id, then the second request will be ignored and the
-        first ``google.longrunning.Operation`` created and stored in the backend is returned.
+        first ``google.long-running.Operation`` created and stored in the backend is returned.
     :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
         retried.
     :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
index d13227c135..b42cdc7e79 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
@@ -37,8 +37,21 @@ Prerequisite Tasks
 Create a Cluster
 ----------------
 
-Before you create a dataproc cluster you need to define the cluster.
-It describes the identifying information, config, and status of a cluster of Compute Engine instances.
+When you create a Dataproc cluster, you have the option to choose Compute Engine as the deployment platform.
+In this configuration, Dataproc automatically provisions the required Compute Engine VM instances to run the cluster.
+The VM instances are used for the main node, primary worker and secondary worker nodes (if specified).
+These VM instances are created and managed by Compute Engine, while Dataproc takes care of configuring the software and
+orchestration required for the big data processing tasks.
+By providing the configuration for your nodes, you describe the configuration of primary and
+secondary nodes, and status of a cluster of Compute Engine instances.
+Configuring secondary worker nodes, you can specify the number of workers and their types. By
+enabling the Preemptible option to use Preemptible VMs (equivalent to Spot instances) for those nodes, you
+can take advantage of the cost savings provided by these instances for your Dataproc workloads.
+The primary node, which typically hosts the cluster main and various control services, does not have the Preemptible
+option because it's crucial for the primary node to maintain stability and availability.
+Once a cluster is created, the configuration settings, including the preemptibility of secondary worker nodes,
+cannot be modified directly.
+
 For more information about the available fields to pass when creating a cluster, visit `Dataproc create cluster API. <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#Cluster>`__
 
 A cluster configuration can look as followed:
@@ -58,7 +71,18 @@ With this configuration we can create the cluster:
     :start-after: [START how_to_cloud_dataproc_create_cluster_operator]
     :end-before: [END how_to_cloud_dataproc_create_cluster_operator]
 
-For create Dataproc cluster in Google Kubernetes Engine you should use this cluster configuration:
+Dataproc on GKE deploys Dataproc virtual clusters on a GKE cluster. Unlike Dataproc on Compute Engine clusters,
+Dataproc on GKE virtual clusters do not include separate main and worker VMs. Instead, when you create a Dataproc on
+GKE virtual cluster, Dataproc on GKE creates node pools within a GKE cluster. Dataproc on GKE jobs are run as pods on
+these node pools. The node pools and scheduling of pods on the node pools are managed by GKE.
+
+When creating a GKE Dataproc cluster, you can specify the usage of Preemptible VMs for the underlying compute resources.
+GKE supports the use of Preemptible VMs as a cost-saving measure.
+By enabling Preemptible VMs, GKE will provision the cluster nodes using Preemptible VMs. Or you can create nodes as
+Spot VM instances, which are the latest update to legacy preemptible VMs.
+This can be beneficial for running Dataproc workloads on GKE while optimizing costs.
+
+To create Dataproc cluster in Google Kubernetes Engine you could pass cluster configuration:
 
 .. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
     :language: python
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index fb721ada25..fbca728f8e 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -769,6 +769,7 @@ InspectContentResponse
 InspectTemplate
 instafail
 installable
+InstanceGroupConfig
 instanceTemplates
 instantiation
 integrations
@@ -1133,6 +1134,9 @@ precheck
 Precommit
 preconfigured
 PredictionServiceClient
+Preemptibility
+preemptibility
+Preemptible
 preemptible
 prefetch
 prefetched
diff --git a/tests/providers/google/cloud/operators/test_dataproc.py b/tests/providers/google/cloud/operators/test_dataproc.py
index b37c6ebc76..38e4ffeef5 100644
--- a/tests/providers/google/cloud/operators/test_dataproc.py
+++ b/tests/providers/google/cloud/operators/test_dataproc.py
@@ -114,6 +114,7 @@ CONFIG = {
         "machine_type_uri": "projects/project_id/zones/zone/machineTypes/worker_machine_type",
         "disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256},
         "is_preemptible": True,
+        "preemptibility": "SPOT",
     },
     "software_config": {"properties": {"properties": "data"}, "optional_components": ["optional_components"]},
     "lifecycle_config": {
@@ -174,6 +175,7 @@ CONFIG_WITH_CUSTOM_IMAGE_FAMILY = {
         "machine_type_uri": "projects/project_id/zones/zone/machineTypes/worker_machine_type",
         "disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256},
         "is_preemptible": True,
+        "preemptibility": "SPOT",
     },
     "software_config": {"properties": {"properties": "data"}, "optional_components": ["optional_components"]},
     "lifecycle_config": {
@@ -372,6 +374,7 @@ class TestsClusterGenerator:
             worker_disk_type="worker_disk_type",
             worker_disk_size=256,
             num_preemptible_workers=4,
+            preemptibility="Spot",
             region="region",
             service_account="service_account",
             service_account_scopes=["service_account_scopes"],
@@ -409,6 +412,7 @@ class TestsClusterGenerator:
             worker_disk_type="worker_disk_type",
             worker_disk_size=256,
             num_preemptible_workers=4,
+            preemptibility="Spot",
             region="region",
             service_account="service_account",
             service_account_scopes=["service_account_scopes"],
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
index 1128c4cce7..b86ae0f0c2 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
@@ -52,6 +52,16 @@ CLUSTER_CONFIG = {
         "machine_type_uri": "n1-standard-4",
         "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
     },
+    "secondary_worker_config": {
+        "num_instances": 1,
+        "machine_type_uri": "n1-standard-4",
+        "disk_config": {
+            "boot_disk_type": "pd-standard",
+            "boot_disk_size_gb": 1024,
+        },
+        "is_preemptible": True,
+        "preemptibility": "PREEMPTIBLE",
+    },
 }
 
 # Update options
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
index be25251e16..f3f68291b2 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
@@ -59,6 +59,8 @@ CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
     storage_bucket=BUCKET_NAME,
     init_actions_uris=[f"gs://{BUCKET_NAME}/{INIT_FILE}"],
     metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
+    num_preemptible_workers=1,
+    preemptibility="PREEMPTIBLE",
 ).make()
 
 # [END how_to_cloud_dataproc_create_cluster_generate_cluster_config]
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
index 63e58d2583..d2e5f0fd3d 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
@@ -68,6 +68,11 @@ VIRTUAL_CLUSTER_CONFIG = {
                 {
                     "node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp",  # noqa
                     "roles": ["DEFAULT"],
+                    "node_pool_config": {
+                        "config": {
+                            "preemptible": True,
+                        }
+                    },
                 }
             ],
         },
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py
index 8d345f818d..37ebe56dc0 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py
@@ -54,6 +54,16 @@ CLUSTER_CONFIG = {
         "machine_type_uri": "n1-standard-4",
         "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
     },
+    "secondary_worker_config": {
+        "num_instances": 1,
+        "machine_type_uri": "n1-standard-4",
+        "disk_config": {
+            "boot_disk_type": "pd-standard",
+            "boot_disk_size_gb": 1024,
+        },
+        "is_preemptible": True,
+        "preemptibility": "PREEMPTIBLE",
+    },
 }
 
 # [END how_to_cloud_dataproc_create_cluster]