Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/09 15:34:12 UTC

[GitHub] [spark] Ngone51 commented on a diff in pull request #39011: [SPARK-41469][CORE] Avoid unnecessary task rerun on decommissioned executor lost if shuffle data migrated

Ngone51 commented on code in PR #39011:
URL: https://github.com/apache/spark/pull/39011#discussion_r1044572392


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1046,17 +1046,46 @@ private[spark] class TaskSetManager(
 
   /** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
   override def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
-    // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
-    // and we are not using an external shuffle server which could serve the shuffle outputs.
-    // The reason is the next stage wouldn't be able to fetch the data from this dead executor
-    // so we would need to rerun these tasks on other executors.
-    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) {
+    // Re-enqueue any tasks with potential shuffle data loss that ran on the failed executor
+    // if this is a shuffle map stage, and either we are not using an external shuffle server
+    // (which could serve the shuffle outputs) or the executor loss was caused by decommission
+    // (which can take down the whole host). The reason is that the next stage wouldn't be
+    // able to fetch the data from this dead executor, so we would need to rerun these tasks
+    // on other executors.
+    val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
+      (reason.isInstanceOf[ExecutorDecommission] || !env.blockManager.externalShuffleServiceEnabled)
+    if (maybeShuffleMapOutputLoss && !isZombie) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = info.index
+        lazy val isShuffleMapOutputAvailable = reason match {
+          case ExecutorDecommission(_, _) =>
+            val shuffleId = tasks(0).asInstanceOf[ShuffleMapTask].shuffleId
+            val mapId = if (conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
+              info.partitionId
+            } else {
+              tid
+            }
+            val locationOpt = env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+              .getMapOutputLocation(shuffleId, mapId)
+            // There are 3 cases of locationOpt:
+            // 1) locationOpt.isDefined && locationOpt.get.host == host:
+            //    this case implies that the shuffle map output is still on the lost executor. The
+            //    map output file is presumed lost, so we should rerun this task;
+            // 2) locationOpt.isDefined && locationOpt.get.host != host:
+            //    this case implies that the shuffle map output has been migrated to another
+            //    host. The task doesn't need to rerun;
+            // 3) location.isEmpty:

Review Comment:
   ```suggestion
               // 3) locationOpt.isEmpty:
   ```
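For readers skimming the archive, the three `locationOpt` cases in the diff comments reduce to a small decision function. Below is a minimal standalone sketch in Scala, not the PR's actual code: it treats locations as plain host strings, whereas the real `TaskSetManager` code gets an `Option[BlockManagerId]` from `getMapOutputLocation` and compares its `.host` field, and the quoted hunk is truncated before case 3 is resolved, so its handling here is an assumption.

```scala
// Hypothetical sketch of the three-case decision described in the diff
// comments above; not the exact PR code.
def shuffleMapOutputAvailable(locationOpt: Option[String], lostHost: String): Boolean =
  locationOpt match {
    // Case 1: output still registered on the lost host -> presumed lost, rerun.
    case Some(host) if host == lostHost => false
    // Case 2: output migrated to a different host -> still fetchable, no rerun.
    case Some(_) => true
    // Case 3: no location recorded (e.g. output already unregistered) ->
    // conservatively treat as unavailable and rerun (assumption; the quoted
    // hunk ends before the PR shows how this case is handled).
    case None => false
  }
```

Under this reading, a decommissioned executor whose shuffle blocks finished migrating falls into case 2, which is exactly the unnecessary rerun the PR title says it avoids.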



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org