Posted to commits@spark.apache.org by ho...@apache.org on 2021/02/17 21:45:01 UTC

[spark] branch master updated: [SPARK-33763] Add metrics for better tracking of dynamic allocation

This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 76e5d75  [SPARK-33763] Add metrics for better tracking of dynamic allocation
76e5d75 is described below

commit 76e5d75e369609b96248a34bc6015ec61936e652
Author: “attilapiros” <pi...@gmail.com>
AuthorDate: Wed Feb 17 13:44:36 2021 -0800

    [SPARK-33763] Add metrics for better tracking of dynamic allocation
    
    ### What changes were proposed in this pull request?
    
    This PR adds the following metrics to track executor removal reasons during dynamic allocation (a minimal sketch of the registration pattern follows the list):
    - `numberExecutorsGracefullyDecommissioned`: number of executors that reached the finished decommissioning state and shut themselves down cleanly
    - `numberExecutorsDecommissionUnfinished`: number of executors that were asked to decommission but stopped before reaching the finished decommissioning state
    - `numberExecutorsKilledByDriver`: number of executors killed by the driver (requested to stop)
    - `numberExecutorsExitedUnexpectedly`: number of executors that exited without a driver request
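
    Illustration only: the snippet below is a minimal, self-contained sketch of the Dropwizard counter-registration pattern this change uses (the real code is `ExecutorAllocationManagerSource` in the diff below); `DemoRemoveReasonSource` and its `main` are hypothetical names used here for demonstration, not part of Spark.

    ```scala
    import com.codahale.metrics.{Counter, MetricRegistry}

    // Hypothetical stand-in for ExecutorAllocationManagerSource: counters are
    // created once under the "executors" namespace and incremented when an
    // executor removal is classified.
    object DemoRemoveReasonSource {
      val sourceName = "ExecutorAllocationManager"
      val metricRegistry = new MetricRegistry()

      private def getCounter(name: String): Counter =
        metricRegistry.counter(MetricRegistry.name("executors", name))

      val gracefullyDecommissioned: Counter = getCounter("numberExecutorsGracefullyDecommissioned")
      val decommissionUnfinished: Counter = getCounter("numberExecutorsDecommissionUnfinished")
      val driverKilled: Counter = getCounter("numberExecutorsKilledByDriver")
      val exitedUnexpectedly: Counter = getCounter("numberExecutorsExitedUnexpectedly")

      def main(args: Array[String]): Unit = {
        // Simulate classifying two removals, then read the counters back.
        gracefullyDecommissioned.inc()
        driverKilled.inc()
        println(s"gracefully decommissioned: ${gracefullyDecommissioned.getCount()}, " +
          s"killed by driver: ${driverKilled.getCount()}")
      }
    }
    ```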
    
    ### Why are the changes needed?
    
    These metrics allow dynamic allocation, and in particular the reasons executors are removed, to be monitored more closely.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. The new metrics will be available for monitoring.
    
    ### How was this patch tested?
    
    With unit and integration tests.
    
    Finally, the new metrics were checked manually in jconsole:
    <img width="1054" alt="jmx" src="https://user-images.githubusercontent.com/2017933/107458686-de8adf00-6b54-11eb-86f7-41faf2fb638f.png">
    
    Closes #31450 from attilapiros/SPARK-33763-final.
    
    Authored-by: “attilapiros” <pi...@gmail.com>
    Signed-off-by: Holden Karau <hk...@apple.com>
---
 .../apache/spark/ExecutorAllocationManager.scala   | 80 +++++++++++++---------
 .../executor/CoarseGrainedExecutorBackend.scala    |  6 +-
 .../spark/scheduler/ExecutorLossReason.scala       |  4 ++
 .../spark/scheduler/dynalloc/ExecutorMonitor.scala | 20 +++++-
 .../spark/ExecutorAllocationManagerSuite.scala     | 54 ++++++++++++++-
 .../scheduler/dynalloc/ExecutorMonitorSuite.scala  | 22 ++++--
 .../k8s/integrationtest/DecommissionSuite.scala    | 11 ++-
 7 files changed, 150 insertions(+), 47 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index bdb768e..822a0a5 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.{ControlThrowable, NonFatal}
 
-import com.codahale.metrics.{Gauge, MetricRegistry}
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
 
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
@@ -135,14 +135,14 @@ private[spark] class ExecutorAllocationManager(
   validateSettings()
 
   // Number of executors to add for each ResourceProfile in the next round
-  private val numExecutorsToAddPerResourceProfileId = new mutable.HashMap[Int, Int]
+  private[spark] val numExecutorsToAddPerResourceProfileId = new mutable.HashMap[Int, Int]
   numExecutorsToAddPerResourceProfileId(defaultProfileId) = 1
 
   // The desired number of executors at this moment in time. If all our executors were to die, this
   // is the number of executors we would immediately want from the cluster manager.
   // Note every profile will be allowed to have initial number,
   // we may want to make this configurable per Profile in the future
-  private val numExecutorsTargetPerResourceProfileId = new mutable.HashMap[Int, Int]
+  private[spark] val numExecutorsTargetPerResourceProfileId = new mutable.HashMap[Int, Int]
   numExecutorsTargetPerResourceProfileId(defaultProfileId) = initialNumExecutors
 
   // A timestamp of when an addition should be triggered, or NOT_SET if it is not set
@@ -155,14 +155,15 @@ private[spark] class ExecutorAllocationManager(
   // Listener for Spark events that impact the allocation policy
   val listener = new ExecutorAllocationListener
 
-  val executorMonitor = new ExecutorMonitor(conf, client, listenerBus, clock)
-
   // Executor that handles the scheduling task.
   private val executor =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
 
   // Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
-  val executorAllocationManagerSource = new ExecutorAllocationManagerSource
+  val executorAllocationManagerSource = new ExecutorAllocationManagerSource(this)
+
+  val executorMonitor =
+    new ExecutorMonitor(conf, client, listenerBus, clock, executorAllocationManagerSource)
 
   // Whether we are still waiting for the initial set of executors to be allocated.
   // While this is true, we will not cancel outstanding executor requests. This is
@@ -288,7 +289,7 @@ private[spark] class ExecutorAllocationManager(
    * The maximum number of executors, for the ResourceProfile id passed in, that we would need
    * under the current load to satisfy all running and pending tasks, rounded up.
    */
-  private def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = {
+  private[spark] def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = {
     val pending = listener.totalPendingTasksPerResourceProfile(rpId)
     val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId)
     val unschedulableTaskSets = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)
@@ -967,34 +968,47 @@ private[spark] class ExecutorAllocationManager(
         rplocalityToCount.map { case (k, v) => (k, v.toMap)}.toMap
     }
   }
+}
 
-  /**
-   * Metric source for ExecutorAllocationManager to expose its internal executor allocation
-   * status to MetricsSystem.
-   * Note: These metrics heavily rely on the internal implementation of
-   * ExecutorAllocationManager, metrics or value of metrics will be changed when internal
-   * implementation is changed, so these metrics are not stable across Spark version.
-   */
-  private[spark] class ExecutorAllocationManagerSource extends Source {
-    val sourceName = "ExecutorAllocationManager"
-    val metricRegistry = new MetricRegistry()
-
-    private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = {
-      metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] {
-        override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) }
-      })
-    }
-
-    // The metrics are going to return the sum for all the different ResourceProfiles.
-    registerGauge("numberExecutorsToAdd",
-      numExecutorsToAddPerResourceProfileId.values.sum, 0)
-    registerGauge("numberExecutorsPendingToRemove", executorMonitor.pendingRemovalCount, 0)
-    registerGauge("numberAllExecutors", executorMonitor.executorCount, 0)
-    registerGauge("numberTargetExecutors",
-      numExecutorsTargetPerResourceProfileId.values.sum, 0)
-    registerGauge("numberMaxNeededExecutors", numExecutorsTargetPerResourceProfileId.keys
-        .map(maxNumExecutorsNeededPerResourceProfile(_)).sum, 0)
+/**
+ * Metric source for ExecutorAllocationManager to expose its internal executor allocation
+ * status to MetricsSystem.
+ * Note: These metrics heavily rely on the internal implementation of
+ * ExecutorAllocationManager, metrics or value of metrics will be changed when internal
+ * implementation is changed, so these metrics are not stable across Spark version.
+ */
+private[spark] class ExecutorAllocationManagerSource(
+    executorAllocationManager: ExecutorAllocationManager) extends Source {
+  val sourceName = "ExecutorAllocationManager"
+  val metricRegistry = new MetricRegistry()
+
+  private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = {
+    metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] {
+      override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) }
+    })
   }
+
+  private def getCounter(name: String): Counter = {
+    metricRegistry.counter(MetricRegistry.name("executors", name))
+  }
+
+  val gracefullyDecommissioned: Counter = getCounter("numberExecutorsGracefullyDecommissioned")
+  val decommissionUnfinished: Counter = getCounter("numberExecutorsDecommissionUnfinished")
+  val driverKilled: Counter = getCounter("numberExecutorsKilledByDriver")
+  val exitedUnexpectedly: Counter = getCounter("numberExecutorsExitedUnexpectedly")
+
+  // The metrics are going to return the sum for all the different ResourceProfiles.
+  registerGauge("numberExecutorsToAdd",
+    executorAllocationManager.numExecutorsToAddPerResourceProfileId.values.sum, 0)
+  registerGauge("numberExecutorsPendingToRemove",
+    executorAllocationManager.executorMonitor.pendingRemovalCount, 0)
+  registerGauge("numberAllExecutors",
+    executorAllocationManager.executorMonitor.executorCount, 0)
+  registerGauge("numberTargetExecutors",
+    executorAllocationManager.numExecutorsTargetPerResourceProfileId.values.sum, 0)
+  registerGauge("numberMaxNeededExecutors",
+    executorAllocationManager.numExecutorsTargetPerResourceProfileId.keys
+      .map(executorAllocationManager.maxNumExecutorsNeededPerResourceProfile(_)).sum, 0)
 }
 
 private object ExecutorAllocationManager {
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index e1d3009..43c122a 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -39,7 +39,7 @@ import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.resource.ResourceProfile._
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc._
-import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
+import org.apache.spark.scheduler.{ExecutorLossMessage, ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils}
@@ -322,13 +322,13 @@ private[spark] class CoarseGrainedExecutorBackend(
                 // since the start of computing it.
                 if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
                   logInfo("No running tasks, all blocks migrated, stopping.")
-                  exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+                  exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true)
                 } else {
                   logInfo("All blocks not yet migrated.")
                 }
               } else {
                 logInfo("No running tasks, no block migration configured, stopping.")
-                exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+                exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true)
               }
             } else {
               logInfo("Blocked from shutdown by running ${executor.numRunningtasks} tasks")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 2644d0a..f333c01 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -40,6 +40,10 @@ private[spark] object ExecutorExited {
   }
 }
 
+private[spark] object ExecutorLossMessage {
+  val decommissionFinished = "Finished decommissioning"
+}
+
 private[spark] object ExecutorKilled extends ExecutorLossReason("Executor killed by driver.")
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index 8c84b318..cecd4b0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -38,7 +38,9 @@ private[spark] class ExecutorMonitor(
     conf: SparkConf,
     client: ExecutorAllocationClient,
     listenerBus: LiveListenerBus,
-    clock: Clock) extends SparkListener with CleanerListener with Logging {
+    clock: Clock,
+    metrics: ExecutorAllocationManagerSource = null)
+  extends SparkListener with CleanerListener with Logging {
 
   private val idleTimeoutNs = TimeUnit.SECONDS.toNanos(
     conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT))
@@ -352,6 +354,22 @@ private[spark] class ExecutorMonitor(
     val removed = executors.remove(event.executorId)
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
+      if (removed.decommissioning) {
+        if (event.reason == ExecutorLossMessage.decommissionFinished) {
+          metrics.gracefullyDecommissioned.inc()
+        } else {
+          metrics.decommissionUnfinished.inc()
+        }
+      } else if (removed.pendingRemoval) {
+        metrics.driverKilled.inc()
+      } else {
+        metrics.exitedUnexpectedly.inc()
+      }
+      logInfo(s"Executor ${event.executorId} is removed. Remove reason statistics: (" +
+        s"gracefully decommissioned: ${metrics.gracefullyDecommissioned.getCount()}, " +
+        s"decommision unfinished: ${metrics.decommissionUnfinished.getCount()}, " +
+        s"driver killed: ${metrics.driverKilled.getCount()}, " +
+        s"unexpectedly exited: ${metrics.exitedUnexpectedly.getCount()}).")
       if (!removed.pendingRemoval || !removed.decommissioning) {
         nextTimeout.set(Long.MinValue)
       }
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 5ae596b..2fb5140 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -936,6 +936,53 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(executorsPendingToRemove(manager).isEmpty)
   }
 
+  test("SPARK-33763: metrics to track dynamic allocation (decommissionEnabled=false)") {
+    val manager = createManager(createConf(3, 5, 3))
+    (1 to 5).map(_.toString).foreach { id => onExecutorAddedDefaultProfile(manager, id) }
+
+    assert(executorsPendingToRemove(manager).isEmpty)
+    assert(removeExecutorsDefaultProfile(manager, Seq("1", "2")) === Seq("1", "2"))
+    assert(executorsPendingToRemove(manager).contains("1"))
+    assert(executorsPendingToRemove(manager).contains("2"))
+
+    onExecutorRemoved(manager, "1", "driver requested exit")
+    assert(manager.executorAllocationManagerSource.driverKilled.getCount() === 1)
+    assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 0)
+
+    onExecutorRemoved(manager, "2", "another driver requested exit")
+    assert(manager.executorAllocationManagerSource.driverKilled.getCount() === 2)
+    assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 0)
+
+    onExecutorRemoved(manager, "3", "this will be an unexpected exit")
+    assert(manager.executorAllocationManagerSource.driverKilled.getCount() === 2)
+    assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 1)
+  }
+
+  test("SPARK-33763: metrics to track dynamic allocation (decommissionEnabled = true)") {
+    val manager = createManager(createConf(3, 5, 3, decommissioningEnabled = true))
+    (1 to 5).map(_.toString).foreach { id => onExecutorAddedDefaultProfile(manager, id) }
+
+    assert(executorsPendingToRemove(manager).isEmpty)
+    assert(removeExecutorsDefaultProfile(manager, Seq("1", "2")) === Seq("1", "2"))
+    assert(executorsDecommissioning(manager).contains("1"))
+    assert(executorsDecommissioning(manager).contains("2"))
+
+    onExecutorRemoved(manager, "1", ExecutorLossMessage.decommissionFinished)
+    assert(manager.executorAllocationManagerSource.gracefullyDecommissioned.getCount() === 1)
+    assert(manager.executorAllocationManagerSource.decommissionUnfinished.getCount() === 0)
+    assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 0)
+
+    onExecutorRemoved(manager, "2", "stopped before gracefully finished")
+    assert(manager.executorAllocationManagerSource.gracefullyDecommissioned.getCount() === 1)
+    assert(manager.executorAllocationManagerSource.decommissionUnfinished.getCount() === 1)
+    assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 0)
+
+    onExecutorRemoved(manager, "3", "this will be an unexpected exit")
+    assert(manager.executorAllocationManagerSource.gracefullyDecommissioned.getCount() === 1)
+    assert(manager.executorAllocationManagerSource.decommissionUnfinished.getCount() === 1)
+    assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 1)
+  }
+
   test("remove multiple executors") {
     val manager = createManager(createConf(5, 10, 5))
     (1 to 10).map(_.toString).foreach { id => onExecutorAddedDefaultProfile(manager, id) }
@@ -1701,8 +1748,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     post(SparkListenerExecutorAdded(0L, id, execInfo))
   }
 
-  private def onExecutorRemoved(manager: ExecutorAllocationManager, id: String): Unit = {
-    post(SparkListenerExecutorRemoved(0L, id, null))
+  private def onExecutorRemoved(
+      manager: ExecutorAllocationManager,
+      id: String,
+      reason: String = null): Unit = {
+    post(SparkListenerExecutorRemoved(0L, id, reason))
   }
 
   private def onExecutorBusy(manager: ExecutorAllocationManager, id: String): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
index 6d49479..69afdb5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
 
+import com.codahale.metrics.Counter
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{doAnswer, mock, when}
 
@@ -57,6 +58,15 @@ class ExecutorMonitorSuite extends SparkFunSuite {
   // having to use mockito APIs directly in each test.
   private val knownExecs = mutable.HashSet[String]()
 
+  private def allocationManagerSource(): ExecutorAllocationManagerSource = {
+    val metricSource = mock(classOf[ExecutorAllocationManagerSource])
+    when(metricSource.driverKilled).thenReturn(new Counter)
+    when(metricSource.decommissionUnfinished).thenReturn(new Counter)
+    when(metricSource.gracefullyDecommissioned).thenReturn(new Counter)
+    when(metricSource.exitedUnexpectedly).thenReturn(new Counter)
+    metricSource
+  }
+
   override def beforeEach(): Unit = {
     super.beforeEach()
     knownExecs.clear()
@@ -65,7 +75,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     when(client.isExecutorActive(any())).thenAnswer { invocation =>
       knownExecs.contains(invocation.getArguments()(0).asInstanceOf[String])
     }
-    monitor = new ExecutorMonitor(conf, client, null, clock)
+    monitor = new ExecutorMonitor(conf, client, null, clock, allocationManagerSource())
   }
 
   test("basic executor timeout") {
@@ -231,7 +241,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     assert(monitor.timedOutExecutors(storageDeadline) ===  Seq("1"))
 
     conf.set(SHUFFLE_SERVICE_ENABLED, true).set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
-    monitor = new ExecutorMonitor(conf, client, null, clock)
+    monitor = new ExecutorMonitor(conf, client, null, clock, allocationManagerSource())
 
     monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
     monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.MEMORY_ONLY))
@@ -292,7 +302,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
   test("shuffle block tracking") {
     val bus = mockListenerBus()
     conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true).set(SHUFFLE_SERVICE_ENABLED, false)
-    monitor = new ExecutorMonitor(conf, client, bus, clock)
+    monitor = new ExecutorMonitor(conf, client, bus, clock, allocationManagerSource())
 
     // 3 jobs: 2 and 3 share a shuffle, 1 has a separate shuffle.
     val stage1 = stageInfo(1, shuffleId = 0)
@@ -360,7 +370,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
   test("SPARK-28839: Avoids NPE in context cleaner when shuffle service is on") {
     val bus = mockListenerBus()
     conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true).set(SHUFFLE_SERVICE_ENABLED, true)
-    monitor = new ExecutorMonitor(conf, client, bus, clock) {
+    monitor = new ExecutorMonitor(conf, client, bus, clock, allocationManagerSource()) {
       override def onOtherEvent(event: SparkListenerEvent): Unit = {
         throw new IllegalStateException("No event should be sent.")
       }
@@ -372,7 +382,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
   test("shuffle tracking with multiple executors and concurrent jobs") {
     val bus = mockListenerBus()
     conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true).set(SHUFFLE_SERVICE_ENABLED, false)
-    monitor = new ExecutorMonitor(conf, client, bus, clock)
+    monitor = new ExecutorMonitor(conf, client, bus, clock, allocationManagerSource())
 
     monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
     monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo))
@@ -417,7 +427,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
       .set(DYN_ALLOCATION_SHUFFLE_TRACKING_TIMEOUT, Long.MaxValue)
       .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
       .set(SHUFFLE_SERVICE_ENABLED, false)
-    monitor = new ExecutorMonitor(conf, client, null, clock)
+    monitor = new ExecutorMonitor(conf, client, null, clock, allocationManagerSource())
 
     // Generate events that will make executor 1 be idle, while still holding shuffle data.
     // The executor should not be eligible for removal since the timeout is basically "infinite".
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
index 9291861..75c27f6 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -16,6 +16,10 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest
 
+import org.scalatest.concurrent.PatienceConfiguration
+import org.scalatest.time.Minutes
+import org.scalatest.time.Span
+
 import org.apache.spark.internal.config
 
 private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
@@ -104,13 +108,15 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
       mainClass = "",
       expectedDriverLogOnCompletion = Seq(
         "Finished waiting, stopping Spark",
-        "Decommission executors"),
+        "Decommission executors",
+        "Remove reason statistics: (gracefully decommissioned: 1, decommision unfinished: 0, " +
+          "driver killed: 0, unexpectedly exited: 0)."),
       appArgs = Array.empty[String],
       driverPodChecker = doBasicDriverPyPodCheck,
       executorPodChecker = doBasicExecutorPyPodCheck,
       isJVM = false,
       pyFiles = None,
-      executorPatience = None,
+      executorPatience = Some(None, Some(DECOMMISSIONING_FINISHED_TIMEOUT)),
       decommissioningTest = false)
   }
 
@@ -151,4 +157,5 @@ private[spark] object DecommissionSuite {
   val PYSPARK_DECOMISSIONING: String = TEST_LOCAL_PYSPARK + "decommissioning.py"
   val PYSPARK_DECOMISSIONING_CLEANUP: String = TEST_LOCAL_PYSPARK + "decommissioning_cleanup.py"
   val PYSPARK_SCALE: String = TEST_LOCAL_PYSPARK + "autoscale.py"
+  val DECOMMISSIONING_FINISHED_TIMEOUT = PatienceConfiguration.Timeout(Span(4, Minutes))
 }

