Posted to commits@spark.apache.org by mr...@apache.org on 2021/11/11 08:57:17 UTC

[spark] branch master updated: [SPARK-37023][CORE] Avoid fetching merge status when shuffleMergeEnabled is false for a shuffleDependency during retry

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f1532a2  [SPARK-37023][CORE] Avoid fetching merge status when shuffleMergeEnabled is false for a shuffleDependency during retry
f1532a2 is described below

commit f1532a291179665a3b69dad640a770ecfcbed629
Author: Minchu Yang <mi...@minyang-mn3.linkedin.biz>
AuthorDate: Thu Nov 11 02:56:46 2021 -0600

    [SPARK-37023][CORE] Avoid fetching merge status when shuffleMergeEnabled is false for a shuffleDependency during retry
    
    ### What changes were proposed in this pull request?
    
    At a high level, this change introduces a helper method `getMapSizesByExecutorIdImpl` on which both `getMapSizesByExecutorId` and `getPushBasedShuffleMapSizesByExecutorId` rely. It takes a parameter `useMergeResult`, which indicates whether fetching merge results is needed, and passes it as `canFetchMergeResult` into `getStatuses`.
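
    A toy, standalone Scala sketch of this delegation pattern (simplified stand-in types and signatures, not the real Spark code; the actual change is in the diff below):

    ```
    object DelegationSketch {
      // Stand-in for Spark's MapSizesByExecutorId result type.
      case class MapSizesByExecutorId(blocks: Seq[String], enableBatchFetch: Boolean)

      // Stand-in for the tracker's push-based-shuffle config flag.
      val fetchMergeResult: Boolean = true

      // Stand-in for getStatuses: returns merge statuses only when allowed.
      private def getStatuses(
          shuffleId: Int, canFetchMergeResult: Boolean): (Seq[String], Seq[String]) = {
        val maps = Seq(s"mapStatus-$shuffleId")
        val merged = if (canFetchMergeResult) Seq(s"mergeStatus-$shuffleId") else Seq.empty
        (maps, merged)
      }

      // The shared impl: callers opt in or out of merge results explicitly.
      private def getMapSizesByExecutorIdImpl(
          shuffleId: Int, useMergeResult: Boolean): MapSizesByExecutorId = {
        val (maps, merged) =
          getStatuses(shuffleId, if (useMergeResult) fetchMergeResult else false)
        MapSizesByExecutorId(maps ++ merged, enableBatchFetch = merged.isEmpty)
      }

      // Non-push-based path: never fetches merge results, so the
      // batch-fetch assertion can no longer trip.
      def getMapSizesByExecutorId(shuffleId: Int): Seq[String] = {
        val sizes = getMapSizesByExecutorIdImpl(shuffleId, useMergeResult = false)
        assert(sizes.enableBatchFetch)
        sizes.blocks
      }

      // Push-based path: merge results may be fetched.
      def getPushBasedShuffleMapSizesByExecutorId(shuffleId: Int): MapSizesByExecutorId =
        getMapSizesByExecutorIdImpl(shuffleId, useMergeResult = true)
    }
    ```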
    
    ### Why are the changes needed?
    
    During some stage retry cases, `shuffleDependency.shuffleMergeEnabled` can be set to false, yet merge statuses still exist because the driver already collected them for the shuffle dependency during the earlier attempt. In this case, the current implementation sets `enableBatchFetch` to false (because merge statuses are present), which causes the assertion in `MapOutputTracker.getMapSizesByExecutorId` to fail:
    ```
    assert(mapSizesByExecutorId.enableBatchFetch == true)
    ```
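
    To make the failure concrete, here is a toy boolean walkthrough (a hedged sketch, not Spark's actual control flow; `useMergeResult` mirrors the parameter added in the diff below):

    ```
    object RetryAssertionSketch {
      def main(args: Array[String]): Unit = {
        // Stage retry scenario: the dependency now reports merge disabled,
        // but the driver still holds merge statuses from the first attempt.
        val shuffleMergeEnabled = false
        val mergeStatusesPresent = true

        // Pre-fix: merge statuses were fetched whenever available, which
        // disabled batch fetch and made the assertion fail.
        val oldEnableBatchFetch = !mergeStatusesPresent

        // Post-fix: the non-push-based path passes useMergeResult = false,
        // so stale merge statuses are ignored and batch fetch stays enabled.
        val useMergeResult = shuffleMergeEnabled
        val newEnableBatchFetch = !(useMergeResult && mergeStatusesPresent)

        println(s"pre-fix enableBatchFetch  = $oldEnableBatchFetch")   // false
        println(s"post-fix enableBatchFetch = $newEnableBatchFetch")   // true
        assert(newEnableBatchFetch)
      }
    }
    ```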
    
    The proposed fix resolves this issue.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Passed the existing UTs. Also added a new UT in `MapOutputTrackerSuite` covering this retry scenario.
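
    For reference, one way to run the touched suite locally (assuming a standard Spark development checkout; the exact invocation may vary with your build setup) is:

    ```
    build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite"
    ```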
    
    Closes #34461 from rmcyang/SPARK-37023.
    
    Authored-by: Minchu Yang <mi...@minyang-mn3.linkedin.biz>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../scala/org/apache/spark/MapOutputTracker.scala  | 63 +++++++++++++++++-----
 .../org/apache/spark/MapOutputTrackerSuite.scala   | 56 +++++++++++++++++++
 2 files changed, 107 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 588f7d2..af26abc 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -538,12 +538,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
-    val mapSizesByExecutorId = getPushBasedShuffleMapSizesByExecutorId(
-      shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
-    assert(mapSizesByExecutorId.enableBatchFetch == true)
-    mapSizesByExecutorId.iter
-  }
+      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
 
   /**
    * Called from executors to get the server URIs and output sizes for each shuffle block that
@@ -1096,7 +1091,20 @@ private[spark] class MapOutputTrackerMaster(
   }
 
   // This method is only called in local-mode.
-  def getPushBasedShuffleMapSizesByExecutorId(
+  override def getMapSizesByExecutorId(
+      shuffleId: Int,
+      startMapIndex: Int,
+      endMapIndex: Int,
+      startPartition: Int,
+      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+    val mapSizesByExecutorId = getPushBasedShuffleMapSizesByExecutorId(
+      shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+    assert(mapSizesByExecutorId.enableBatchFetch == true)
+    mapSizesByExecutorId.iter
+  }
+
+  // This method is only called in local-mode.
+  override def getPushBasedShuffleMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
@@ -1174,14 +1182,44 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
    */
   private val fetchingLock = new KeyLock[Int]
 
+  override def getMapSizesByExecutorId(
+      shuffleId: Int,
+      startMapIndex: Int,
+      endMapIndex: Int,
+      startPartition: Int,
+      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+    val mapSizesByExecutorId = getMapSizesByExecutorIdImpl(
+      shuffleId, startMapIndex, endMapIndex, startPartition, endPartition, useMergeResult = false)
+    assert(mapSizesByExecutorId.enableBatchFetch == true)
+    mapSizesByExecutorId.iter
+  }
+
   override def getPushBasedShuffleMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
       endPartition: Int): MapSizesByExecutorId = {
+    getMapSizesByExecutorIdImpl(
+      shuffleId, startMapIndex, endMapIndex, startPartition, endPartition, useMergeResult = true)
+  }
+
+  private def getMapSizesByExecutorIdImpl(
+      shuffleId: Int,
+      startMapIndex: Int,
+      endMapIndex: Int,
+      startPartition: Int,
+      endPartition: Int,
+      useMergeResult: Boolean): MapSizesByExecutorId = {
     logDebug(s"Fetching outputs for shuffle $shuffleId")
-    val (mapOutputStatuses, mergedOutputStatuses) = getStatuses(shuffleId, conf)
+    val (mapOutputStatuses, mergedOutputStatuses) = getStatuses(shuffleId, conf,
+      // enableBatchFetch can be set to false during stage retry when
+      // shuffleDependency.shuffleMergeEnabled is set to false but the
+      // driver has already collected merge statuses for the dependency.
+      // In this case, the boolean check ensures that unnecessary
+      // merge statuses won't be fetched, so mergedOutputStatuses won't
+      // be passed to convertMapStatuses. See details in [SPARK-37023].
+      if (useMergeResult) fetchMergeResult else false)
     try {
       val actualEndMapIndex =
         if (endMapIndex == Int.MaxValue) mapOutputStatuses.length else endMapIndex
@@ -1205,7 +1243,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     logDebug(s"Fetching backup outputs for shuffle $shuffleId, partition $partitionId")
     // Fetch the map statuses and merge statuses again since they might have already been
     // cleared by another task running in the same executor.
-    val (mapOutputStatuses, mergeResultStatuses) = getStatuses(shuffleId, conf)
+    val (mapOutputStatuses, mergeResultStatuses) = getStatuses(shuffleId, conf, fetchMergeResult)
     try {
       val mergeStatus = mergeResultStatuses(partitionId)
       // If the original MergeStatus is no longer available, we cannot identify the list of
@@ -1230,7 +1268,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     logDebug(s"Fetching backup outputs for shuffle $shuffleId, partition $partitionId")
     // Fetch the map statuses and merge statuses again since they might have already been
     // cleared by another task running in the same executor.
-    val (mapOutputStatuses, _) = getStatuses(shuffleId, conf)
+    val (mapOutputStatuses, _) = getStatuses(shuffleId, conf, fetchMergeResult)
     try {
       MapOutputTracker.getMapStatusesForMergeStatus(shuffleId, partitionId, mapOutputStatuses,
         chunkTracker)
@@ -1252,8 +1290,9 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
    */
   private def getStatuses(
       shuffleId: Int,
-      conf: SparkConf): (Array[MapStatus], Array[MergeStatus]) = {
-    if (fetchMergeResult) {
+      conf: SparkConf,
+      canFetchMergeResult: Boolean): (Array[MapStatus], Array[MergeStatus]) = {
+    if (canFetchMergeResult) {
       val mapOutputStatuses = mapStatuses.get(shuffleId).orNull
       val mergeOutputStatuses = mergeStatuses.get(shuffleId).orNull
 
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 8bebecf..0ee2c77 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -854,4 +854,60 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     masterTracker.stop()
     rpcEnv.shutdown()
   }
+
+  test("SPARK-37023: Avoid fetching merge status when shuffleMergeEnabled is false") {
+    val newConf = new SparkConf
+    newConf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
+    newConf.set(IS_TESTING, true)
+    newConf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+    val hostname = "localhost"
+    val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(newConf))
+
+    val masterTracker = newTrackerMaster()
+    masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf))
+
+    val slaveRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(newConf))
+    val slaveTracker = new MapOutputTrackerWorker(newConf)
+    slaveTracker.trackerEndpoint =
+      slaveRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
+
+    masterTracker.registerShuffle(10, 4, 1)
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+    val bitmap = new RoaringBitmap()
+    bitmap.add(0)
+    bitmap.add(1)
+    bitmap.add(3)
+
+    val blockMgrId = BlockManagerId("a", "hostA", 1000)
+    masterTracker.registerMapOutput(10, 0, MapStatus(blockMgrId, Array(1000L), 0))
+    masterTracker.registerMapOutput(10, 1, MapStatus(blockMgrId, Array(1000L), 1))
+    masterTracker.registerMapOutput(10, 2, MapStatus(blockMgrId, Array(1000L), 2))
+    masterTracker.registerMapOutput(10, 3, MapStatus(blockMgrId, Array(1000L), 3))
+
+    masterTracker.registerMergeResult(10, 0, MergeStatus(blockMgrId, 0,
+      bitmap, 3000L))
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+    val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+
+    val mapSizesByExecutorId = slaveTracker.getMapSizesByExecutorId(10, 0)
+    // mapSizesByExecutorId does not contain the merged block, since merge status is not fetched
+    assert(mapSizesByExecutorId.toSeq ===
+      Seq((blockMgrId, ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000, 0),
+        (ShuffleBlockId(10, 1, 0), size1000, 1),
+        (ShuffleBlockId(10, 2, 0), size1000, 2),
+        (ShuffleBlockId(10, 3, 0), size1000, 3)))))
+    val pushBasedShuffleMapSizesByExecutorId =
+      slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0)
+    // pushBasedShuffleMapSizesByExecutorId will contain the merged block, since merge status
+    // is fetched
+    assert(pushBasedShuffleMapSizesByExecutorId.iter.toSeq ===
+      Seq((blockMgrId, ArrayBuffer((ShuffleMergedBlockId(10, 0, 0), 3000, -1),
+        (ShuffleBlockId(10, 2, 0), size1000, 2)))))
+
+    masterTracker.stop()
+    slaveTracker.stop()
+    rpcEnv.shutdown()
+    slaveRpcEnv.shutdown()
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org