You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2022/03/10 18:04:05 UTC

[spark] branch branch-3.2 updated: [SPARK-38379][SPARK-37735][K8S][3.2] Fix Kubernetes Client mode when mounting persistent volume with storage class

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new a35ea15  [SPARK-38379][SPARK-37735][K8S][3.2] Fix Kubernetes Client mode when mounting persistent volume with storage class
a35ea15 is described below

commit a35ea1525793f6933f7729c636703a7731393a92
Author: Thomas Graves <tg...@nvidia.com>
AuthorDate: Thu Mar 10 10:00:02 2022 -0800

    [SPARK-38379][SPARK-37735][K8S][3.2] Fix Kubernetes Client mode when mounting persistent volume with storage class
    
    ### What changes were proposed in this pull request?
    
    This is the branch-3.2 PR for https://github.com/apache/spark/pull/35792. Note that I also pulled in https://github.com/apache/spark/commit/068d53bd5d89c96bf0cdb05d3ec7f2f023cf3875 from [SPARK-37735] to pick up the addition of appId to KubernetesConf. I don't see that as an issue since it's private. This was actually the fix I originally had while testing on 3.2, before I realized someone had already added it on the master branch.
    
    Running spark-shell in client mode on a Kubernetes cluster while mounting persistent volumes with a storage class results in a large warning being emitted on startup.
    
    https://issues.apache.org/jira/browse/SPARK-38379
    
    The issue is a race condition between when spark.app.id is set in SparkContext and when it is used. This change therefore uses the KubernetesConf appId instead, which is the same value used to set spark.app.id.
    
    ### Why are the changes needed?
    It throws a large warning to the user, and I believe the label is wrong as well.
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    
    Unit test added. The test fails without the fix.
    Also manually tested on real k8s cluster.
    
    Closes #35804 from tgravescs/k8smount32.
    
    Authored-by: Thomas Graves <tg...@nvidia.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../apache/spark/deploy/k8s/KubernetesConf.scala   |  1 +
 .../k8s/features/MountVolumesFeatureStep.scala     |  2 +-
 .../spark/deploy/k8s/KubernetesConfSuite.scala     |  8 +++++++
 .../features/MountVolumesFeatureStepSuite.scala    | 25 ++++++++++++++++++++++
 4 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 937c5f5..f5f8931 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -41,6 +41,7 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
   def secretEnvNamesToKeyRefs: Map[String, String]
   def secretNamesToMountPaths: Map[String, String]
   def volumes: Seq[KubernetesVolumeSpec]
+  def appId: String
 
   def appName: String = get("spark.app.name", "spark")
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
index 4e16473..78dd6ec 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -85,7 +85,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
               .withApiVersion("v1")
               .withNewMetadata()
                 .withName(claimName)
-                .addToLabels(SPARK_APP_ID_LABEL, conf.sparkConf.getAppId)
+                .addToLabels(SPARK_APP_ID_LABEL, conf.appId)
                 .endMetadata()
               .withNewSpec()
                 .withStorageClassName(storageClass.get)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index 0b97322..018dc75 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -170,4 +170,12 @@ class KubernetesConfSuite extends SparkFunSuite {
         "executorEnvVars4-var4" -> "executorEnvVars4",
         "executorEnvVars5-var5" -> "executorEnvVars5/var5"))
   }
+
+  test("SPARK-37735: access appId in KubernetesConf") {
+    val sparkConf = new SparkConf(false)
+    val driverConf = KubernetesTestConf.createDriverConf(sparkConf)
+    val execConf = KubernetesTestConf.createExecutorConf(sparkConf)
+    assert(driverConf.asInstanceOf[KubernetesConf].appId === KubernetesTestConf.APP_ID)
+    assert(execConf.asInstanceOf[KubernetesConf].appId === KubernetesTestConf.APP_ID)
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
index 38f8fac..468d1dd 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -89,6 +89,31 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     assert(executorPVC.getClaimName === s"pvc-spark-${KubernetesTestConf.EXECUTOR_ID}")
   }
 
+  test("SPARK-32713 Mounts parameterized persistentVolumeClaims in executors with storage class") {
+    val volumeConf = KubernetesVolumeSpec(
+      "testVolume",
+      "/tmp",
+      "",
+      true,
+      KubernetesPVCVolumeConf("pvc-spark-SPARK_EXECUTOR_ID", Some("fast"), Some("512mb"))
+    )
+    val driverConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
+    val driverStep = new MountVolumesFeatureStep(driverConf)
+    val driverPod = driverStep.configurePod(SparkPod.initialPod())
+
+    assert(driverPod.pod.getSpec.getVolumes.size() === 1)
+    val driverPVC = driverPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
+    assert(driverPVC.getClaimName === "pvc-spark-SPARK_EXECUTOR_ID")
+
+    val executorConf = KubernetesTestConf.createExecutorConf(volumes = Seq(volumeConf))
+    val executorStep = new MountVolumesFeatureStep(executorConf)
+    val executorPod = executorStep.configurePod(SparkPod.initialPod())
+
+    assert(executorPod.pod.getSpec.getVolumes.size() === 1)
+    val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
+    assert(executorPVC.getClaimName === s"pvc-spark-${KubernetesTestConf.EXECUTOR_ID}")
+  }
+
   test("Create and mounts persistentVolumeClaims in driver") {
     val volumeConf = KubernetesVolumeSpec(
       "testVolume",

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org