Posted to commits@spark.apache.org by ho...@apache.org on 2021/01/11 22:26:04 UTC

[spark] branch branch-3.1 updated: [SPARK-33711][K8S] Avoid race condition between POD lifecycle manager and scheduler backend

This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 7da0164  [SPARK-33711][K8S] Avoid race condition between POD lifecycle manager and scheduler backend
7da0164 is described below

commit 7da0164824dc6d99d48dcddfcbaef987fe09765e
Author: “attilapiros” <pi...@gmail.com>
AuthorDate: Mon Jan 11 14:25:12 2021 -0800

    [SPARK-33711][K8S] Avoid race condition between POD lifecycle manager and scheduler backend
    
    ### What changes were proposed in this pull request?
    
    Missing POD detection is extended with a timestamp (and time limit) based check to avoid wrongly marking PODs as missing.
    
    The two new timestamps:
    - `fullSnapshotTs` is introduced for the `ExecutorPodsSnapshot`; it is updated only by the POD polling snapshot source
    - `registrationTs` is introduced for the `ExecutorData` and is initialized at executor registration in the scheduler backend
    
    Moreover, a new config, `spark.kubernetes.executor.missingPodDetectDelta`, is used to specify the accepted delta between the two.
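
    For illustration only (not part of this patch), the new delta could be set like any other Spark configuration; the session builder and application name below are placeholder assumptions:

    ```scala
    import org.apache.spark.sql.SparkSession

    // Minimal sketch: accept up to 60s between an executor's registration and the
    // next full POD poll before that executor may be flagged as missing.
    val spark = SparkSession.builder()
      .appName("missing-pod-delta-example") // hypothetical application name
      .config("spark.kubernetes.executor.missingPodDetectDelta", "60s")
      .getOrCreate()
    ```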
    
    ### Why are the changes needed?
    
    Watching a POD (`ExecutorPodsWatchSnapshotSource`) only informs about single POD changes. This could wrongly lead the executor POD lifecycle manager to detect missing PODs (PODs known by the scheduler backend but missing from POD snapshots).
    
    A key indicator of this error is seeing this log message:
    
    > "The executor with ID [some_id] was not found in the cluster but we didn't get a reason why. Marking the executor as failed. The executor may have been deleted but the driver missed the deletion event."
    
    So one of the problems is running the missing POD detection check even when only a single POD has changed, without having a full, consistent snapshot of all the PODs (see `ExecutorPodsPollingSnapshotSource`).
    The other problem is the race between the executor POD lifecycle manager and the scheduler backend: even with a full snapshot, the registration at the scheduler backend could precede the snapshot polling (and the processing of those polled snapshots).
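
    A condensed, self-contained sketch of the new guard (names and signatures simplified; the real implementation is in `ExecutorPodsLifecycleManager` in the diff below): detection only runs when a fresh full snapshot has arrived, and it only flags executors that registered more than the configured delta before that snapshot was polled.

    ```scala
    def findMissingExecutors(
        registeredExecutors: Map[Long, Long], // executor id -> registration timestamp (ms)
        podsInSnapshot: Set[Long],            // executor ids present in the polled full snapshot
        fullSnapshotTs: Long,                 // when the full POD list was polled (ms)
        lastFullSnapshotTs: Long,             // full-snapshot timestamp processed last time
        missingPodDetectDelta: Long): Set[Long] = {
      if (fullSnapshotTs == lastFullSnapshotTs) {
        // Not a fresh full poll (e.g. a single-POD watch update): skip detection entirely.
        Set.empty
      } else {
        registeredExecutors.collect {
          case (execId, registrationTs)
              if !podsInSnapshot.contains(execId) &&
                fullSnapshotTs - registrationTs > missingPodDetectDelta =>
            execId
        }.toSet
      }
    }
    ```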
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. When the POD is missing, the reason message explaining the executor's exit is extended with both timestamps (the polling time and the executor registration time), and the new config is also mentioned.
    
    ### How was this patch tested?
    
    The existing unit tests are extended.
    
    Closes #30675 from attilapiros/SPARK-33711.
    
    Authored-by: “attilapiros” <pi...@gmail.com>
    Signed-off-by: Holden Karau <hk...@apple.com>
    (cherry picked from commit 6bd7a6200f8beaab1c68b2469df05870ea788d49)
    Signed-off-by: Holden Karau <hk...@apple.com>
---
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  6 ++-
 .../spark/scheduler/cluster/ExecutorData.scala     |  4 +-
 .../scala/org/apache/spark/deploy/k8s/Config.scala | 11 +++++
 .../cluster/k8s/ExecutorPodsAllocator.scala        |  2 +-
 .../cluster/k8s/ExecutorPodsLifecycleManager.scala | 50 +++++++++++-----------
 .../cluster/k8s/ExecutorPodsSnapshot.scala         | 12 +++---
 .../k8s/ExecutorPodsSnapshotsStoreImpl.scala       |  8 +++-
 .../DeterministicExecutorPodsSnapshotsStore.scala  |  8 ++--
 .../k8s/ExecutorPodsLifecycleManagerSuite.scala    | 22 +++++++---
 .../cluster/k8s/ExecutorPodsSnapshotSuite.scala    |  4 +-
 .../k8s/ExecutorPodsSnapshotsStoreSuite.scala      | 33 ++++++++------
 11 files changed, 101 insertions(+), 59 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 2bd0b4c..ccb5eb1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -239,7 +239,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           }
           val data = new ExecutorData(executorRef, executorAddress, hostname,
             0, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes,
-            resourcesInfo, resourceProfileId)
+            resourcesInfo, resourceProfileId, registrationTs = System.currentTimeMillis())
           // This must be synchronized because variables mutated
           // in this block are read when requesting executors
           CoarseGrainedSchedulerBackend.this.synchronized {
@@ -629,6 +629,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     executorDataMap.keySet.toSeq
   }
 
+  def getExecutorsWithRegistrationTs(): Map[String, Long] = synchronized {
+    executorDataMap.mapValues(v => v.registrationTs).toMap
+  }
+
   override def isExecutorActive(id: String): Boolean = synchronized {
     executorDataMap.contains(id) &&
       !executorsPendingToRemove.contains(id) &&
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index 0621461..86b44e8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -30,6 +30,7 @@ import org.apache.spark.scheduler.ExecutorResourceInfo
  * @param totalCores The total number of cores available to the executor
  * @param resourcesInfo The information of the currently available resources on the executor
  * @param resourceProfileId The id of the ResourceProfile being used by this executor
+ * @param registrationTs The registration timestamp of this executor
  */
 private[cluster] class ExecutorData(
     val executorEndpoint: RpcEndpointRef,
@@ -40,6 +41,7 @@ private[cluster] class ExecutorData(
     override val logUrlMap: Map[String, String],
     override val attributes: Map[String, String],
     override val resourcesInfo: Map[String, ExecutorResourceInfo],
-    override val resourceProfileId: Int
+    override val resourceProfileId: Int,
+    val registrationTs: Long
 ) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes,
   resourcesInfo, resourceProfileId)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 65ac3c9..487dbf5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -449,6 +449,17 @@ private[spark] object Config extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val KUBERNETES_EXECUTOR_MISSING_POD_DETECT_DELTA =
+    ConfigBuilder("spark.kubernetes.executor.missingPodDetectDelta")
+      .doc("When a registered executor's POD is missing from the Kubernetes API server's polled " +
+        "list of PODs then this delta time is taken as the accepted time difference between the " +
+        "registration time and the time of the polling. After this time the POD is considered " +
+        "missing from the cluster and the executor will be removed.")
+      .version("3.1.1")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(delay => delay > 0, "delay must be a positive time value")
+      .createWithDefaultString("30s")
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SERVICE_ANNOTATION_PREFIX = "spark.kubernetes.driver.service.annotation."
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 863cb28..f4cd2d0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -86,7 +86,7 @@ private[spark] class ExecutorPodsAllocator(
 
   private val hasPendingPods = new AtomicBoolean()
 
-  private var lastSnapshot = ExecutorPodsSnapshot(Nil)
+  private var lastSnapshot = ExecutorPodsSnapshot()
 
   // Executors that have been deleted by this allocator but not yet detected as deleted in
   // a snapshot from the API server. This is used to deny registration from these executors
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index 5d91e52..593d6f6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -43,6 +43,9 @@ private[spark] class ExecutorPodsLifecycleManager(
   import ExecutorPodsLifecycleManager._
 
   private lazy val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
+  private lazy val missingPodDetectDelta = conf.get(KUBERNETES_EXECUTOR_MISSING_POD_DETECT_DELTA)
+
+  private var lastFullSnapshotTs: Long = 0
 
   // Keep track of which pods are inactive to avoid contacting the API server multiple times.
   // This set is cleaned up when a snapshot containing the updated pod is processed.
@@ -109,33 +112,33 @@ private[spark] class ExecutorPodsLifecycleManager(
     // Reconcile the case where Spark claims to know about an executor but the corresponding pod
     // is missing from the cluster. This would occur if we miss a deletion event and the pod
     // transitions immediately from running to absent. We only need to check against the latest
-    // snapshot for this, and we don't do this for executors in the deleted executors cache or
-    // that we just removed in this round.
-    val lostExecutors = if (snapshots.nonEmpty) {
-      schedulerBackend.getExecutorIds().map(_.toLong).toSet --
+    // fresh full snapshot (coming from ExecutorPodsPollingSnapshotSource) for this, and we don't
+    // do this for executors in the deleted executors cache or that we just removed in this round.
+    if (snapshots.nonEmpty && lastFullSnapshotTs != snapshots.last.fullSnapshotTs) {
+      lastFullSnapshotTs = snapshots.last.fullSnapshotTs
+      val lostExecutorsWithRegistrationTs =
+        schedulerBackend.getExecutorsWithRegistrationTs().map(t => (t._1.toLong, t._2)) --
         snapshots.last.executorPods.keySet -- execIdsRemovedInThisRound
-    } else {
-      Nil
-    }
 
-    lostExecutors.foreach { lostId =>
-      if (removedExecutorsCache.getIfPresent(lostId) == null) {
-        val exitReasonMessage = s"The executor with ID $lostId was not found in the" +
-          s" cluster but we didn't get a reason why. Marking the executor as failed. The" +
-          s" executor may have been deleted but the driver missed the deletion event."
-        logDebug(exitReasonMessage)
-        val exitReason = ExecutorExited(
-          UNKNOWN_EXIT_CODE,
-          exitCausedByApp = false,
-          exitReasonMessage)
-        schedulerBackend.doRemoveExecutor(lostId.toString, exitReason)
+      lostExecutorsWithRegistrationTs.foreach { case (lostExecId, lostExecRegistrationTs) =>
+        if (removedExecutorsCache.getIfPresent(lostExecId) == null &&
+            lastFullSnapshotTs - lostExecRegistrationTs > missingPodDetectDelta) {
+          val exitReasonMessage = s"The executor with ID $lostExecId (registered at " +
+            s"$lostExecRegistrationTs ms) was not found in the cluster at the polling time " +
+            s"($lastFullSnapshotTs ms) which is after the accepted detect delta time " +
+            s"($missingPodDetectDelta ms) configured by " +
+            s"`${KUBERNETES_EXECUTOR_MISSING_POD_DETECT_DELTA.key}`. " +
+            "The executor may have been deleted but the driver missed the deletion event. " +
+            "Marking this executor as failed."
+          logDebug(exitReasonMessage)
+          val exitReason = ExecutorExited(
+            UNKNOWN_EXIT_CODE,
+            exitCausedByApp = false,
+            exitReasonMessage)
+          schedulerBackend.doRemoveExecutor(lostExecId.toString, exitReason)
+        }
       }
     }
-
-    if (lostExecutors.nonEmpty) {
-      logDebug(s"Removed executors with ids ${lostExecutors.mkString(",")}" +
-        s" from Spark that were either found to be deleted or non-existent in the cluster.")
-    }
   }
 
   private def onFinalNonDeletedState(
@@ -238,4 +241,3 @@ private[spark] class ExecutorPodsLifecycleManager(
 private object ExecutorPodsLifecycleManager {
   val UNKNOWN_EXIT_CODE = -1
 }
-
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
index cb4d881..76c17cf 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
@@ -29,13 +29,15 @@ import org.apache.spark.internal.Logging
 /**
  * An immutable view of the current executor pods that are running in the cluster.
  */
-private[spark] case class ExecutorPodsSnapshot(executorPods: Map[Long, ExecutorPodState]) {
+private[spark] case class ExecutorPodsSnapshot(
+    executorPods: Map[Long, ExecutorPodState],
+    fullSnapshotTs: Long) {
 
   import ExecutorPodsSnapshot._
 
   def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot = {
     val newExecutorPods = executorPods ++ toStatesByExecutorId(Seq(updatedPod))
-    new ExecutorPodsSnapshot(newExecutorPods)
+    new ExecutorPodsSnapshot(newExecutorPods, fullSnapshotTs)
   }
 }
 
@@ -43,11 +45,11 @@ object ExecutorPodsSnapshot extends Logging {
   private var shouldCheckAllContainers: Boolean = _
   private var sparkContainerName: String = DEFAULT_EXECUTOR_CONTAINER_NAME
 
-  def apply(executorPods: Seq[Pod]): ExecutorPodsSnapshot = {
-    ExecutorPodsSnapshot(toStatesByExecutorId(executorPods))
+  def apply(executorPods: Seq[Pod], fullSnapshotTs: Long): ExecutorPodsSnapshot = {
+    ExecutorPodsSnapshot(toStatesByExecutorId(executorPods), fullSnapshotTs)
   }
 
-  def apply(): ExecutorPodsSnapshot = ExecutorPodsSnapshot(Map.empty[Long, ExecutorPodState])
+  def apply(): ExecutorPodsSnapshot = ExecutorPodsSnapshot(Map.empty[Long, ExecutorPodState], 0)
 
   def setShouldCheckAllContainers(watchAllContainers: Boolean): Unit = {
     shouldCheckAllContainers = watchAllContainers
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
index 22764d9..df8769b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
@@ -28,6 +28,8 @@ import scala.util.control.NonFatal
 import io.fabric8.kubernetes.api.model.Pod
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -54,7 +56,9 @@ import org.apache.spark.util.ThreadUtils
  * <br>
  * The subscriber notification callback is guaranteed to be called from a single thread at a time.
  */
-private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: ScheduledExecutorService)
+private[spark] class ExecutorPodsSnapshotsStoreImpl(
+    subscribersExecutor: ScheduledExecutorService,
+    clock: Clock = new SystemClock)
   extends ExecutorPodsSnapshotsStore with Logging {
 
   private val SNAPSHOT_LOCK = new Object()
@@ -99,7 +103,7 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
   }
 
   override def replaceSnapshot(newSnapshot: Seq[Pod]): Unit = SNAPSHOT_LOCK.synchronized {
-    currentSnapshot = ExecutorPodsSnapshot(newSnapshot)
+    currentSnapshot = ExecutorPodsSnapshot(newSnapshot, clock.getTimeMillis())
     addCurrentSnapshotToSubscribers()
   }
 
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
index c30efde..dbe2f29 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
@@ -20,13 +20,15 @@ import io.fabric8.kubernetes.api.model.Pod
 import scala.collection.mutable
 
 import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME
-
+import org.apache.spark.util.ManualClock
 
 class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore {
 
   ExecutorPodsSnapshot.setShouldCheckAllContainers(false)
   ExecutorPodsSnapshot.setSparkContainerName(DEFAULT_EXECUTOR_CONTAINER_NAME)
 
+  val clock = new ManualClock()
+
   private val snapshotsBuffer = mutable.Buffer.empty[ExecutorPodsSnapshot]
   private val subscribers = mutable.Buffer.empty[Seq[ExecutorPodsSnapshot] => Unit]
 
@@ -51,7 +53,7 @@ class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore
   }
 
   override def replaceSnapshot(newSnapshot: Seq[Pod]): Unit = {
-    currentSnapshot = ExecutorPodsSnapshot(newSnapshot)
+    currentSnapshot = ExecutorPodsSnapshot(newSnapshot, clock.getTimeMillis())
     snapshotsBuffer += currentSnapshot
   }
 
@@ -60,7 +62,7 @@ class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore
       case (_, PodDeleted(_)) => false
       case _ => true
     }
-    currentSnapshot = ExecutorPodsSnapshot(nonDeleted)
+    currentSnapshot = ExecutorPodsSnapshot(nonDeleted, clock.getTimeMillis())
     snapshotsBuffer += currentSnapshot
   }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index fb6f3ac..762c452 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -57,7 +57,7 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
     val removedExecutorsCache = CacheBuilder.newBuilder().build[java.lang.Long, java.lang.Long]
     snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
     namedExecutorPods = mutable.Map.empty[String, PodResource[Pod, DoneablePod]]
-    when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty[String])
+    when(schedulerBackend.getExecutorsWithRegistrationTs()).thenReturn(Map.empty[String, Long])
     when(kubernetesClient.pods()).thenReturn(podOperations)
     when(podOperations.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer())
     eventHandlerUnderTest = new ExecutorPodsLifecycleManager(
@@ -92,13 +92,23 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
 
   test("When the scheduler backend lists executor ids that aren't present in the cluster," +
     " remove those executors from Spark.") {
-    when(schedulerBackend.getExecutorIds()).thenReturn(Seq("1"))
-    val msg = s"The executor with ID 1 was not found in the cluster but we didn't" +
-      s" get a reason why. Marking the executor as failed. The executor may have been" +
-      s" deleted but the driver missed the deletion event."
-    val expectedLossReason = ExecutorExited(-1, exitCausedByApp = false, msg)
+      when(schedulerBackend.getExecutorsWithRegistrationTs()).thenReturn(Map("1" -> 7L))
+    val missingPodDelta =
+      eventHandlerUnderTest.conf.get(Config.KUBERNETES_EXECUTOR_MISSING_POD_DETECT_DELTA)
+    snapshotsStore.clock.advance(missingPodDelta + 7)
     snapshotsStore.replaceSnapshot(Seq.empty[Pod])
     snapshotsStore.notifySubscribers()
+    verify(schedulerBackend, never()).doRemoveExecutor(any(), any())
+
+    // 1 more millisecond and the accepted delta is over so the missing POD will be detected
+    snapshotsStore.clock.advance(1)
+    snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+    snapshotsStore.notifySubscribers()
+    val msg = "The executor with ID 1 (registered at 7 ms) was not found in the cluster at " +
+      "the polling time (30008 ms) which is after the accepted detect delta time (30000 ms) " +
+      "configured by `spark.kubernetes.executor.missingPodDetectDelta`. The executor may have " +
+      "been deleted but the driver missed the deletion event. Marking this executor as failed."
+    val expectedLossReason = ExecutorExited(-1, exitCausedByApp = false, msg)
     verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
   }
 
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
index 8d285ab..5e66726 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
@@ -27,7 +27,7 @@ class ExecutorPodsSnapshotSuite extends SparkFunSuite {
     (pod, state(pod))
 
   def doTest(testCases: Seq[(Pod, ExecutorPodState)]): Unit = {
-    val snapshot = ExecutorPodsSnapshot(testCases.map(_._1))
+    val snapshot = ExecutorPodsSnapshot(testCases.map(_._1), 0)
     for (((_, state), i) <- testCases.zipWithIndex) {
       assertResult(state.getClass.getName, s"executor ID $i") {
         snapshot.executorPods(i).getClass.getName
@@ -70,7 +70,7 @@ class ExecutorPodsSnapshotSuite extends SparkFunSuite {
     val originalPods = Seq(
       pendingExecutor(0),
       runningExecutor(1))
-    val originalSnapshot = ExecutorPodsSnapshot(originalPods)
+    val originalSnapshot = ExecutorPodsSnapshot(originalPods, 0)
     val snapshotWithUpdatedPod = originalSnapshot.withUpdate(succeededExecutor(1))
     assert(snapshotWithUpdatedPod.executorPods ===
       Map(
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala
index 614c198..b4240bb 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala
@@ -26,15 +26,18 @@ import scala.collection.mutable
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ManualClock
 
 class ExecutorPodsSnapshotsStoreSuite extends SparkFunSuite with BeforeAndAfter {
 
   private var eventBufferScheduler: DeterministicScheduler = _
   private var eventQueueUnderTest: ExecutorPodsSnapshotsStoreImpl = _
+  private var clock: ManualClock = _
 
   before {
     eventBufferScheduler = new DeterministicScheduler()
-    eventQueueUnderTest = new ExecutorPodsSnapshotsStoreImpl(eventBufferScheduler)
+    clock = new ManualClock()
+    eventQueueUnderTest = new ExecutorPodsSnapshotsStoreImpl(eventBufferScheduler, clock)
     ExecutorPodsSnapshot.setShouldCheckAllContainers(false)
   }
 
@@ -52,6 +55,7 @@ class ExecutorPodsSnapshotsStoreSuite extends SparkFunSuite with BeforeAndAfter
     assert(receivedSnapshots1 === Seq(ExecutorPodsSnapshot()))
     assert(receivedSnapshots2 === Seq(ExecutorPodsSnapshot()))
 
+    clock.advance(100)
     pushPodWithIndex(1)
     // Force time to move forward so that the buffer is emitted, scheduling the
     // processing task on the subscription executor...
@@ -60,7 +64,7 @@ class ExecutorPodsSnapshotsStoreSuite extends SparkFunSuite with BeforeAndAfter
 
     assert(receivedSnapshots1 === Seq(
       ExecutorPodsSnapshot(),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)), 0)))
     assert(receivedSnapshots2 === Seq(ExecutorPodsSnapshot()))
 
     eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
@@ -68,29 +72,29 @@ class ExecutorPodsSnapshotsStoreSuite extends SparkFunSuite with BeforeAndAfter
     // Don't repeat snapshots
     assert(receivedSnapshots1 === Seq(
       ExecutorPodsSnapshot(),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)), 0)))
     assert(receivedSnapshots2 === Seq(
       ExecutorPodsSnapshot(),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)), 0)))
     pushPodWithIndex(2)
     pushPodWithIndex(3)
     eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
 
     assert(receivedSnapshots1 === Seq(
       ExecutorPodsSnapshot(),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1))),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2))),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2), podWithIndex(3)))))
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)), 0),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2)), 0),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2), podWithIndex(3)), 0)))
     assert(receivedSnapshots2 === Seq(
       ExecutorPodsSnapshot(),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)), 0)))
 
     eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
     assert(receivedSnapshots1 === Seq(
       ExecutorPodsSnapshot(),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1))),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2))),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2), podWithIndex(3)))))
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)), 0),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2)), 0),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2), podWithIndex(3)), 0)))
     assert(receivedSnapshots1 === receivedSnapshots2)
   }
 
@@ -113,13 +117,14 @@ class ExecutorPodsSnapshotsStoreSuite extends SparkFunSuite with BeforeAndAfter
     eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
     assert(receivedSnapshots === Seq(
       ExecutorPodsSnapshot(),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)), 0)))
+    clock.advance(100)
     eventQueueUnderTest.replaceSnapshot(Seq(podWithIndex(2)))
     eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
     assert(receivedSnapshots === Seq(
       ExecutorPodsSnapshot(),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1))),
-      ExecutorPodsSnapshot(Seq(podWithIndex(2)))))
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)), 0),
+      ExecutorPodsSnapshot(Seq(podWithIndex(2)), 100)))
   }
 
   private def pushPodWithIndex(index: Int): Unit =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org