You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2022/06/16 07:05:53 UTC
[spark] branch master updated: [SPARK-39490][K8S] Support `ipFamilyPolicy` and `ipFamilies` in Driver Service
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2fa98a6a097 [SPARK-39490][K8S] Support `ipFamilyPolicy` and `ipFamilies` in Driver Service
2fa98a6a097 is described below
commit 2fa98a6a0971cafeb98a2148a962cf8d79381842
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Thu Jun 16 00:05:23 2022 -0700
[SPARK-39490][K8S] Support `ipFamilyPolicy` and `ipFamilies` in Driver Service
### What changes were proposed in this pull request?
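This PR aims to support the `ipFamilyPolicy` and `ipFamilies` K8s Service features in the Spark Driver Service in order to support `IPv6`-only environments. After this PR, we can control `ipFamilyPolicy` and `ipFamilies` via the `spark.kubernetes.driver.service.ipFamilyPolicy` and `spark.kubernetes.driver.service.ipFamilies` configurations.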
```yaml
$ kubectl get svc spark-xxx-driver-svc -oyaml
apiVersion: v1
kind: Service
metadata:
  ...
spec:
  clusterIP: None
  ipFamilyPolicy: SingleStack
  ipFamilies:
  - IPv4
  ...
```
### Why are the changes needed?
The K8s IPv4/IPv6 dual-stack feature reached the `Stable` stage in v1.23.
- https://kubernetes.io/docs/concepts/services-networking/dual-stack/
- v1.16 [alpha]
- v1.21 [beta]
- v1.23 [stable]
According to the [EKS milestone](https://docs.aws.amazon.com/eks/latest/userguide/kubernetes-versions.html), K8s v1.23 will be GA on Amazon EKS in August 2022.
Kubernetes version | Upstream release | Amazon EKS release | Amazon EKS end of support
-- | -- | -- | --
1.20 | December 8, 2020 | May 18, 2021 | September 2022
1.21 | April 8, 2021 | July 19, 2021 | February 2023
1.22 | August 4, 2021 | April 4, 2022 | May 2023
1.23 | December 7, 2021 | August 2022 | October 2023
Note that
- [EKS has supported IPv6 since January 2022.](https://aws.amazon.com/blogs/containers/amazon-eks-launches-ipv6-support/)
- [Docker Desktop supports IPv6 only on Linux.](https://docs.docker.com/config/daemon/ipv6/)
- Unfortunately, [Minikube doesn't support IPv6 yet](https://github.com/kubernetes/minikube/issues/8535).
### Does this PR introduce _any_ user-facing change?
Yes, this is a new feature.
### How was this patch tested?
Pass the CIs with the newly added test cases.
Closes #36887 from dongjoon-hyun/SPARK-39490.
Authored-by: Dongjoon Hyun <dh...@apple.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../scala/org/apache/spark/deploy/k8s/Config.scala | 16 ++++++
.../k8s/features/DriverServiceFeatureStep.scala | 8 ++-
.../features/DriverServiceFeatureStepSuite.scala | 60 ++++++++++++++++++++++
3 files changed, 83 insertions(+), 1 deletion(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 168c86ecb3a..0bc2660bfc8 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -63,6 +63,22 @@ private[spark] object Config extends Logging {
.booleanConf
.createWithDefault(true)
+ val KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY =
+ ConfigBuilder("spark.kubernetes.driver.service.ipFamilyPolicy")
+ .doc("K8s IP Family Policy for Driver Service")
+ .version("3.4.0")
+ .stringConf
+ .checkValues(Set("SingleStack", "PreferDualStack", "RequireDualStack"))
+ .createWithDefault("SingleStack")
+
+ val KUBERNETES_DRIVER_SERVICE_IP_FAMILIES =
+ ConfigBuilder("spark.kubernetes.driver.service.ipFamilies")
+ .doc("A list of IP families for K8s Driver Service")
+ .version("3.4.0")
+ .stringConf
+ .checkValues(Set("IPv4", "IPv6", "IPv4,IPv6", "IPv6,IPv4"))
+ .createWithDefault("IPv4")
+
val KUBERNETES_DRIVER_OWN_PVC =
ConfigBuilder("spark.kubernetes.driver.ownPersistentVolumeClaim")
.doc("If true, driver pod becomes the owner of on-demand persistent volume claims " +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
index 75c40584a64..8825b5f69b2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
-import org.apache.spark.deploy.k8s.Config.KUBERNETES_DNSNAME_MAX_LENGTH
+import org.apache.spark.deploy.k8s.Config.{KUBERNETES_DNSNAME_MAX_LENGTH, KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.util.{Clock, SystemClock}
@@ -50,6 +50,10 @@ private[spark] class DriverServiceFeatureStep(
s"$shorterServiceName as the driver service's name.")
shorterServiceName
}
+ private val ipFamilyPolicy =
+ kubernetesConf.sparkConf.get(KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY)
+ private val ipFamilies =
+ kubernetesConf.sparkConf.get(KUBERNETES_DRIVER_SERVICE_IP_FAMILIES).split(",").toList.asJava
private val driverPort = kubernetesConf.sparkConf.getInt(
config.DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
@@ -75,6 +79,8 @@ private[spark] class DriverServiceFeatureStep(
.endMetadata()
.withNewSpec()
.withClusterIP("None")
+ .withIpFamilyPolicy(ipFamilyPolicy)
+ .withIpFamilies(ipFamilies)
.withSelector(kubernetesConf.labels.asJava)
.addNewPort()
.withName(DRIVER_PORT_NAME)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index 2e5e60eb39f..ef33bb3069a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -160,6 +160,66 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
" a Kubernetes service.")
}
+ test("Support ipFamilies spec with default SingleStack and IPv4") {
+ val sparkConf = new SparkConf(false)
+ val kconf = KubernetesTestConf.createDriverConf(
+ sparkConf = sparkConf,
+ labels = DRIVER_LABELS,
+ serviceAnnotations = DRIVER_SERVICE_ANNOTATIONS)
+ val configurationStep = new DriverServiceFeatureStep(kconf)
+ assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
+ val driverService = configurationStep
+ .getAdditionalKubernetesResources()
+ .head
+ .asInstanceOf[Service]
+ assert(driverService.getSpec.getIpFamilyPolicy() == "SingleStack")
+ assert(driverService.getSpec.getIpFamilies.size() === 1)
+ assert(driverService.getSpec.getIpFamilies.get(0) == "IPv4")
+ }
+
+ test("Support ipFamilies spec with SingleStack and IPv6") {
+ val sparkConf = new SparkConf(false)
+ .set(KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, "IPv6")
+ val kconf = KubernetesTestConf.createDriverConf(
+ sparkConf = sparkConf,
+ labels = DRIVER_LABELS,
+ serviceAnnotations = DRIVER_SERVICE_ANNOTATIONS)
+ val configurationStep = new DriverServiceFeatureStep(kconf)
+ assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
+ val driverService = configurationStep
+ .getAdditionalKubernetesResources()
+ .head
+ .asInstanceOf[Service]
+ assert(driverService.getSpec.getIpFamilyPolicy() == "SingleStack")
+ assert(driverService.getSpec.getIpFamilies.size() === 1)
+ assert(driverService.getSpec.getIpFamilies.get(0) == "IPv6")
+ }
+
+ test("Support DualStack") {
+ Seq("PreferDualStack", "RequireDualStack").foreach { stack =>
+ val configAndAnswers = Seq(
+ ("IPv4,IPv6", Seq("IPv4", "IPv6")),
+ ("IPv6,IPv4", Seq("IPv6", "IPv4")))
+ configAndAnswers.foreach { case (config, answer) =>
+ val sparkConf = new SparkConf(false)
+ .set(KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY, stack)
+ .set(KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, config)
+ val kconf = KubernetesTestConf.createDriverConf(
+ sparkConf = sparkConf,
+ labels = DRIVER_LABELS,
+ serviceAnnotations = DRIVER_SERVICE_ANNOTATIONS)
+ val configurationStep = new DriverServiceFeatureStep(kconf)
+ assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
+ val driverService = configurationStep
+ .getAdditionalKubernetesResources()
+ .head
+ .asInstanceOf[Service]
+ assert(driverService.getSpec.getIpFamilyPolicy() == stack)
+ assert(driverService.getSpec.getIpFamilies === answer.asJava)
+ }
+ }
+ }
+
private def verifyService(
driverPort: Int,
blockManagerPort: Int,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org