You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/12/04 06:09:00 UTC

(spark) branch branch-3.5 updated: [SPARK-46182][CORE] Track `lastTaskFinishTime` using the exact task finished event

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 273ef5708fc3 [SPARK-46182][CORE] Track `lastTaskFinishTime` using the exact task finished event
273ef5708fc3 is described below

commit 273ef5708fc33872cfe3091627617bbac8fdd56f
Author: Xingbo Jiang <xi...@databricks.com>
AuthorDate: Sun Dec 3 22:08:20 2023 -0800

    [SPARK-46182][CORE] Track `lastTaskFinishTime` using the exact task finished event
    
    ### What changes were proposed in this pull request?
    
    We found a race condition between lastTaskRunningTime and lastShuffleMigrationTime that could lead to a decommissioned executor exiting before all the shuffle blocks have been discovered. The issue could lead to immediate task retries right after an executor exits, and thus longer query execution time.
    
    To fix the issue, we replace lastTaskRunningTime with lastTaskFinishTime, which is updated only when a task updates its status to finished through the StatusUpdate event. This is better than the current approach (which uses a thread to check the number of running tasks every second), because this way we clearly know whether the shuffle block refresh happened after all tasks finished running or not, thus resolving the race condition mentioned above.
    
    ### Why are the changes needed?
    
    To fix a race condition that could lead to shuffle data loss, and thus longer query execution time.
    
    ### How was this patch tested?
    
    This is a very subtle race condition that is hard to cover with a unit test using the current unit test framework, and we are confident the change is low risk. Thus it is only verified by passing all the existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #44090 from jiangxb1987/SPARK-46182.
    
    Authored-by: Xingbo Jiang <xi...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 6f112f7b1a50a2b8a59952c69f67dd5f80ab6633)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/executor/CoarseGrainedExecutorBackend.scala    | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index c695a9ec2851..537522326fc7 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -21,7 +21,7 @@ import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Locale
 import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
@@ -80,6 +80,10 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   private var decommissioned = false
 
+  // Track the last time in ns that at least one task is running. If no task is running and all
+  // shuffle/RDD data migration are done, the decommissioned executor should exit.
+  private var lastTaskFinishTime = new AtomicLong(System.nanoTime())
+
   override def onStart(): Unit = {
     if (env.conf.get(DECOMMISSION_ENABLED)) {
       val signal = env.conf.get(EXECUTOR_DECOMMISSION_SIGNAL)
@@ -269,6 +273,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources)
     if (TaskState.isFinished(state)) {
       taskResources.remove(taskId)
+      lastTaskFinishTime.set(System.nanoTime())
     }
     driver match {
       case Some(driverRef) => driverRef.send(msg)
@@ -341,7 +346,6 @@ private[spark] class CoarseGrainedExecutorBackend(
 
       val shutdownThread = new Thread("wait-for-blocks-to-migrate") {
         override def run(): Unit = {
-          var lastTaskRunningTime = System.nanoTime()
           val sleep_time = 1000 // 1s
           // This config is internal and only used by unit tests to force an executor
           // to hang around for longer when decommissioned.
@@ -358,7 +362,7 @@ private[spark] class CoarseGrainedExecutorBackend(
                 val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
                 // We can only trust allBlocksMigrated boolean value if there were no tasks running
                 // since the start of computing it.
-                if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
+                if (allBlocksMigrated && (migrationTime > lastTaskFinishTime.get())) {
                   logInfo("No running tasks, all blocks migrated, stopping.")
                   exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true)
                 } else {
@@ -370,12 +374,6 @@ private[spark] class CoarseGrainedExecutorBackend(
               }
             } else {
               logInfo(s"Blocked from shutdown by ${executor.numRunningTasks} running tasks")
-              // If there is a running task it could store blocks, so make sure we wait for a
-              // migration loop to complete after the last task is done.
-              // Note: this is only advanced if there is a running task, if there
-              // is no running task but the blocks are not done migrating this does not
-              // move forward.
-              lastTaskRunningTime = System.nanoTime()
             }
             Thread.sleep(sleep_time)
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org