You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ho...@apache.org on 2021/02/09 18:07:20 UTC

[spark] branch master updated: [SPARK-34334][K8S] Correctly identify timed out pending pod requests as excess request

This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b2dc38b  [SPARK-34334][K8S] Correctly identify timed out pending pod requests as excess request
b2dc38b is described below

commit b2dc38b6546552cf3fcfdcd466d7d04d9aa3078c
Author: “attilapiros” <pi...@gmail.com>
AuthorDate: Tue Feb 9 10:06:55 2021 -0800

    [SPARK-34334][K8S] Correctly identify timed out pending pod requests as excess request
    
    ### What changes were proposed in this pull request?
    
    Fix the identification of timed-out pending pod requests as excess requests to delete when the excess is higher than the number of timed-out newly created requests and there are also some newly created requests that have not timed out.
    
    ### Why are the changes needed?
    
    After https://github.com/apache/spark/pull/29981, only timed-out newly created requests and timed-out pending requests are treated as excess requests.
    
    But there is a small bug when the excess is higher than the number of timed-out newly created requests and there are also some newly created requests that have not timed out: all the newly created requests (not only the timed-out ones) are counted as excess requests when items are chosen from the timed-out pending pod requests, so too few pending requests are selected for deletion.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    A new unit test was added: `SPARK-34334: correctly identify timed out pending pod requests as excess`.
    
    Closes #31445 from attilapiros/SPARK-34334.
    
    Authored-by: “attilapiros” <pi...@gmail.com>
    Signed-off-by: Holden Karau <hk...@apple.com>
---
 .../cluster/k8s/ExecutorPodsAllocator.scala        | 11 +++---
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 45 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 5 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index f4cd2d0..eb35de8 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -223,14 +223,15 @@ private[spark] class ExecutorPodsAllocator(
 
       if (knownPodCount > targetNum) {
         val excess = knownPodCount - targetNum
+        val newlyCreatedToDelete = newlyCreatedExecutorsForRpId
+          .filter { case (_, (_, createTime)) =>
+            currentTime - createTime > executorIdleTimeout
+          }.keys.take(excess).toList
         val knownPendingToDelete = currentPendingExecutors
           .filter(x => isExecutorIdleTimedOut(x._2, currentTime))
+          .take(excess - newlyCreatedToDelete.size)
           .map { case (id, _) => id }
-          .take(excess - newlyCreatedExecutorsForRpId.size)
-        val toDelete = newlyCreatedExecutorsForRpId
-          .filter { case (_, (_, createTime)) =>
-            currentTime - createTime > executorIdleTimeout
-          }.keys.take(excess).toList ++ knownPendingToDelete
+        val toDelete = newlyCreatedToDelete ++ knownPendingToDelete
 
         if (toDelete.nonEmpty) {
           logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index d4d8980..eaf5fd8 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -216,6 +216,51 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!podsAllocatorUnderTest.isDeleted("4"))
   }
 
+  test("SPARK-34334: correctly identify timed out pending pod requests as excess") {
+    when(podOperations
+      .withField("status.phase", "Pending"))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
+      .thenReturn(podOperations)
+
+    val startTime = Instant.now.toEpochMilli
+    waitForExecutorPodsClock.setTime(startTime)
+
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    verify(podOperations).create(podWithAttachedContainerForId(1))
+    verify(podOperations).create(any())
+
+    snapshotsStore.updatePod(pendingExecutor(1))
+    snapshotsStore.notifySubscribers()
+
+    waitForExecutorPodsClock.advance(executorIdleTimeout)
+
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2))
+    snapshotsStore.notifySubscribers()
+    verify(podOperations).create(podWithAttachedContainerForId(2))
+
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    snapshotsStore.notifySubscribers()
+
+    verify(podOperations, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")
+    verify(podOperations, never()).delete()
+
+    waitForExecutorPodsClock.advance(executorIdleTimeout)
+    snapshotsStore.notifySubscribers()
+
+    // before SPARK-34334 this verify() call failed as the non-timed out newly created request
+    // decreased the number of requests taken from timed out pending pod requests
+    verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")
+    verify(podOperations).delete()
+  }
+
   test("SPARK-33099: Respect executor idle timeout configuration") {
     when(podOperations
       .withField("status.phase", "Pending"))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org