Posted to commits@spark.apache.org by do...@apache.org on 2022/06/16 07:05:53 UTC

[spark] branch master updated: [SPARK-39490][K8S] Support `ipFamilyPolicy` and `ipFamilies` in Driver Service

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fa98a6a097 [SPARK-39490][K8S] Support `ipFamilyPolicy` and `ipFamilies` in Driver Service
2fa98a6a097 is described below

commit 2fa98a6a0971cafeb98a2148a962cf8d79381842
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Thu Jun 16 00:05:23 2022 -0700

    [SPARK-39490][K8S] Support `ipFamilyPolicy` and `ipFamilies` in Driver Service
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support the `ipFamilyPolicy` and `ipFamilies` fields of the K8s Service in the Spark Driver Service in order to support `IPv6`-only environments. After this PR, we can control `ipFamilyPolicy` and `ipFamilies` of the generated driver service:
    
    ```yaml
    $ kubectl get svc spark-xxx-driver-svc -oyaml
    apiVersion: v1
    kind: Service
    metadata:
      ...
    spec:
      clusterIP: None
      ipFamilyPolicy: SingleStack
      ipFamilies:
      - IPv4
    ...
    ```
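    
    For example, a dual-stack driver service could be requested from application code as in the minimal sketch below (not part of this commit; the two config keys are added by this patch, while the application name and the chosen values are just illustrative picks from the allowed value sets):
    
    ```scala
    // Minimal sketch: request a dual-stack driver service via the new options.
    // "spark.kubernetes.driver.service.ipFamilyPolicy" and
    // "spark.kubernetes.driver.service.ipFamilies" are introduced by this patch;
    // "PreferDualStack" and "IPv6,IPv4" are among their allowed values.
    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession.builder()
      .appName("ip-family-demo")  // hypothetical application name
      .config("spark.kubernetes.driver.service.ipFamilyPolicy", "PreferDualStack")
      .config("spark.kubernetes.driver.service.ipFamilies", "IPv6,IPv4")
      .getOrCreate()
    ```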
    
    ### Why are the changes needed?
    
    The K8s IPv4/IPv6 dual-stack feature reached the `Stable` stage in v1.23.
    - https://kubernetes.io/docs/concepts/services-networking/dual-stack/
      - v1.16 [alpha]
      - v1.21 [beta]
      - v1.23 [stable]
    
    According to the [EKS milestone](https://docs.aws.amazon.com/eks/latest/userguide/kubernetes-versions.html), K8s v1.23 will be GA on EKS in August 2022.
    
    Kubernetes version | Upstream release | Amazon EKS release | Amazon EKS end of support
    -- | -- | -- | --
    1.20 | December 8, 2020 | May 18, 2021 | September 2022
    1.21 | April 8, 2021 | July 19, 2021 | February 2023
    1.22 | August 4, 2021 | April 4, 2022 | May 2023
    1.23 | December 7, 2021 | August 2022 | October 2023
    
    Note that
    - [EKS started IPv6 since January 2022.](https://aws.amazon.com/blogs/containers/amazon-eks-launches-ipv6-support/)
    - [Docker Desktop supports IPv6 only on Linux.](https://docs.docker.com/config/daemon/ipv6/)
    - Unfortunately, [Minikube doesn't support IPv6 yet](https://github.com/kubernetes/minikube/issues/8535).
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this is a new feature.
    
    ### How was this patch tested?
    
    Pass the CIs with newly added test cases.
    
    Closes #36887 from dongjoon-hyun/SPARK-39490.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../scala/org/apache/spark/deploy/k8s/Config.scala | 16 ++++++
 .../k8s/features/DriverServiceFeatureStep.scala    |  8 ++-
 .../features/DriverServiceFeatureStepSuite.scala   | 60 ++++++++++++++++++++++
 3 files changed, 83 insertions(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 168c86ecb3a..0bc2660bfc8 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -63,6 +63,22 @@ private[spark] object Config extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY =
+    ConfigBuilder("spark.kubernetes.driver.service.ipFamilyPolicy")
+      .doc("K8s IP Family Policy for Driver Service")
+      .version("3.4.0")
+      .stringConf
+      .checkValues(Set("SingleStack", "PreferDualStack", "RequireDualStack"))
+      .createWithDefault("SingleStack")
+
+  val KUBERNETES_DRIVER_SERVICE_IP_FAMILIES =
+    ConfigBuilder("spark.kubernetes.driver.service.ipFamilies")
+      .doc("A list of IP families for K8s Driver Service")
+      .version("3.4.0")
+      .stringConf
+      .checkValues(Set("IPv4", "IPv6", "IPv4,IPv6", "IPv6,IPv4"))
+      .createWithDefault("IPv4")
+
   val KUBERNETES_DRIVER_OWN_PVC =
     ConfigBuilder("spark.kubernetes.driver.ownPersistentVolumeClaim")
       .doc("If true, driver pod becomes the owner of on-demand persistent volume claims " +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
index 75c40584a64..8825b5f69b2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
 
 import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
-import org.apache.spark.deploy.k8s.Config.KUBERNETES_DNSNAME_MAX_LENGTH
+import org.apache.spark.deploy.k8s.Config.{KUBERNETES_DNSNAME_MAX_LENGTH, KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.util.{Clock, SystemClock}
@@ -50,6 +50,10 @@ private[spark] class DriverServiceFeatureStep(
       s"$shorterServiceName as the driver service's name.")
     shorterServiceName
   }
+  private val ipFamilyPolicy =
+    kubernetesConf.sparkConf.get(KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY)
+  private val ipFamilies =
+    kubernetesConf.sparkConf.get(KUBERNETES_DRIVER_SERVICE_IP_FAMILIES).split(",").toList.asJava
 
   private val driverPort = kubernetesConf.sparkConf.getInt(
     config.DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
@@ -75,6 +79,8 @@ private[spark] class DriverServiceFeatureStep(
         .endMetadata()
       .withNewSpec()
         .withClusterIP("None")
+        .withIpFamilyPolicy(ipFamilyPolicy)
+        .withIpFamilies(ipFamilies)
         .withSelector(kubernetesConf.labels.asJava)
         .addNewPort()
           .withName(DRIVER_PORT_NAME)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index 2e5e60eb39f..ef33bb3069a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -160,6 +160,66 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
       " a Kubernetes service.")
   }
 
+  test("Support ipFamilies spec with default SingleStack and IPv4") {
+    val sparkConf = new SparkConf(false)
+    val kconf = KubernetesTestConf.createDriverConf(
+      sparkConf = sparkConf,
+      labels = DRIVER_LABELS,
+      serviceAnnotations = DRIVER_SERVICE_ANNOTATIONS)
+    val configurationStep = new DriverServiceFeatureStep(kconf)
+    assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
+    val driverService = configurationStep
+      .getAdditionalKubernetesResources()
+      .head
+      .asInstanceOf[Service]
+    assert(driverService.getSpec.getIpFamilyPolicy() == "SingleStack")
+    assert(driverService.getSpec.getIpFamilies.size() === 1)
+    assert(driverService.getSpec.getIpFamilies.get(0) == "IPv4")
+  }
+
+  test("Support ipFamilies spec with SingleStack and IPv6") {
+    val sparkConf = new SparkConf(false)
+      .set(KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, "IPv6")
+    val kconf = KubernetesTestConf.createDriverConf(
+      sparkConf = sparkConf,
+      labels = DRIVER_LABELS,
+      serviceAnnotations = DRIVER_SERVICE_ANNOTATIONS)
+    val configurationStep = new DriverServiceFeatureStep(kconf)
+    assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
+    val driverService = configurationStep
+      .getAdditionalKubernetesResources()
+      .head
+      .asInstanceOf[Service]
+    assert(driverService.getSpec.getIpFamilyPolicy() == "SingleStack")
+    assert(driverService.getSpec.getIpFamilies.size() === 1)
+    assert(driverService.getSpec.getIpFamilies.get(0) == "IPv6")
+  }
+
+  test("Support DualStack") {
+    Seq("PreferDualStack", "RequireDualStack").foreach { stack =>
+      val configAndAnswers = Seq(
+        ("IPv4,IPv6", Seq("IPv4", "IPv6")),
+        ("IPv6,IPv4", Seq("IPv6", "IPv4")))
+      configAndAnswers.foreach { case (config, answer) =>
+        val sparkConf = new SparkConf(false)
+          .set(KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY, stack)
+          .set(KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, config)
+        val kconf = KubernetesTestConf.createDriverConf(
+          sparkConf = sparkConf,
+          labels = DRIVER_LABELS,
+          serviceAnnotations = DRIVER_SERVICE_ANNOTATIONS)
+        val configurationStep = new DriverServiceFeatureStep(kconf)
+        assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
+        val driverService = configurationStep
+          .getAdditionalKubernetesResources()
+          .head
+          .asInstanceOf[Service]
+        assert(driverService.getSpec.getIpFamilyPolicy() == stack)
+        assert(driverService.getSpec.getIpFamilies === answer.asJava)
+      }
+    }
+  }
+
   private def verifyService(
       driverPort: Int,
       blockManagerPort: Int,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org