You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/08/16 23:08:58 UTC

[spark] branch branch-3.2 updated: [SPARK-36052][K8S] Introducing a limit for pending PODs

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new eb09be9  [SPARK-36052][K8S] Introducing a limit for pending PODs
eb09be9 is described below

commit eb09be9e68737bf2f29ca5391874f4c5fa0de3e2
Author: attilapiros <pi...@gmail.com>
AuthorDate: Tue Aug 10 20:16:21 2021 -0700

    [SPARK-36052][K8S] Introducing a limit for pending PODs
    
    Introducing a limit for pending PODs (newly created/requested executors included).
    This limit is global across all the resource profiles, so first we have to count all the newly created and pending PODs (minus the ones which were requested to be deleted), and then we can share the remaining pending POD slots among the resource profiles.
    
    Without this PR, dynamic allocation could request too many PODs, so the K8S scheduler could become overloaded and the scheduling of PODs would be affected by the load.
    
    No.
    
    Tested with new unit tests.
    
    Closes #33492 from attilapiros/SPARK-36052.
    
    Authored-by: attilapiros <pi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 1dced492fb286a7ada73d886fe264f5df0e2b3da)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../scala/org/apache/spark/deploy/k8s/Config.scala | 12 +++
 .../cluster/k8s/ExecutorPodsAllocator.scala        | 85 +++++++++++++++-------
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 77 ++++++++++++++++++++
 3 files changed, 148 insertions(+), 26 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 9fceca7..4d352f7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -543,6 +543,18 @@ private[spark] object Config extends Logging {
       .checkValue(delay => delay > 0, "delay must be a positive time value")
       .createWithDefaultString("30s")
 
+  val KUBERNETES_MAX_PENDING_PODS =
+    ConfigBuilder("spark.kubernetes.allocation.maxPendingPods")
+      .doc("Maximum number of pending PODs allowed during executor allocation for this " +
+        "application. Those newly requested executors which are unknown by Kubernetes yet are " +
+        "also counted into this limit as they will change into pending PODs by time. " +
+        "This limit is independent from the resource profiles as it limits the sum of all " +
+        "allocation for all the used resource profiles.")
+      .version("3.2.0")
+      .intConf
+      .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer")
+      .createWithDefault(Int.MaxValue)
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SERVICE_ANNOTATION_PREFIX = "spark.kubernetes.driver.service.annotation."
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index d6dc13e..cee5360 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -57,6 +57,8 @@ private[spark] class ExecutorPodsAllocator(
 
   private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
 
+  private val maxPendingPods = conf.get(KUBERNETES_MAX_PENDING_PODS)
+
   private val podCreationTimeout = math.max(
     podAllocationDelay * 5,
     conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))
@@ -217,9 +219,15 @@ private[spark] class ExecutorPodsAllocator(
       }
     }
 
+    // sum of all the pending pods unknown by the scheduler (total for all the resources)
     var totalPendingCount = 0
-    // The order we request executors for each ResourceProfile is not guaranteed.
-    totalExpectedExecutorsPerResourceProfileId.asScala.foreach { case (rpId, targetNum) =>
+    // total not running pods (including scheduler known & unknown, pending & newly requested ones)
+    var totalNotRunningPodCount = 0
+    val podsToAllocateWithRpId = totalExpectedExecutorsPerResourceProfileId
+      .asScala
+      .toSeq
+      .sortBy(_._1)
+      .flatMap { case (rpId, targetNum) =>
       val podsForRpId = rpIdToExecsAndPodState.getOrElse(rpId, mutable.HashMap.empty)
 
       val currentRunningCount = podsForRpId.values.count {
@@ -235,7 +243,7 @@ private[spark] class ExecutorPodsAllocator(
       }
       // This variable is used later to print some debug logs. It's updated when cleaning up
       // excess pod requests, since currentPendingExecutorsForRpId is immutable.
-      var knownPendingCount = currentPendingExecutorsForRpId.size
+      var pendingCountForRpId = currentPendingExecutorsForRpId.size
 
       val newlyCreatedExecutorsForRpId =
         newlyCreatedExecutors.filter { case (_, (waitingRpId, _)) =>
@@ -248,12 +256,12 @@ private[spark] class ExecutorPodsAllocator(
         }
 
       if (podsForRpId.nonEmpty) {
-        logDebug(s"ResourceProfile Id: $rpId " +
+        logDebug(s"ResourceProfile Id: $rpId (" +
           s"pod allocation status: $currentRunningCount running, " +
           s"${currentPendingExecutorsForRpId.size} unknown pending, " +
           s"${schedulerKnownPendingExecsForRpId.size} scheduler backend known pending, " +
           s"${newlyCreatedExecutorsForRpId.size} unknown newly created, " +
-          s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend known newly created.")
+          s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend known newly created)")
       }
 
       // It's possible that we have outstanding pods that are outdated when dynamic allocation
@@ -264,21 +272,22 @@ private[spark] class ExecutorPodsAllocator(
       //
       // TODO: with dynamic allocation off, handle edge cases if we end up with more running
       // executors than expected.
-      val knownPodCount = currentRunningCount +
+      var notRunningPodCountForRpId =
         currentPendingExecutorsForRpId.size + schedulerKnownPendingExecsForRpId.size +
         newlyCreatedExecutorsForRpId.size + schedulerKnownNewlyCreatedExecsForRpId.size
+      val podCountForRpId = currentRunningCount + notRunningPodCountForRpId
 
-      if (knownPodCount > targetNum) {
-        val excess = knownPodCount - targetNum
+      if (podCountForRpId > targetNum) {
+        val excess = podCountForRpId - targetNum
         val newlyCreatedToDelete = newlyCreatedExecutorsForRpId
           .filter { case (_, (_, createTime)) =>
             currentTime - createTime > executorIdleTimeout
           }.keys.take(excess).toList
-        val knownPendingToDelete = currentPendingExecutorsForRpId
+        val pendingToDelete = currentPendingExecutorsForRpId
           .filter(x => isExecutorIdleTimedOut(x._2, currentTime))
           .take(excess - newlyCreatedToDelete.size)
           .map { case (id, _) => id }
-        val toDelete = newlyCreatedToDelete ++ knownPendingToDelete
+        val toDelete = newlyCreatedToDelete ++ pendingToDelete
 
         if (toDelete.nonEmpty) {
           logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
@@ -293,32 +302,49 @@ private[spark] class ExecutorPodsAllocator(
               .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
               .delete()
             newlyCreatedExecutors --= newlyCreatedToDelete
-            knownPendingCount -= knownPendingToDelete.size
+            pendingCountForRpId -= pendingToDelete.size
+            notRunningPodCountForRpId -= toDelete.size
           }
         }
       }
-
-      if (newlyCreatedExecutorsForRpId.isEmpty
-        && knownPodCount < targetNum) {
-        requestNewExecutors(targetNum, knownPodCount, applicationId, rpId, k8sKnownPVCNames)
-      }
-      totalPendingCount += knownPendingCount
+      totalPendingCount += pendingCountForRpId
+      totalNotRunningPodCount += notRunningPodCountForRpId
 
       // The code below just prints debug messages, which are only useful when there's a change
       // in the snapshot state. Since the messages are a little spammy, avoid them when we know
       // there are no useful updates.
       if (log.isDebugEnabled && snapshots.nonEmpty) {
-        val outstanding = knownPendingCount + newlyCreatedExecutorsForRpId.size
+        val outstanding = pendingCountForRpId + newlyCreatedExecutorsForRpId.size
         if (currentRunningCount >= targetNum && !dynamicAllocationEnabled) {
           logDebug(s"Current number of running executors for ResourceProfile Id $rpId is " +
             "equal to the number of requested executors. Not scaling up further.")
         } else {
-          if (outstanding > 0) {
-            logDebug(s"Still waiting for $outstanding executors for ResourceProfile " +
-              s"Id $rpId before requesting more.")
+          if (newlyCreatedExecutorsForRpId.nonEmpty) {
+            logDebug(s"Still waiting for ${newlyCreatedExecutorsForRpId.size} executors for " +
+              s"ResourceProfile Id $rpId before requesting more.")
           }
         }
       }
+      if (newlyCreatedExecutorsForRpId.isEmpty && podCountForRpId < targetNum) {
+        Some(rpId, podCountForRpId, targetNum)
+      } else {
+        // for this resource profile we do not request more PODs
+        None
+      }
+    }
+
+    val remainingSlotFromPendingPods = maxPendingPods - totalNotRunningPodCount
+    if (remainingSlotFromPendingPods > 0 && podsToAllocateWithRpId.size > 0) {
+      ExecutorPodsAllocator.splitSlots(podsToAllocateWithRpId, remainingSlotFromPendingPods)
+        .foreach { case ((rpId, podCountForRpId, targetNum), sharedSlotFromPendingPods) =>
+        val numMissingPodsForRpId = targetNum - podCountForRpId
+        val numExecutorsToAllocate =
+          math.min(math.min(numMissingPodsForRpId, podAllocationSize), sharedSlotFromPendingPods)
+        logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
+          s"ResourceProfile Id: $rpId, target: $targetNum, known: $podCountForRpId, " +
+          s"sharedSlotFromPendingPods: $sharedSlotFromPendingPods.")
+        requestNewExecutors(numExecutorsToAllocate, applicationId, rpId, k8sKnownPVCNames)
+      }
     }
     deletedExecutorIds = _deletedExecutorIds
 
@@ -347,14 +373,10 @@ private[spark] class ExecutorPodsAllocator(
   }
 
   private def requestNewExecutors(
-      expected: Int,
-      running: Int,
+      numExecutorsToAllocate: Int,
       applicationId: String,
       resourceProfileId: Int,
       pvcsInUse: Seq[String]): Unit = {
-    val numExecutorsToAllocate = math.min(expected - running, podAllocationSize)
-    logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
-      s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.")
     // Check reusable PVCs for this executor allocation batch
     val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
     for ( _ <- 0 until numExecutorsToAllocate) {
@@ -440,3 +462,14 @@ private[spark] class ExecutorPodsAllocator(
     }
   }
 }
+
+private[spark] object ExecutorPodsAllocator {
+  // Splits `slots` evenly among `consumers` (order kept): the first `slots % consumers.size`
+  // of them get one extra slot. An empty `consumers` yields Nil instead of dividing by zero.
+  def splitSlots[T](consumers: Seq[T], slots: Int): Seq[(T, Int)] = if (consumers.isEmpty) {
+    Nil
+  } else {
+    val (d, r) = (slots / consumers.size, slots % consumers.size)
+    consumers.take(r).map((_, d + 1)) ++ consumers.drop(r).map((_, d))
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index a54f1a1..5b33da6 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -117,6 +117,83 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
   }
 
+  test("SPARK-36052: test splitSlots") {
+    val seq1 = Seq("a")
+    assert(ExecutorPodsAllocator.splitSlots(seq1, 0) === Seq(("a", 0)))
+    assert(ExecutorPodsAllocator.splitSlots(seq1, 1) === Seq(("a", 1)))
+    assert(ExecutorPodsAllocator.splitSlots(seq1, 2) === Seq(("a", 2)))
+
+    val seq2 = Seq("a", "b", "c")
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 0) === Seq(("a", 0), ("b", 0), ("c", 0)))
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 1) === Seq(("a", 1), ("b", 0), ("c", 0)))
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 2) === Seq(("a", 1), ("b", 1), ("c", 0)))
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 3) === Seq(("a", 1), ("b", 1), ("c", 1)))
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 4) === Seq(("a", 2), ("b", 1), ("c", 1)))
+  }
+
+  test("SPARK-36052: pending pod limit with multiple resource profiles") {
+    when(podOperations
+      .withField("status.phase", "Pending"))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
+      .thenReturn(podOperations)
+
+    val startTime = Instant.now.toEpochMilli
+    waitForExecutorPodsClock.setTime(startTime)
+
+    val rpb = new ResourceProfileBuilder()
+    val ereq = new ExecutorResourceRequests()
+    val treq = new TaskResourceRequests()
+    ereq.cores(4).memory("2g")
+    treq.cpus(2)
+    rpb.require(ereq).require(treq)
+    val rp = rpb.build()
+
+    val confWithLowMaxPendingPods = conf.clone.set(KUBERNETES_MAX_PENDING_PODS.key, "3")
+    podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithLowMaxPendingPods, secMgr,
+      executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
+
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 3))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id))
+    verify(podOperations).create(podWithAttachedContainerForId(2, defaultProfile.id))
+    verify(podOperations).create(podWithAttachedContainerForId(3, rp.id))
+
+    // Mark executors 2 and 3 as pending and leave 1 as newly created; this does not free up
+    // any pending pod slot (the limit is 3), so no new pod is requested
+    snapshotsStore.updatePod(pendingExecutor(2, defaultProfile.id))
+    snapshotsStore.updatePod(pendingExecutor(3, rp.id))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations, times(3)).create(any())
+    verify(podOperations, never()).delete()
+
+    // Downscale the defaultProfile target from 2 to 1 executor to free up one slot
+    // for pending pods
+    waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 3))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations).create(podWithAttachedContainerForId(4, rp.id))
+    verify(podOperations, times(1)).delete()
+
+    // Make one pod running; this way we have one more free slot for pending pods
+    snapshotsStore.updatePod(runningExecutor(3, rp.id))
+    snapshotsStore.updatePod(pendingExecutor(4, rp.id))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations).create(podWithAttachedContainerForId(5, rp.id))
+    verify(podOperations, times(1)).delete()
+  }
+
   test("Initially request executors in batches. Do not request another batch if the" +
     " first has not finished.") {
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1)))

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org