Posted to commits@spark.apache.org by do...@apache.org on 2021/04/03 07:05:25 UTC

[spark] branch branch-3.1 updated: [SPARK-34948][K8S] Add ownerReference to executor configmap to fix leakages

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 71c2f15  [SPARK-34948][K8S] Add ownerReference to executor configmap to fix leakages
71c2f15 is described below

commit 71c2f158e51106d6fcf24abae0f5b2723f48a96e
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Sat Apr 3 00:00:17 2021 -0700

    [SPARK-34948][K8S] Add ownerReference to executor configmap to fix leakages
    
    This PR aims to add `ownerReference` to the executor ConfigMap to fix leakage.
    
    SPARK-30985 made Spark maintain the executor ConfigMap explicitly. However, this ConfigMap can be leaked when the Spark driver dies accidentally or is killed by K8s. We need to add `ownerReference` so that K8s garbage-collects these ConfigMaps automatically.
    
    The number of ConfigMaps counts against the namespace's resource quota, so the leaked ConfigMaps currently cause Spark job submission failures.
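    
    For reference, the fix goes through `KubernetesUtils.addOwnerReference`, which stamps the driver pod as the controlling owner of the ConfigMap. Below is a minimal sketch of what that helper does, assuming fabric8's `OwnerReferenceBuilder`; the actual implementation in `org.apache.spark.deploy.k8s.KubernetesUtils` may differ in detail.
    
    ```
    import java.util.Collections
    
    import io.fabric8.kubernetes.api.model.{HasMetadata, OwnerReferenceBuilder, Pod}
    
    // Sketch: make `pod` the controlling owner of each resource, so K8s
    // garbage-collects those resources when the pod is deleted.
    def addOwnerReference(pod: Pod, resources: Seq[HasMetadata]): Unit = {
      if (pod != null) {
        val reference = new OwnerReferenceBuilder()
          .withName(pod.getMetadata.getName)
          .withApiVersion(pod.getApiVersion)
          .withUid(pod.getMetadata.getUid)
          .withKind(pod.getKind)
          .withController(true)
          .build()
        resources.foreach { resource =>
          resource.getMetadata.setOwnerReferences(Collections.singletonList(reference))
        }
      }
    }
    ```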
    
    No.
    
    Pass the CIs and check manually.
    
    K8s IT is tested manually.
    ```
    KubernetesSuite:
    - Run SparkPi with no resources
    - Run SparkPi with a very long application name.
    - Use SparkLauncher.NO_RESOURCE
    - Run SparkPi with a master URL without a scheme.
    - Run SparkPi with an argument.
    - Run SparkPi with custom labels, annotations, and environment variables.
    - All pods have the same service account by default
    - Run extraJVMOptions check on driver
    - Run SparkRemoteFileTest using a remote data file
    - Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j.properties
    - Run SparkPi with env and mount secrets.
    - Run PySpark on simple pi.py example
    - Run PySpark to test a pyfiles example
    - Run PySpark with memory customization
    - Run in client mode.
    - Start pod creation from template
    - PVs with local storage
    - Launcher client dependencies
    - SPARK-33615: Launcher client archives
    - SPARK-33748: Launcher python client respecting PYSPARK_PYTHON
    - SPARK-33748: Launcher python client respecting spark.pyspark.python and spark.pyspark.driver.python
    - Launcher python client dependencies using a zip file
    - Test basic decommissioning
    - Test basic decommissioning with shuffle cleanup
    - Test decommissioning with dynamic allocation & shuffle cleanups
    - Test decommissioning timeouts
    - Run SparkR on simple dataframe.R example
    Run completed in 19 minutes, 2 seconds.
    Total number of tests run: 27
    Suites: completed 2, aborted 0
    Tests: succeeded 27, failed 0, canceled 0, ignored 0, pending 0
    All tests passed.
    ```
    
    **BEFORE**
    ```
    $ k get cm spark-exec-450b417895b3b2c7-conf-map -oyaml | grep ownerReferences
    ```
    
    **AFTER**
    ```
    $ k get cm spark-exec-bb37a27895b1c26c-conf-map -oyaml | grep ownerReferences
            f:ownerReferences:
    ```
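    
    In the **BEFORE** case, `grep` prints nothing because the ConfigMap carries no owner. After the fix, the ConfigMap metadata contains an entry shaped roughly like the following (placeholder name and UID; `controller: true` is what makes K8s cascade-delete the ConfigMap together with the driver pod):
    
    ```
    ownerReferences:
    - apiVersion: v1
      kind: Pod
      name: <driver-pod-name>   # placeholder
      uid: <driver-pod-uid>     # placeholder
      controller: true
    ```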
    
    Closes #32042 from dongjoon-hyun/SPARK-34948.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit a42dc93a2abf9490d68146b3586aec7fe2f9c102)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 2 +-
 .../scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala  | 7 +++++--
 .../cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala       | 1 +
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 5fc81a6..5ebd172 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -70,7 +70,7 @@ private[spark] class ExecutorPodsAllocator(
 
   private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
 
-  private val driverPod = kubernetesDriverPodName
+  val driverPod = kubernetesDriverPodName
     .map(name => Option(kubernetesClient.pods()
       .withName(name)
       .get())
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index d58e38a..887afca 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -20,11 +20,13 @@ import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 
 import scala.concurrent.Future
 
+import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.KubernetesClient
 
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesUtils
 import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
@@ -67,13 +69,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
   }
 
-  private def setUpExecutorConfigMap(): Unit = {
+  private def setUpExecutorConfigMap(driverPod: Option[Pod]): Unit = {
     val configMapName = KubernetesClientUtils.configMapNameExecutor
     val confFilesMap = KubernetesClientUtils
       .buildSparkConfDirFilesMap(configMapName, conf, Map.empty)
     val labels =
       Map(SPARK_APP_ID_LABEL -> applicationId(), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE)
     val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap, labels)
+    KubernetesUtils.addOwnerReference(driverPod.orNull, Seq(configMap))
     kubernetesClient.configMaps().create(configMap)
   }
 
@@ -96,7 +99,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     podAllocator.start(applicationId(), this)
     watchEvents.start(applicationId())
     pollEvents.start(applicationId())
-    setUpExecutorConfigMap()
+    setUpExecutorConfigMap(podAllocator.driverPod)
   }
 
   override def stop(): Unit = {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index e4a73e2..3573ffc 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -112,6 +112,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
       .thenReturn(driverEndpointRef)
     when(kubernetesClient.pods()).thenReturn(podOperations)
     when(kubernetesClient.configMaps()).thenReturn(configMapsOperations)
+    when(podAllocator.driverPod).thenReturn(None)
     schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
       taskScheduler,
       sc,
