You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2023/07/26 09:02:55 UTC
[airflow] branch main updated: Add Spot Instances support with Dataproc Operators (#31644)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4c2ef99772 Add Spot Instances support with Dataproc Operators (#31644)
4c2ef99772 is described below
commit 4c2ef99772203936cdb6387f099a64ec9aa736f2
Author: VladaZakharova <80...@users.noreply.github.com>
AuthorDate: Wed Jul 26 11:02:48 2023 +0200
Add Spot Instances support with Dataproc Operators (#31644)
---
.../providers/google/cloud/operators/dataproc.py | 25 ++++++++++++++++--
.../operators/cloud/dataproc.rst | 30 +++++++++++++++++++---
docs/spelling_wordlist.txt | 4 +++
.../google/cloud/operators/test_dataproc.py | 4 +++
.../example_dataproc_cluster_deferrable.py | 10 ++++++++
.../dataproc/example_dataproc_cluster_generator.py | 2 ++
.../google/cloud/dataproc/example_dataproc_gke.py | 5 ++++
.../google/cloud/dataproc/example_dataproc_hive.py | 10 ++++++++
8 files changed, 85 insertions(+), 5 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index 0d9cf592a1..69b82e34b9 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -26,6 +26,7 @@ import time
import uuid
import warnings
from datetime import datetime, timedelta
+from enum import Enum
from typing import TYPE_CHECKING, Any, Sequence
from google.api_core import operation # type: ignore
@@ -64,6 +65,14 @@ if TYPE_CHECKING:
from airflow.utils.context import Context
+class PreemptibilityType(Enum):
+ """Contains possible Type values of Preemptibility applicable for every secondary worker of Cluster."""
+
+ PREEMPTIBLE = "PREEMPTIBLE"
+ SPOT = "SPOT"
+ PREEMPTIBILITY_UNSPECIFIED = "PREEMPTIBILITY_UNSPECIFIED"
+
+
class ClusterGenerator:
"""Create a new Dataproc Cluster.
@@ -109,7 +118,13 @@ class ClusterGenerator:
Valid values: ``pd-ssd`` (Persistent Disk Solid State Drive) or
``pd-standard`` (Persistent Disk Hard Disk Drive).
:param worker_disk_size: Disk size for the worker nodes
- :param num_preemptible_workers: The # of preemptible worker nodes to spin up
+ :param num_preemptible_workers: The # of VM instances in the instance group as secondary workers
+ inside the cluster with Preemptibility enabled by default.
Note that it is not possible to mix non-preemptible and preemptible secondary workers in
+ one cluster.
+ :param preemptibility: The type of Preemptibility applicable for every secondary worker, see
+ https://cloud.google.com/dataproc/docs/reference/rpc/ \
+ google.cloud.dataproc.v1#google.cloud.dataproc.v1.InstanceGroupConfig.Preemptibility
:param zone: The zone where the cluster will be located. Set to None to auto-zone. (templated)
:param network_uri: The network uri to be used for machine communication, cannot be
specified with subnetwork_uri
@@ -164,6 +179,7 @@ class ClusterGenerator:
worker_disk_type: str = "pd-standard",
worker_disk_size: int = 1024,
num_preemptible_workers: int = 0,
+ preemptibility: str = PreemptibilityType.PREEMPTIBLE.value,
service_account: str | None = None,
service_account_scopes: list[str] | None = None,
idle_delete_ttl: int | None = None,
@@ -177,6 +193,7 @@ class ClusterGenerator:
self.num_masters = num_masters
self.num_workers = num_workers
self.num_preemptible_workers = num_preemptible_workers
+ self.preemptibility = self._set_preemptibility_type(preemptibility)
self.storage_bucket = storage_bucket
self.init_actions_uris = init_actions_uris
self.init_action_timeout = init_action_timeout
@@ -220,6 +237,9 @@ class ClusterGenerator:
if self.single_node and self.num_preemptible_workers > 0:
raise ValueError("Single node cannot have preemptible workers.")
+ def _set_preemptibility_type(self, preemptibility: str):
+ return PreemptibilityType(preemptibility.upper())
+
def _get_init_action_timeout(self) -> dict:
match = re.match(r"^(\d+)([sm])$", self.init_action_timeout)
if match:
@@ -328,6 +348,7 @@ class ClusterGenerator:
"boot_disk_size_gb": self.worker_disk_size,
},
"is_preemptible": True,
+ "preemptibility": self.preemptibility.value,
}
if self.storage_bucket:
@@ -2116,7 +2137,7 @@ class DataprocUpdateClusterOperator(GoogleCloudBaseOperator):
allowed timeout is 1 day.
:param request_id: Optional. A unique id used to identify the request. If the server receives two
``UpdateClusterRequest`` requests with the same id, then the second request will be ignored and the
- first ``google.longrunning.Operation`` created and stored in the backend is returned.
+ first ``google.longrunning.Operation`` created and stored in the backend is returned.
:param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
retried.
:param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
index d13227c135..b42cdc7e79 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
@@ -37,8 +37,21 @@ Prerequisite Tasks
Create a Cluster
----------------
-Before you create a dataproc cluster you need to define the cluster.
-It describes the identifying information, config, and status of a cluster of Compute Engine instances.
+When you create a Dataproc cluster, you have the option to choose Compute Engine as the deployment platform.
+In this configuration, Dataproc automatically provisions the required Compute Engine VM instances to run the cluster.
+The VM instances are used for the main node, primary worker and secondary worker nodes (if specified).
+These VM instances are created and managed by Compute Engine, while Dataproc takes care of configuring the software and
+orchestration required for the big data processing tasks.
+By providing the configuration for your nodes, you describe the configuration of the primary and
+secondary nodes, and the status of a cluster of Compute Engine instances.
+When configuring secondary worker nodes, you can specify the number of workers and their types. By
+enabling the Preemptible option to use Preemptible VMs (equivalent to Spot instances) for those nodes, you
+can take advantage of the cost savings provided by these instances for your Dataproc workloads.
+The primary node, which typically hosts the cluster's main and various control services, does not have the Preemptible
+option because it's crucial for the primary node to maintain stability and availability.
+Once a cluster is created, the configuration settings, including the preemptibility of secondary worker nodes,
+cannot be modified directly.
+
For more information about the available fields to pass when creating a cluster, visit `Dataproc create cluster API. <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#Cluster>`__
A cluster configuration can look as followed:
@@ -58,7 +71,18 @@ With this configuration we can create the cluster:
:start-after: [START how_to_cloud_dataproc_create_cluster_operator]
:end-before: [END how_to_cloud_dataproc_create_cluster_operator]
-For create Dataproc cluster in Google Kubernetes Engine you should use this cluster configuration:
+Dataproc on GKE deploys Dataproc virtual clusters on a GKE cluster. Unlike Dataproc on Compute Engine clusters,
+Dataproc on GKE virtual clusters do not include separate main and worker VMs. Instead, when you create a Dataproc on
+GKE virtual cluster, Dataproc on GKE creates node pools within a GKE cluster. Dataproc on GKE jobs are run as pods on
+these node pools. The node pools and scheduling of pods on the node pools are managed by GKE.
+
+When creating a GKE Dataproc cluster, you can specify the usage of Preemptible VMs for the underlying compute resources.
+GKE supports the use of Preemptible VMs as a cost-saving measure.
+By enabling Preemptible VMs, GKE will provision the cluster nodes using Preemptible VMs. Alternatively, you can create
+nodes as Spot VM instances, which are the latest update to legacy preemptible VMs.
+This can be beneficial for running Dataproc workloads on GKE while optimizing costs.
+
+To create a Dataproc cluster in Google Kubernetes Engine, you can pass a cluster configuration:
.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
:language: python
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index fb721ada25..fbca728f8e 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -769,6 +769,7 @@ InspectContentResponse
InspectTemplate
instafail
installable
+InstanceGroupConfig
instanceTemplates
instantiation
integrations
@@ -1133,6 +1134,9 @@ precheck
Precommit
preconfigured
PredictionServiceClient
+Preemptibility
+preemptibility
+Preemptible
preemptible
prefetch
prefetched
diff --git a/tests/providers/google/cloud/operators/test_dataproc.py b/tests/providers/google/cloud/operators/test_dataproc.py
index b37c6ebc76..38e4ffeef5 100644
--- a/tests/providers/google/cloud/operators/test_dataproc.py
+++ b/tests/providers/google/cloud/operators/test_dataproc.py
@@ -114,6 +114,7 @@ CONFIG = {
"machine_type_uri": "projects/project_id/zones/zone/machineTypes/worker_machine_type",
"disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256},
"is_preemptible": True,
+ "preemptibility": "SPOT",
},
"software_config": {"properties": {"properties": "data"}, "optional_components": ["optional_components"]},
"lifecycle_config": {
@@ -174,6 +175,7 @@ CONFIG_WITH_CUSTOM_IMAGE_FAMILY = {
"machine_type_uri": "projects/project_id/zones/zone/machineTypes/worker_machine_type",
"disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256},
"is_preemptible": True,
+ "preemptibility": "SPOT",
},
"software_config": {"properties": {"properties": "data"}, "optional_components": ["optional_components"]},
"lifecycle_config": {
@@ -372,6 +374,7 @@ class TestsClusterGenerator:
worker_disk_type="worker_disk_type",
worker_disk_size=256,
num_preemptible_workers=4,
+ preemptibility="Spot",
region="region",
service_account="service_account",
service_account_scopes=["service_account_scopes"],
@@ -409,6 +412,7 @@ class TestsClusterGenerator:
worker_disk_type="worker_disk_type",
worker_disk_size=256,
num_preemptible_workers=4,
+ preemptibility="Spot",
region="region",
service_account="service_account",
service_account_scopes=["service_account_scopes"],
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
index 1128c4cce7..b86ae0f0c2 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
@@ -52,6 +52,16 @@ CLUSTER_CONFIG = {
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
},
+ "secondary_worker_config": {
+ "num_instances": 1,
+ "machine_type_uri": "n1-standard-4",
+ "disk_config": {
+ "boot_disk_type": "pd-standard",
+ "boot_disk_size_gb": 1024,
+ },
+ "is_preemptible": True,
+ "preemptibility": "PREEMPTIBLE",
+ },
}
# Update options
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
index be25251e16..f3f68291b2 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
@@ -59,6 +59,8 @@ CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
storage_bucket=BUCKET_NAME,
init_actions_uris=[f"gs://{BUCKET_NAME}/{INIT_FILE}"],
metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
+ num_preemptible_workers=1,
+ preemptibility="PREEMPTIBLE",
).make()
# [END how_to_cloud_dataproc_create_cluster_generate_cluster_config]
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
index 63e58d2583..d2e5f0fd3d 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
@@ -68,6 +68,11 @@ VIRTUAL_CLUSTER_CONFIG = {
{
"node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp", # noqa
"roles": ["DEFAULT"],
+ "node_pool_config": {
+ "config": {
+ "preemptible": True,
+ }
+ },
}
],
},
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py
index 8d345f818d..37ebe56dc0 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py
@@ -54,6 +54,16 @@ CLUSTER_CONFIG = {
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
},
+ "secondary_worker_config": {
+ "num_instances": 1,
+ "machine_type_uri": "n1-standard-4",
+ "disk_config": {
+ "boot_disk_type": "pd-standard",
+ "boot_disk_size_gb": 1024,
+ },
+ "is_preemptible": True,
+ "preemptibility": "PREEMPTIBLE",
+ },
}
# [END how_to_cloud_dataproc_create_cluster]