You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/06/12 07:40:38 UTC

[spark] branch branch-3.4 updated: [SPARK-43398][CORE] Executor timeout should be max of idle, shuffle and rdd timeout

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 1431df01dcb [SPARK-43398][CORE] Executor timeout should be max of idle shuffle and rdd timeout
1431df01dcb is described below

commit 1431df01dcb10fa85ce2ca6cd065e4be0af85585
Author: Warren Zhu <wa...@gmail.com>
AuthorDate: Mon Jun 12 00:40:13 2023 -0700

    [SPARK-43398][CORE] Executor timeout should be max of idle, shuffle and rdd timeout
    
    ### What changes were proposed in this pull request?
    The executor timeout should be the maximum of the idle, shuffle, and RDD (storage) timeouts.
    
    ### Why are the changes needed?
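    The timeout was computed incorrectly when idle, shuffle, and RDD timeouts were combined. If an executor held cached blocks or shuffle data, the old code used the minimum of the storage and shuffle timeouts and ignored the idle timeout entirely. As a result, an executor could be timed out earlier than one of the applicable timeouts allowed.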
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added test in `ExecutorMonitorSuite`
    
    Closes #41082 from warrenzhu25/max-timeout.
    
    Authored-by: Warren Zhu <wa...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 7107742a381cde2e6de9425e3e436282a8c0d27c)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/scheduler/dynalloc/ExecutorMonitor.scala | 15 +++++-------
 .../scheduler/dynalloc/ExecutorMonitorSuite.scala  | 28 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index fc9248de7ee..34878b8e561 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -563,17 +563,14 @@ private[spark] class ExecutorMonitor(
     def updateTimeout(): Unit = {
       val oldDeadline = timeoutAt
       val newDeadline = if (idleStart >= 0) {
-        val timeout = if (cachedBlocks.nonEmpty || (shuffleIds != null && shuffleIds.nonEmpty)) {
-          val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutNs else Long.MaxValue
-          val _shuffleTimeout = if (shuffleIds != null && shuffleIds.nonEmpty) {
-            shuffleTimeoutNs
-          } else {
-            Long.MaxValue
-          }
-          math.min(_cacheTimeout, _shuffleTimeout)
+        val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutNs else 0
+        val _shuffleTimeout = if (shuffleIds != null && shuffleIds.nonEmpty) {
+          shuffleTimeoutNs
         } else {
-          idleTimeoutNs
+          0
         }
+        // timeout should be max of idleTimeout, storageTimeout and shuffleTimeout
+        val timeout = Seq(_cacheTimeout, _shuffleTimeout, idleTimeoutNs).max
         val deadline = idleStart + timeout
         if (deadline >= 0) deadline else Long.MaxValue
       } else {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
index da8c97e54d1..48382550090 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
@@ -476,6 +476,34 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     assert(monitor.executorCount == 0 )
   }
 
+  for (isShuffleTrackingEnabled <- Seq(true, false)) {
+    test(s"SPARK-43398: executor timeout should be max of shuffle and rdd timeout with" +
+      s" shuffleTrackingEnabled as $isShuffleTrackingEnabled") {
+      conf
+        .set(DYN_ALLOCATION_SHUFFLE_TRACKING_TIMEOUT.key, "240s")
+        .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, isShuffleTrackingEnabled)
+        .set(SHUFFLE_SERVICE_ENABLED, false)
+      monitor = new ExecutorMonitor(conf, client, null, clock, allocationManagerSource())
+
+      monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
+      knownExecs += "1"
+      val stage1 = stageInfo(1, shuffleId = 0)
+      monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage1)))
+      monitor.onBlockUpdated(rddUpdate(1, 0, "1"))
+      val t1 = taskInfo("1", 1)
+      monitor.onTaskStart(SparkListenerTaskStart(1, 1, t1))
+      monitor.onTaskEnd(SparkListenerTaskEnd(1, 1, "foo", Success, t1, new ExecutorMetrics, null))
+      monitor.onJobEnd(SparkListenerJobEnd(1, clock.getTimeMillis(), JobSucceeded))
+
+      if (isShuffleTrackingEnabled) {
+        assert(monitor.timedOutExecutors(storageDeadline).isEmpty)
+        assert(monitor.timedOutExecutors(shuffleDeadline) == Seq("1"))
+      } else {
+        assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+        assert(monitor.timedOutExecutors(storageDeadline) == Seq("1"))
+      }
+    }
+  }
 
   private def idleDeadline: Long = clock.nanoTime() + idleTimeoutNs + 1
   private def storageDeadline: Long = clock.nanoTime() + storageTimeoutNs + 1


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org