Posted to commits@spark.apache.org by ho...@apache.org on 2021/07/23 22:22:25 UTC

[spark] branch master updated: [SPARK-35956][K8S] Support auto assigning labels to decommissioning pods

This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bee2799  [SPARK-35956][K8S] Support auto assigning labels to decommissioning pods
bee2799 is described below

commit bee279997f2115af6b15e3dbb7433dccef7f14af
Author: Holden Karau <hk...@netflix.com>
AuthorDate: Fri Jul 23 15:21:38 2021 -0700

    [SPARK-35956][K8S] Support auto assigning labels to decommissioning pods
    
    ### What changes were proposed in this pull request?
    
    Add a new configuration flag that lets Spark label a pod that it is decommissioning or exiting, providing a hint to the scheduler that pre-empting this pod will have the least impact.
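    
    For example, a minimal sketch of enabling this from an application: the configuration keys below are the ones added by this change, while the label key and value ("decommissioning" / "true") are arbitrary placeholders.
    
        import org.apache.spark.SparkConf
    
        // Executor pods that Spark is decommissioning or killing get this label.
        // Note the triple "m" matches the configuration keys as added here.
        val conf = new SparkConf()
          .set("spark.kubernetes.executor.decommmissionLabel", "decommissioning")
          .set("spark.kubernetes.executor.decommmissionLabelValue", "true")
    
    The same keys can equally be passed with --conf on spark-submit.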
    
    ### Why are the changes needed?
    
    Kubernetes added the concepts of pod disruption budgets (which can have selectors based on labels) as well as pod deletion cost, both of which provide hints to the scheduler about which pods we would prefer to have pre-empted.
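    
    As an illustration only (not part of this change), a pod disruption budget could protect executors that do not carry the decommission label, leaving labeled pods free to be pre-empted. A rough sketch using the fabric8 client model, assuming a client version that ships the policy/v1 types and reusing the placeholder label key "decommissioning" from above:
    
        import io.fabric8.kubernetes.api.model.{IntOrString, LabelSelectorBuilder, LabelSelectorRequirementBuilder}
        import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudgetBuilder
    
        // Select executor pods that do NOT carry the decommission label, so only
        // healthy executors count toward the budget; labeled pods can be evicted freely.
        val executorSelector = new LabelSelectorBuilder()
          .addToMatchLabels("spark-role", "executor")
          .withMatchExpressions(new LabelSelectorRequirementBuilder()
            .withKey("decommissioning")
            .withOperator("DoesNotExist")
            .build())
          .build()
    
        val pdb = new PodDisruptionBudgetBuilder()
          .withNewMetadata().withName("spark-executors").endMetadata()
          .withNewSpec()
            .withMinAvailable(new IntOrString("50%"))
            .withSelector(executorSelector)
          .endSpec()
          .build()
    
    In practice such a budget would more likely be written as YAML and applied with kubectl; the sketch only shows how the new label can feed a selector.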
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes: new configuration flags, spark.kubernetes.executor.decommmissionLabel and spark.kubernetes.executor.decommmissionLabelValue.
    
    ### How was this patch tested?
    
    The deletion unit test was extended, and the decommissioning integration test now also verifies that the label is applied to the first executor pod.
    
    Closes #33270 from holdenk/SPARK-35956-support-auto-assigning-labels-to-decommissioning-pods.
    
    Lead-authored-by: Holden Karau <hk...@netflix.com>
    Co-authored-by: Holden Karau <ho...@pigscanfly.ca>
    Co-authored-by: Holden Karau <hk...@apple.com>
    Signed-off-by: Holden Karau <hk...@netflix.com>
---
 docs/running-on-kubernetes.md                      | 28 +++++++++---
 .../scala/org/apache/spark/deploy/k8s/Config.scala | 18 ++++++++
 .../k8s/KubernetesClusterSchedulerBackend.scala    | 50 +++++++++++++++++++++-
 .../KubernetesClusterSchedulerBackendSuite.scala   | 47 +++++++++++++++++++-
 .../k8s/integrationtest/DecommissionSuite.scala    | 40 ++++++++++++++---
 5 files changed, 168 insertions(+), 15 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 09f7d2ab..b30d61d 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -8,9 +8,9 @@ license: |
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at
- 
+
      http://www.apache.org/licenses/LICENSE-2.0
- 
+
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -422,7 +422,7 @@ Your Kubernetes config file typically lives under `.kube/config` in your home di
 
 ### Contexts
 
-Kubernetes configuration files can contain multiple contexts that allow for switching between different clusters and/or user identities.  By default Spark on Kubernetes will use your current context (which can be checked by running `kubectl config current-context`) when doing the initial auto-configuration of the Kubernetes client.  
+Kubernetes configuration files can contain multiple contexts that allow for switching between different clusters and/or user identities.  By default Spark on Kubernetes will use your current context (which can be checked by running `kubectl config current-context`) when doing the initial auto-configuration of the Kubernetes client.
 
 In order to use an alternative context users can specify the desired context via the Spark configuration property `spark.kubernetes.context` e.g. `spark.kubernetes.context=minikube`.
 
@@ -1038,7 +1038,7 @@ See the [configuration page](configuration.html) for information on Spark config
    <code>spark.kubernetes.executor.secrets.ENV_VAR=spark-secret:key</code>.
   </td>
   <td>2.4.0</td>
-</tr>   
+</tr>
 <tr>
   <td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path</code></td>
   <td>(none)</td>
@@ -1270,7 +1270,7 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
   <td>3.0.0</td>
 </tr>
-<tr>  
+<tr>
   <td><code>spark.kubernetes.appKillPodDeletionGracePeriod</code></td>
   <td>(none)</td>
   <td>
@@ -1288,6 +1288,24 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
   <td>3.0.0</td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.decommmissionLabel</code></td>
+  <td>(none)</td>
+  <td>
+    Label to be applied to pods which are exiting or being decommissioned. Intended for use
+    with pod disruption budgets, deletion costs, and similar.
+  </td>
+  <td>3.3.0</td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.decommmissionLabelValue</code></td>
+  <td>(none)</td>
+  <td>
+    Value to be applied with the label when
+    <code>spark.kubernetes.executor.decommmissionLabel</code> is enabled.
+  </td>
+  <td>3.3.0</td>
+</tr>
 </table>
 
 #### Pod template properties
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 49c0a42..33370b7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -305,6 +305,24 @@ private[spark] object Config extends Logging {
       .toSequence
       .createWithDefault(Nil)
 
+  val KUBERNETES_EXECUTOR_DECOMMISSION_LABEL =
+    ConfigBuilder("spark.kubernetes.executor.decommmissionLabel")
+      .doc("Label to apply to a pod which is being decommissioned." +
+        " Designed for use with pod disruption budgets and similar mechanism" +
+        " such as pod-deletion-cost.")
+      .version("3.3.0")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE =
+    ConfigBuilder("spark.kubernetes.executor.decommmissionLabelValue")
+      .doc("Label value to apply to a pod which is being decommissioned." +
+        " Designed for use with pod disruption budgets and similar mechanism" +
+        " such as pod-deletion-cost.")
+      .version("3.3.0")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor allocation.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 42a9300..25f6851 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -18,9 +18,11 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 
+import scala.collection.JavaConverters._
 import scala.concurrent.Future
 
 import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.api.model.PodBuilder
 import io.fabric8.kubernetes.client.KubernetesClient
 
 import org.apache.spark.SparkContext
@@ -32,7 +34,8 @@ import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.{RpcAddress, RpcCallContext}
-import org.apache.spark.scheduler.{ExecutorKilled, ExecutorLossReason, TaskSchedulerImpl}
+import org.apache.spark.scheduler.{ExecutorDecommissionInfo, ExecutorKilled, ExecutorLossReason,
+  TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -182,7 +185,52 @@ private[spark] class KubernetesClusterSchedulerBackend(
     super.getExecutorIds()
   }
 
+  private def labelDecommissioningExecs(execIds: Seq[String]) = {
+    // Only kick off the labeling task if we have a label.
+    conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL).foreach { label =>
+      val labelTask = new Runnable() {
+        override def run(): Unit = Utils.tryLogNonFatalError {
+
+          val podsToLabel = kubernetesClient.pods()
+            .withLabel(SPARK_APP_ID_LABEL, applicationId())
+            .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+            .withLabelIn(SPARK_EXECUTOR_ID_LABEL, execIds: _*)
+            .list().getItems().asScala
+
+          podsToLabel.foreach { pod =>
+            kubernetesClient.pods()
+              .inNamespace(pod.getMetadata.getNamespace)
+              .withName(pod.getMetadata.getName)
+              .edit({p: Pod => new PodBuilder(p).editMetadata()
+                .addToLabels(label,
+                  conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE).getOrElse(""))
+                .endMetadata()
+                .build()})
+          }
+        }
+      }
+      executorService.execute(labelTask)
+    }
+  }
+
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean,
+      triggeredByExecutor: Boolean): Seq[String] = {
+    // If decommissioning is triggered by the executor the K8s cluster manager has already
+    // picked the pod to evict so we don't need to update the labels.
+    if (!triggeredByExecutor) {
+      labelDecommissioningExecs(executorsAndDecomInfo.map(_._1))
+    }
+    super.decommissionExecutors(executorsAndDecomInfo, adjustTargetNumExecutors,
+      triggeredByExecutor)
+  }
+
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
+    // If we've decided to remove some executors we should tell Kubernetes that we don't care.
+    labelDecommissioningExecs(executorIds)
+
+    // Tell the executors to exit themselves.
     executorIds.foreach { id =>
       removeExecutor(id, ExecutorKilled)
     }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 5dd84e8..bf17aa3 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler.cluster.k8s
 import java.util.Arrays
 import java.util.concurrent.TimeUnit
 
-import io.fabric8.kubernetes.api.model.{Pod, PodList}
+import io.fabric8.kubernetes.api.model.{ObjectMeta, Pod, PodList}
 import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}
 import org.jmock.lib.concurrent.DeterministicScheduler
 import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
 import org.mockito.ArgumentMatchers.{any, eq => mockitoEq}
@@ -44,6 +45,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   private val sparkConf = new SparkConf(false)
     .set("spark.executor.instances", "3")
     .set("spark.app.id", TEST_SPARK_APP_ID)
+    .set("spark.kubernetes.executor.decommmissionLabel", "soLong")
+    .set("spark.kubernetes.executor.decommmissionLabelValue", "cruelWorld")
 
   @Mock
   private var sc: SparkContext = _
@@ -166,26 +169,66 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
 
   test("Kill executors") {
     schedulerBackendUnderTest.start()
+
+    val operation = mock(classOf[NonNamespaceOperation[
+      Pod, PodList, PodResource[Pod]]])
+
+    when(podOperations.inNamespace(any())).thenReturn(operation)
     when(podOperations.withField(any(), any())).thenReturn(labeledPods)
+    when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
     when(labeledPods.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
     when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
     when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2")).thenReturn(labeledPods)
 
+    val pod1 = mock(classOf[Pod])
+    val pod1Metadata = mock(classOf[ObjectMeta])
+    when(pod1Metadata.getNamespace).thenReturn("coffeeIsLife")
+    when(pod1Metadata.getName).thenReturn("pod1")
+    when(pod1.getMetadata).thenReturn(pod1Metadata)
+
+    val pod2 = mock(classOf[Pod])
+    val pod2Metadata = mock(classOf[ObjectMeta])
+    when(pod2Metadata.getNamespace).thenReturn("coffeeIsLife")
+    when(pod2Metadata.getName).thenReturn("pod2")
+    when(pod2.getMetadata).thenReturn(pod2Metadata)
+
+    val pod1op = mock(classOf[PodResource[Pod]])
+    val pod2op = mock(classOf[PodResource[Pod]])
+    when(operation.withName("pod1")).thenReturn(pod1op)
+    when(operation.withName("pod2")).thenReturn(pod2op)
+
     val podList = mock(classOf[PodList])
     when(labeledPods.list()).thenReturn(podList)
     when(podList.getItems()).thenReturn(Arrays.asList[Pod]())
+    schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
+      TimeUnit.MILLISECONDS)
+    verify(labeledPods, never()).delete()
 
     schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
     verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
     verify(driverEndpointRef).send(RemoveExecutor("2", ExecutorKilled))
     verify(labeledPods, never()).delete()
+    verify(pod1op, never()).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+    verify(pod2op, never()).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
     schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
       TimeUnit.MILLISECONDS)
     verify(labeledPods, never()).delete()
+    verify(pod1op, never()).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+    verify(pod2op, never()).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
 
-    when(podList.getItems()).thenReturn(Arrays.asList(mock(classOf[Pod])))
+    when(podList.getItems()).thenReturn(Arrays.asList(pod1))
     schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
     verify(labeledPods, never()).delete()
+    schedulerExecutorService.runUntilIdle()
+    verify(pod1op).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+    verify(pod2op, never()).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+    verify(labeledPods, never()).delete()
     schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
       TimeUnit.MILLISECONDS)
     verify(labeledPods).delete()
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
index 75c27f6..1250126 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -16,9 +16,12 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest
 
-import org.scalatest.concurrent.PatienceConfiguration
-import org.scalatest.time.Minutes
-import org.scalatest.time.Span
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.Pod
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.matchers.should.Matchers._
+import org.scalatest.time.{Minutes, Seconds, Span}
 
 import org.apache.spark.internal.config
 
@@ -98,10 +101,33 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
       .set(config.DYN_ALLOCATION_MIN_EXECUTORS.key, "1")
       .set(config.DYN_ALLOCATION_INITIAL_EXECUTORS.key, "2")
       .set(config.DYN_ALLOCATION_ENABLED.key, "true")
-      // The default of 30 seconds is fine, but for testing we just want to get this done fast.
-      .set("spark.storage.decommission.replicationReattemptInterval", "1")
+      // The default of 30 seconds is fine, but for testing we just want to
+      // give enough time to validate the labels are set.
+      .set("spark.storage.decommission.replicationReattemptInterval", "75")
+      // Configure labels for decommissioning pods.
+      .set("spark.kubernetes.executor.decommmissionLabel", "solong")
+      .set("spark.kubernetes.executor.decommmissionLabelValue", "cruelworld")
 
-    var execLogs: String = ""
+    // This is called on all exec pods but we only care about exec 0 since it's the "first."
+    // We only do this inside of this test since the other tests trigger k8s side deletes where we
+    // do not apply labels.
+    def checkFirstExecutorPodGetsLabeled(pod: Pod): Unit = {
+      if (pod.getMetadata.getName.endsWith("-1")) {
+        val client = kubernetesTestComponents.kubernetesClient
+        // The label will be added eventually, but k8s objects don't refresh.
+        Eventually.eventually(
+          PatienceConfiguration.Timeout(Span(1200, Seconds)),
+          PatienceConfiguration.Interval(Span(1, Seconds))) {
+
+          val currentPod = client.pods().withName(pod.getMetadata.getName).get
+          val labels = currentPod.getMetadata.getLabels.asScala
+
+          labels should not be (null)
+          labels should (contain key ("solong") and contain value ("cruelworld"))
+        }
+      }
+      doBasicExecutorPyPodCheck(pod)
+    }
 
     runSparkApplicationAndVerifyCompletion(
       appResource = PYSPARK_SCALE,
@@ -113,7 +139,7 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
           "driver killed: 0, unexpectedly exited: 0)."),
       appArgs = Array.empty[String],
       driverPodChecker = doBasicDriverPyPodCheck,
-      executorPodChecker = doBasicExecutorPyPodCheck,
+      executorPodChecker = checkFirstExecutorPodGetsLabeled,
       isJVM = false,
       pyFiles = None,
       executorPatience = Some(None, Some(DECOMMISSIONING_FINISHED_TIMEOUT)),
