You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/06/12 07:40:38 UTC
[spark] branch branch-3.4 updated: [SPARK-43398][CORE] Executor timeout should be max of idle shuffle and rdd timeout
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 1431df01dcb [SPARK-43398][CORE] Executor timeout should be max of idle shuffle and rdd timeout
1431df01dcb is described below
commit 1431df01dcb10fa85ce2ca6cd065e4be0af85585
Author: Warren Zhu <wa...@gmail.com>
AuthorDate: Mon Jun 12 00:40:13 2023 -0700
[SPARK-43398][CORE] Executor timeout should be max of idle shuffle and rdd timeout
### What changes were proposed in this pull request?
The executor timeout should be the maximum of the idle, shuffle, and RDD (cached-block storage) timeouts.
### Why are the changes needed?
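The timeout value was wrong when combining the idle, shuffle, and RDD timeouts: when an executor held cached blocks or shuffle data, the timeout was computed as the minimum of the applicable storage and shuffle timeouts, and the idle timeout was ignored. Such an executor could therefore time out earlier than the longest applicable timeout.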
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a test to `ExecutorMonitorSuite`.
Closes #41082 from warrenzhu25/max-timeout.
Authored-by: Warren Zhu <wa...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit 7107742a381cde2e6de9425e3e436282a8c0d27c)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../spark/scheduler/dynalloc/ExecutorMonitor.scala | 15 +++++-------
.../scheduler/dynalloc/ExecutorMonitorSuite.scala | 28 ++++++++++++++++++++++
2 files changed, 34 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index fc9248de7ee..34878b8e561 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -563,17 +563,14 @@ private[spark] class ExecutorMonitor(
def updateTimeout(): Unit = {
val oldDeadline = timeoutAt
val newDeadline = if (idleStart >= 0) {
- val timeout = if (cachedBlocks.nonEmpty || (shuffleIds != null && shuffleIds.nonEmpty)) {
- val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutNs else Long.MaxValue
- val _shuffleTimeout = if (shuffleIds != null && shuffleIds.nonEmpty) {
- shuffleTimeoutNs
- } else {
- Long.MaxValue
- }
- math.min(_cacheTimeout, _shuffleTimeout)
+ val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutNs else 0
+ val _shuffleTimeout = if (shuffleIds != null && shuffleIds.nonEmpty) {
+ shuffleTimeoutNs
} else {
- idleTimeoutNs
+ 0
}
+ // timeout should be max of idleTimeout, storageTimeout and shuffleTimeout
+ val timeout = Seq(_cacheTimeout, _shuffleTimeout, idleTimeoutNs).max
val deadline = idleStart + timeout
if (deadline >= 0) deadline else Long.MaxValue
} else {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
index da8c97e54d1..48382550090 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
@@ -476,6 +476,34 @@ class ExecutorMonitorSuite extends SparkFunSuite {
assert(monitor.executorCount == 0 )
}
+ for (isShuffleTrackingEnabled <- Seq(true, false)) {
+ test(s"SPARK-43398: executor timeout should be max of shuffle and rdd timeout with" +
+ s" shuffleTrackingEnabled as $isShuffleTrackingEnabled") {
+ conf
+ .set(DYN_ALLOCATION_SHUFFLE_TRACKING_TIMEOUT.key, "240s")
+ .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, isShuffleTrackingEnabled)
+ .set(SHUFFLE_SERVICE_ENABLED, false)
+ monitor = new ExecutorMonitor(conf, client, null, clock, allocationManagerSource())
+
+ monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
+ knownExecs += "1"
+ val stage1 = stageInfo(1, shuffleId = 0)
+ monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage1)))
+ monitor.onBlockUpdated(rddUpdate(1, 0, "1"))
+ val t1 = taskInfo("1", 1)
+ monitor.onTaskStart(SparkListenerTaskStart(1, 1, t1))
+ monitor.onTaskEnd(SparkListenerTaskEnd(1, 1, "foo", Success, t1, new ExecutorMetrics, null))
+ monitor.onJobEnd(SparkListenerJobEnd(1, clock.getTimeMillis(), JobSucceeded))
+
+ if (isShuffleTrackingEnabled) {
+ assert(monitor.timedOutExecutors(storageDeadline).isEmpty)
+ assert(monitor.timedOutExecutors(shuffleDeadline) == Seq("1"))
+ } else {
+ assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+ assert(monitor.timedOutExecutors(storageDeadline) == Seq("1"))
+ }
+ }
+ }
private def idleDeadline: Long = clock.nanoTime() + idleTimeoutNs + 1
private def storageDeadline: Long = clock.nanoTime() + storageTimeoutNs + 1
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org