You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ho...@apache.org on 2021/02/17 21:45:01 UTC
[spark] branch master updated: [SPARK-33763] Add metrics for better
tracking of dynamic allocation
This is an automated email from the ASF dual-hosted git repository.
holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 76e5d75 [SPARK-33763] Add metrics for better tracking of dynamic allocation
76e5d75 is described below
commit 76e5d75e369609b96248a34bc6015ec61936e652
Author: “attilapiros” <pi...@gmail.com>
AuthorDate: Wed Feb 17 13:44:36 2021 -0800
[SPARK-33763] Add metrics for better tracking of dynamic allocation
### What changes were proposed in this pull request?
This PR adds the following metrics to track executor remove reasons during dynamic allocation:
- `numberExecutorsGracefullyDecommissioned`: number of executors which reached the finished decommissioning state and shut themselves down cleanly
- `numberExecutorsDecommissionUnfinished`: number of executors which were requested to decommission but stopped without reaching the finished decommissioning state
- `numberExecutorsKilledByDriver`: number of executors killed by the driver (i.e. requested to stop)
- `numberExecutorsExitedUnexpectedly`: number of executors which exited without a driver request
### Why are the changes needed?
These metrics make dynamic allocation easier to monitor.
### Does this PR introduce _any_ user-facing change?
Yes. The new metrics will be available for monitoring.
### How was this patch tested?
With unit and integration tests.
Finally, the new metrics were manually checked in jconsole:
<img width="1054" alt="jmx" src="https://user-images.githubusercontent.com/2017933/107458686-de8adf00-6b54-11eb-86f7-41faf2fb638f.png">
Closes #31450 from attilapiros/SPARK-33763-final.
Authored-by: “attilapiros” <pi...@gmail.com>
Signed-off-by: Holden Karau <hk...@apple.com>
---
.../apache/spark/ExecutorAllocationManager.scala | 80 +++++++++++++---------
.../executor/CoarseGrainedExecutorBackend.scala | 6 +-
.../spark/scheduler/ExecutorLossReason.scala | 4 ++
.../spark/scheduler/dynalloc/ExecutorMonitor.scala | 20 +++++-
.../spark/ExecutorAllocationManagerSuite.scala | 54 ++++++++++++++-
.../scheduler/dynalloc/ExecutorMonitorSuite.scala | 22 ++++--
.../k8s/integrationtest/DecommissionSuite.scala | 11 ++-
7 files changed, 150 insertions(+), 47 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index bdb768e..822a0a5 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.{ControlThrowable, NonFatal}
-import com.codahale.metrics.{Gauge, MetricRegistry}
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
@@ -135,14 +135,14 @@ private[spark] class ExecutorAllocationManager(
validateSettings()
// Number of executors to add for each ResourceProfile in the next round
- private val numExecutorsToAddPerResourceProfileId = new mutable.HashMap[Int, Int]
+ private[spark] val numExecutorsToAddPerResourceProfileId = new mutable.HashMap[Int, Int]
numExecutorsToAddPerResourceProfileId(defaultProfileId) = 1
// The desired number of executors at this moment in time. If all our executors were to die, this
// is the number of executors we would immediately want from the cluster manager.
// Note every profile will be allowed to have initial number,
// we may want to make this configurable per Profile in the future
- private val numExecutorsTargetPerResourceProfileId = new mutable.HashMap[Int, Int]
+ private[spark] val numExecutorsTargetPerResourceProfileId = new mutable.HashMap[Int, Int]
numExecutorsTargetPerResourceProfileId(defaultProfileId) = initialNumExecutors
// A timestamp of when an addition should be triggered, or NOT_SET if it is not set
@@ -155,14 +155,15 @@ private[spark] class ExecutorAllocationManager(
// Listener for Spark events that impact the allocation policy
val listener = new ExecutorAllocationListener
- val executorMonitor = new ExecutorMonitor(conf, client, listenerBus, clock)
-
// Executor that handles the scheduling task.
private val executor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
// Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
- val executorAllocationManagerSource = new ExecutorAllocationManagerSource
+ val executorAllocationManagerSource = new ExecutorAllocationManagerSource(this)
+
+ val executorMonitor =
+ new ExecutorMonitor(conf, client, listenerBus, clock, executorAllocationManagerSource)
// Whether we are still waiting for the initial set of executors to be allocated.
// While this is true, we will not cancel outstanding executor requests. This is
@@ -288,7 +289,7 @@ private[spark] class ExecutorAllocationManager(
* The maximum number of executors, for the ResourceProfile id passed in, that we would need
* under the current load to satisfy all running and pending tasks, rounded up.
*/
- private def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = {
+ private[spark] def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = {
val pending = listener.totalPendingTasksPerResourceProfile(rpId)
val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId)
val unschedulableTaskSets = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)
@@ -967,34 +968,47 @@ private[spark] class ExecutorAllocationManager(
rplocalityToCount.map { case (k, v) => (k, v.toMap)}.toMap
}
}
+}
- /**
- * Metric source for ExecutorAllocationManager to expose its internal executor allocation
- * status to MetricsSystem.
- * Note: These metrics heavily rely on the internal implementation of
- * ExecutorAllocationManager, metrics or value of metrics will be changed when internal
- * implementation is changed, so these metrics are not stable across Spark version.
- */
- private[spark] class ExecutorAllocationManagerSource extends Source {
- val sourceName = "ExecutorAllocationManager"
- val metricRegistry = new MetricRegistry()
-
- private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = {
- metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] {
- override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) }
- })
- }
-
- // The metrics are going to return the sum for all the different ResourceProfiles.
- registerGauge("numberExecutorsToAdd",
- numExecutorsToAddPerResourceProfileId.values.sum, 0)
- registerGauge("numberExecutorsPendingToRemove", executorMonitor.pendingRemovalCount, 0)
- registerGauge("numberAllExecutors", executorMonitor.executorCount, 0)
- registerGauge("numberTargetExecutors",
- numExecutorsTargetPerResourceProfileId.values.sum, 0)
- registerGauge("numberMaxNeededExecutors", numExecutorsTargetPerResourceProfileId.keys
- .map(maxNumExecutorsNeededPerResourceProfile(_)).sum, 0)
+/**
+ * Metric source for ExecutorAllocationManager to expose its internal executor allocation
+ * status to MetricsSystem.
+ * Note: These metrics heavily rely on the internal implementation of
+ * ExecutorAllocationManager, metrics or value of metrics will be changed when internal
+ * implementation is changed, so these metrics are not stable across Spark version.
+ */
+private[spark] class ExecutorAllocationManagerSource(
+ executorAllocationManager: ExecutorAllocationManager) extends Source {
+ val sourceName = "ExecutorAllocationManager"
+ val metricRegistry = new MetricRegistry()
+
+ private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = {
+ metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] {
+ override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) }
+ })
}
+
+ private def getCounter(name: String): Counter = {
+ metricRegistry.counter(MetricRegistry.name("executors", name))
+ }
+
+ val gracefullyDecommissioned: Counter = getCounter("numberExecutorsGracefullyDecommissioned")
+ val decommissionUnfinished: Counter = getCounter("numberExecutorsDecommissionUnfinished")
+ val driverKilled: Counter = getCounter("numberExecutorsKilledByDriver")
+ val exitedUnexpectedly: Counter = getCounter("numberExecutorsExitedUnexpectedly")
+
+ // The metrics are going to return the sum for all the different ResourceProfiles.
+ registerGauge("numberExecutorsToAdd",
+ executorAllocationManager.numExecutorsToAddPerResourceProfileId.values.sum, 0)
+ registerGauge("numberExecutorsPendingToRemove",
+ executorAllocationManager.executorMonitor.pendingRemovalCount, 0)
+ registerGauge("numberAllExecutors",
+ executorAllocationManager.executorMonitor.executorCount, 0)
+ registerGauge("numberTargetExecutors",
+ executorAllocationManager.numExecutorsTargetPerResourceProfileId.values.sum, 0)
+ registerGauge("numberMaxNeededExecutors",
+ executorAllocationManager.numExecutorsTargetPerResourceProfileId.keys
+ .map(executorAllocationManager.maxNumExecutorsNeededPerResourceProfile(_)).sum, 0)
}
private object ExecutorAllocationManager {
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index e1d3009..43c122a 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -39,7 +39,7 @@ import org.apache.spark.resource.ResourceProfile
import org.apache.spark.resource.ResourceProfile._
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.rpc._
-import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
+import org.apache.spark.scheduler.{ExecutorLossMessage, ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils}
@@ -322,13 +322,13 @@ private[spark] class CoarseGrainedExecutorBackend(
// since the start of computing it.
if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
logInfo("No running tasks, all blocks migrated, stopping.")
- exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+ exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true)
} else {
logInfo("All blocks not yet migrated.")
}
} else {
logInfo("No running tasks, no block migration configured, stopping.")
- exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+ exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true)
}
} else {
logInfo("Blocked from shutdown by running ${executor.numRunningtasks} tasks")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 2644d0a..f333c01 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -40,6 +40,10 @@ private[spark] object ExecutorExited {
}
}
+private[spark] object ExecutorLossMessage {
+ val decommissionFinished = "Finished decommissioning"
+}
+
private[spark] object ExecutorKilled extends ExecutorLossReason("Executor killed by driver.")
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index 8c84b318..cecd4b0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -38,7 +38,9 @@ private[spark] class ExecutorMonitor(
conf: SparkConf,
client: ExecutorAllocationClient,
listenerBus: LiveListenerBus,
- clock: Clock) extends SparkListener with CleanerListener with Logging {
+ clock: Clock,
+ metrics: ExecutorAllocationManagerSource = null)
+ extends SparkListener with CleanerListener with Logging {
private val idleTimeoutNs = TimeUnit.SECONDS.toNanos(
conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT))
@@ -352,6 +354,22 @@ private[spark] class ExecutorMonitor(
val removed = executors.remove(event.executorId)
if (removed != null) {
decrementExecResourceProfileCount(removed.resourceProfileId)
+ if (removed.decommissioning) {
+ if (event.reason == ExecutorLossMessage.decommissionFinished) {
+ metrics.gracefullyDecommissioned.inc()
+ } else {
+ metrics.decommissionUnfinished.inc()
+ }
+ } else if (removed.pendingRemoval) {
+ metrics.driverKilled.inc()
+ } else {
+ metrics.exitedUnexpectedly.inc()
+ }
+ logInfo(s"Executor ${event.executorId} is removed. Remove reason statistics: (" +
+ s"gracefully decommissioned: ${metrics.gracefullyDecommissioned.getCount()}, " +
+ s"decommision unfinished: ${metrics.decommissionUnfinished.getCount()}, " +
+ s"driver killed: ${metrics.driverKilled.getCount()}, " +
+ s"unexpectedly exited: ${metrics.exitedUnexpectedly.getCount()}).")
if (!removed.pendingRemoval || !removed.decommissioning) {
nextTimeout.set(Long.MinValue)
}
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 5ae596b..2fb5140 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -936,6 +936,53 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
assert(executorsPendingToRemove(manager).isEmpty)
}
+ test("SPARK-33763: metrics to track dynamic allocation (decommissionEnabled=false)") {
+ val manager = createManager(createConf(3, 5, 3))
+ (1 to 5).map(_.toString).foreach { id => onExecutorAddedDefaultProfile(manager, id) }
+
+ assert(executorsPendingToRemove(manager).isEmpty)
+ assert(removeExecutorsDefaultProfile(manager, Seq("1", "2")) === Seq("1", "2"))
+ assert(executorsPendingToRemove(manager).contains("1"))
+ assert(executorsPendingToRemove(manager).contains("2"))
+
+ onExecutorRemoved(manager, "1", "driver requested exit")
+ assert(manager.executorAllocationManagerSource.driverKilled.getCount() === 1)
+ assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 0)
+
+ onExecutorRemoved(manager, "2", "another driver requested exit")
+ assert(manager.executorAllocationManagerSource.driverKilled.getCount() === 2)
+ assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 0)
+
+ onExecutorRemoved(manager, "3", "this will be an unexpected exit")
+ assert(manager.executorAllocationManagerSource.driverKilled.getCount() === 2)
+ assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 1)
+ }
+
+ test("SPARK-33763: metrics to track dynamic allocation (decommissionEnabled = true)") {
+ val manager = createManager(createConf(3, 5, 3, decommissioningEnabled = true))
+ (1 to 5).map(_.toString).foreach { id => onExecutorAddedDefaultProfile(manager, id) }
+
+ assert(executorsPendingToRemove(manager).isEmpty)
+ assert(removeExecutorsDefaultProfile(manager, Seq("1", "2")) === Seq("1", "2"))
+ assert(executorsDecommissioning(manager).contains("1"))
+ assert(executorsDecommissioning(manager).contains("2"))
+
+ onExecutorRemoved(manager, "1", ExecutorLossMessage.decommissionFinished)
+ assert(manager.executorAllocationManagerSource.gracefullyDecommissioned.getCount() === 1)
+ assert(manager.executorAllocationManagerSource.decommissionUnfinished.getCount() === 0)
+ assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 0)
+
+ onExecutorRemoved(manager, "2", "stopped before gracefully finished")
+ assert(manager.executorAllocationManagerSource.gracefullyDecommissioned.getCount() === 1)
+ assert(manager.executorAllocationManagerSource.decommissionUnfinished.getCount() === 1)
+ assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 0)
+
+ onExecutorRemoved(manager, "3", "this will be an unexpected exit")
+ assert(manager.executorAllocationManagerSource.gracefullyDecommissioned.getCount() === 1)
+ assert(manager.executorAllocationManagerSource.decommissionUnfinished.getCount() === 1)
+ assert(manager.executorAllocationManagerSource.exitedUnexpectedly.getCount() === 1)
+ }
+
test("remove multiple executors") {
val manager = createManager(createConf(5, 10, 5))
(1 to 10).map(_.toString).foreach { id => onExecutorAddedDefaultProfile(manager, id) }
@@ -1701,8 +1748,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
post(SparkListenerExecutorAdded(0L, id, execInfo))
}
- private def onExecutorRemoved(manager: ExecutorAllocationManager, id: String): Unit = {
- post(SparkListenerExecutorRemoved(0L, id, null))
+ private def onExecutorRemoved(
+ manager: ExecutorAllocationManager,
+ id: String,
+ reason: String = null): Unit = {
+ post(SparkListenerExecutorRemoved(0L, id, reason))
}
private def onExecutorBusy(manager: ExecutorAllocationManager, id: String): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
index 6d49479..69afdb5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable
+import com.codahale.metrics.Counter
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{doAnswer, mock, when}
@@ -57,6 +58,15 @@ class ExecutorMonitorSuite extends SparkFunSuite {
// having to use mockito APIs directly in each test.
private val knownExecs = mutable.HashSet[String]()
+ private def allocationManagerSource(): ExecutorAllocationManagerSource = {
+ val metricSource = mock(classOf[ExecutorAllocationManagerSource])
+ when(metricSource.driverKilled).thenReturn(new Counter)
+ when(metricSource.decommissionUnfinished).thenReturn(new Counter)
+ when(metricSource.gracefullyDecommissioned).thenReturn(new Counter)
+ when(metricSource.exitedUnexpectedly).thenReturn(new Counter)
+ metricSource
+ }
+
override def beforeEach(): Unit = {
super.beforeEach()
knownExecs.clear()
@@ -65,7 +75,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
when(client.isExecutorActive(any())).thenAnswer { invocation =>
knownExecs.contains(invocation.getArguments()(0).asInstanceOf[String])
}
- monitor = new ExecutorMonitor(conf, client, null, clock)
+ monitor = new ExecutorMonitor(conf, client, null, clock, allocationManagerSource())
}
test("basic executor timeout") {
@@ -231,7 +241,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
conf.set(SHUFFLE_SERVICE_ENABLED, true).set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
- monitor = new ExecutorMonitor(conf, client, null, clock)
+ monitor = new ExecutorMonitor(conf, client, null, clock, allocationManagerSource())
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.MEMORY_ONLY))
@@ -292,7 +302,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
test("shuffle block tracking") {
val bus = mockListenerBus()
conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true).set(SHUFFLE_SERVICE_ENABLED, false)
- monitor = new ExecutorMonitor(conf, client, bus, clock)
+ monitor = new ExecutorMonitor(conf, client, bus, clock, allocationManagerSource())
// 3 jobs: 2 and 3 share a shuffle, 1 has a separate shuffle.
val stage1 = stageInfo(1, shuffleId = 0)
@@ -360,7 +370,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
test("SPARK-28839: Avoids NPE in context cleaner when shuffle service is on") {
val bus = mockListenerBus()
conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true).set(SHUFFLE_SERVICE_ENABLED, true)
- monitor = new ExecutorMonitor(conf, client, bus, clock) {
+ monitor = new ExecutorMonitor(conf, client, bus, clock, allocationManagerSource()) {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
throw new IllegalStateException("No event should be sent.")
}
@@ -372,7 +382,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
test("shuffle tracking with multiple executors and concurrent jobs") {
val bus = mockListenerBus()
conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true).set(SHUFFLE_SERVICE_ENABLED, false)
- monitor = new ExecutorMonitor(conf, client, bus, clock)
+ monitor = new ExecutorMonitor(conf, client, bus, clock, allocationManagerSource())
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo))
@@ -417,7 +427,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
.set(DYN_ALLOCATION_SHUFFLE_TRACKING_TIMEOUT, Long.MaxValue)
.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
.set(SHUFFLE_SERVICE_ENABLED, false)
- monitor = new ExecutorMonitor(conf, client, null, clock)
+ monitor = new ExecutorMonitor(conf, client, null, clock, allocationManagerSource())
// Generate events that will make executor 1 be idle, while still holding shuffle data.
// The executor should not be eligible for removal since the timeout is basically "infinite".
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
index 9291861..75c27f6 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -16,6 +16,10 @@
*/
package org.apache.spark.deploy.k8s.integrationtest
+import org.scalatest.concurrent.PatienceConfiguration
+import org.scalatest.time.Minutes
+import org.scalatest.time.Span
+
import org.apache.spark.internal.config
private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
@@ -104,13 +108,15 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
mainClass = "",
expectedDriverLogOnCompletion = Seq(
"Finished waiting, stopping Spark",
- "Decommission executors"),
+ "Decommission executors",
+ "Remove reason statistics: (gracefully decommissioned: 1, decommision unfinished: 0, " +
+ "driver killed: 0, unexpectedly exited: 0)."),
appArgs = Array.empty[String],
driverPodChecker = doBasicDriverPyPodCheck,
executorPodChecker = doBasicExecutorPyPodCheck,
isJVM = false,
pyFiles = None,
- executorPatience = None,
+ executorPatience = Some(None, Some(DECOMMISSIONING_FINISHED_TIMEOUT)),
decommissioningTest = false)
}
@@ -151,4 +157,5 @@ private[spark] object DecommissionSuite {
val PYSPARK_DECOMISSIONING: String = TEST_LOCAL_PYSPARK + "decommissioning.py"
val PYSPARK_DECOMISSIONING_CLEANUP: String = TEST_LOCAL_PYSPARK + "decommissioning_cleanup.py"
val PYSPARK_SCALE: String = TEST_LOCAL_PYSPARK + "autoscale.py"
+ val DECOMMISSIONING_FINISHED_TIMEOUT = PatienceConfiguration.Timeout(Span(4, Minutes))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org