Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/25 02:54:11 UTC

[GitHub] [spark] mridulm commented on a diff in pull request #37533: [SPARK-40096]Fix finalize shuffle stage slow due to connection creation slow

mridulm commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r954454349


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2259,37 +2259,51 @@ private[spark] class DAGScheduler(
                     }
 
                     override def onShuffleMergeFailure(e: Throwable): Unit = {
+                      if (e.isInstanceOf[IOException]) {
+                        logInfo(s"Failed to connect external shuffle service " +
+                          s"${shuffleServiceLoc.hostPort}")
+                        blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+                      }
                     }
                   })
             }
           }
         }, 0, TimeUnit.SECONDS)
       } else {
-        stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
-          case (shuffleServiceLoc, index) =>
-            // Sends async request to shuffle service to finalize shuffle merge on that host
-            // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is cancelled
-            // TODO: during shuffleMergeFinalizeWaitSec
-            shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-              shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-              new MergeFinalizerListener {
-                override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit = {
-                  assert(shuffleId == statuses.shuffleId)
-                  eventProcessLoop.post(RegisterMergeStatuses(stage, MergeStatus.
-                    convertMergeStatusesToMergeStatusArr(statuses, shuffleServiceLoc)))
-                  results(index).set(true)
-                }
+        shuffleMergeFinalizeScheduler.schedule(new Runnable {

Review Comment:
   Thoughts on pushing the finalization send for each merger into the threadpool instead?
   
   ```
           stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
             case (shuffleServiceLoc, index) =>
               // Submit one task per merger so a slow connection blocks only its own task
               shuffleMergeFinalizeScheduler.schedule(new Runnable() {
                 override def run(): Unit = {
                   // existing code within the "case"
                 }
               }, 0, TimeUnit.SECONDS)
           }
   ```
   
   We should also bump up the default number of threads in this threadpool.
   
   This will make sure that we wait at most `shuffleMergeResultsTimeoutSec` seconds for finalization to complete.
   
   Thoughts?
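
   To make the suggestion concrete, here is a minimal standalone sketch of the pattern, not the actual DAGScheduler change. `MergerLoc`, `finalizeAll`, `numThreads`, `timeoutSec` and the `send` callback are hypothetical names used only for illustration; `send` stands in for the `finalizeShuffleMerge` call, and `timeoutSec` plays the role of `shuffleMergeResultsTimeoutSec`.

   ```scala
   import java.io.IOException
   import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
   import java.util.concurrent.atomic.AtomicInteger

   object FinalizeSketch {
     // Hypothetical stand-in for a merger location; not Spark's BlockManagerId.
     final case class MergerLoc(host: String, port: Int)

     /**
      * Submits one finalize task per merger to a thread pool and waits at most
      * `timeoutSec` seconds overall. Returns how many sends completed in time.
      * `send` is an assumed callback standing in for the real RPC.
      */
     def finalizeAll(mergers: Seq[MergerLoc], numThreads: Int, timeoutSec: Long)(
         send: MergerLoc => Unit): Int = {
       val pool = Executors.newScheduledThreadPool(numThreads)
       val done = new CountDownLatch(mergers.size)
       val succeeded = new AtomicInteger(0)
       try {
         mergers.foreach { loc =>
           pool.schedule(new Runnable {
             override def run(): Unit = {
               try {
                 send(loc) // may block while the connection is being created
                 succeeded.incrementAndGet()
               } catch {
                 case _: IOException =>
                   // The real code would drop this merger location here.
               } finally {
                 done.countDown()
               }
             }
           }, 0, TimeUnit.SECONDS)
         }
         // The caller blocks for the timeout at most, however slow a single send is.
         done.await(timeoutSec, TimeUnit.SECONDS)
         succeeded.get()
       } finally {
         pool.shutdownNow() // interrupt any sends that are still stuck
       }
     }
   }
   ```

   With enough threads in the pool, one merger that is slow to connect delays only its own task. With too few threads, slow sends can still queue ahead of fast ones, which is why the default thread count matters.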



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

