You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2022/06/06 22:07:59 UTC

[spark] branch master updated: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f80041fdfdd [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true
f80041fdfdd is described below

commit f80041fdfddae66bead7a3950028ee04d1b60bd2
Author: Aravind Patnam <ap...@linkedin.com>
AuthorDate: Mon Jun 6 17:07:36 2022 -0500

    [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true
    
    ### What changes were proposed in this pull request?
    Adds corruption exception handling for merged shuffle chunks when spark.shuffle.detectCorrupt is set to true (the default value is true).
    
    ### Why are the changes needed?
    Prior to Spark 3.0, spark.shuffle.detectCorrupt was set to true by default, and this configuration was one of the knobs for early corruption detection, so the fallback could be triggered as expected.
    
    After Spark 3.0, even though spark.shuffle.detectCorrupt is still set to true by default, early corruption detection is controlled by a new configuration, spark.shuffle.detectCorrupt.useExtraMemory, which is set to false by default. Thus the default behavior, with only Magnet enabled after Spark 3.2.0 (internal li-3.1.1), disables early corruption detection, so no fallback is triggered. Instead, an exception is thrown when the task starts to read the corrupted blocks.
    
    We handle the corrupted stream for merged blocks by throwing a FetchFailedException in this case. This will trigger a retry based on the values of spark.shuffle.detectCorrupt.useExtraMemory and spark.shuffle.detectCorrupt.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    - Tested on internal cluster
    - Added UT
    
    This is a PR to tackle some of the build weirdness found in PR 36601 (https://github.com/apache/spark/pull/36601).
    It contains the exact same diff. Closed that one out and recreated it here.
    
    Closes #36734 from akpatnam25/SPARK-38987.
    
    Authored-by: Aravind Patnam <ap...@linkedin.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  22 +++-
 .../storage/ShuffleBlockFetcherIterator.scala      |   3 +
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 115 ++++++++++++++++++++-
 .../storage/ShuffleBlockFetcherIteratorSuite.scala |  28 +++++
 4 files changed, 163 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7d26d9e8d61..289296f6fdb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1885,6 +1885,16 @@ private[spark] class DAGScheduler(
               mapOutputTracker.
                 unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex))
             }
+          } else {
+            // Unregister the merge result of <shuffleId, reduceId> if there is a FetchFailed event
+            // that is not a MetaDataFetchException (which is signified by bmAddress being null)
+            if (bmAddress != null &&
+              bmAddress.executorId.equals(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)) {
+              assert(pushBasedShuffleEnabled, "Push based shuffle expected to " +
+                "be enabled when handling merge block fetch failure.")
+              mapOutputTracker.
+                unregisterMergeResult(shuffleId, reduceId, bmAddress, None)
+            }
           }
 
           if (failedStage.rdd.isBarrier()) {
@@ -2449,7 +2459,15 @@ private[spark] class DAGScheduler(
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     logDebug(s"Considering removal of executor $execId; " +
       s"fileLost: $fileLost, currentEpoch: $currentEpoch")
-    if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
+    // Check if the execId is a shuffle push merger. We do not remove the executor if it is,
+    // and only remove the outputs on the host.
+    val isShuffleMerger = execId.equals(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+    if (isShuffleMerger && pushBasedShuffleEnabled) {
+      hostToUnregisterOutputs.foreach(
+        host => blockManagerMaster.removeShufflePushMergerLocation(host))
+    }
+    if (!isShuffleMerger &&
+      (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch)) {
       executorFailureEpoch(execId) = currentEpoch
       logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
       if (pushBasedShuffleEnabled) {
@@ -2461,6 +2479,8 @@ private[spark] class DAGScheduler(
       clearCacheLocs()
     }
     if (fileLost) {
+      // When the fetch failure is for a merged shuffle chunk, ignoreShuffleFileLostEpoch is true
+      // and so all the files will be removed.
       val remove = if (ignoreShuffleFileLostEpoch) {
         true
       } else if (!shuffleFileLostEpoch.contains(execId) ||
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index c91aaa8ddb7..b5dd2f640b8 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -1166,6 +1166,9 @@ final class ShuffleBlockFetcherIterator(
       case ShuffleBlockBatchId(shuffleId, mapId, startReduceId, _) =>
         throw SparkCoreErrors.fetchFailedError(address, shuffleId, mapId, mapIndex, startReduceId,
           msg, e)
+      case ShuffleBlockChunkId(shuffleId, _, reduceId, _) =>
+        throw SparkCoreErrors.fetchFailedError(address, shuffleId,
+          SHUFFLE_PUSH_MAP_ID.toLong, SHUFFLE_PUSH_MAP_ID, reduceId, msg, e)
       case _ => throw SparkCoreErrors.failToGetNonShuffleBlockError(blockId, e)
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 47fb8d70e5d..e51ced860db 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -293,6 +293,10 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     override def removeExecutor(execId: String): Unit = {
       // don't need to propagate to the driver, which we don't have
     }
+
+    override def removeShufflePushMergerLocation(host: String): Unit = {
+      // don't need to propagate to the driver, which we don't have
+    }
   }
 
   /** The list of results that DAGScheduler has collected. */
@@ -4342,6 +4346,101 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assertDataStructuresEmpty()
   }
 
+  test("SPARK-38987: corrupted merged shuffle block FetchFailure should unregister merge results") {
+    initPushBasedShuffleConfs(conf)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+
+    scheduler = new MyDAGScheduler(
+      sc,
+      taskScheduler,
+      sc.listenerBus,
+      mapOutputTracker,
+      blockManagerMaster,
+      sc.env,
+      shuffleMergeFinalize = false,
+      shuffleMergeRegister = false)
+    dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler)
+
+    val parts = 2
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends on shuffleDep, which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    val shuffleMapStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    scheduler.handleRegisterMergeStatuses(shuffleMapStage,
+      Seq((0, makeMergeStatus("hostA", shuffleDep.shuffleMergeId, isShufflePushMerger = true))))
+    scheduler.handleShuffleMergeFinalized(shuffleMapStage,
+      shuffleMapStage.shuffleDep.shuffleMergeId)
+    scheduler.handleRegisterMergeStatuses(shuffleMapStage,
+      Seq((1, makeMergeStatus("hostA", shuffleDep.shuffleMergeId, isShufflePushMerger = true))))
+
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == 1)
+
+    // Complete shuffle map stage with FetchFailed on hostA
+    complete(taskSets(0), taskSets(0).tasks.zipWithIndex.map {
+      case (task, _) =>
+        (FetchFailed(
+          makeBlockManagerId("hostA", execId = Some(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)),
+          shuffleDep.shuffleId, -1L, -1, 0, "corruption fetch failure"), null)
+    }.toSeq)
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == 0)
+  }
+
+  test("SPARK-38987: All shuffle outputs for a shuffle push" +
+    " merger executor should be cleaned up on a fetch failure when" +
+    "spark.files.fetchFailure.unRegisterOutputOnHost is true") {
+    initPushBasedShuffleConfs(conf)
+    conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
+
+    val shuffleMapRdd = new MyRDD(sc, 3, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker)
+
+    submit(reduceRdd, Array(0, 1, 2))
+    // Map stage completes successfully,
+    // two tasks are run on an executor on hostA and one on an executor on hostB
+    completeShuffleMapStageSuccessfully(0, 0, 3, Seq("hostA", "hostA", "hostB"))
+    // Now the executor on hostA is lost
+    runEvent(ExecutorLost(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER,
+      ExecutorExited(-100, false, "Container marked as failed")))
+
+    // Shuffle push merger executor should not be removed and the shuffle files are not unregistered
+    verify(blockManagerMaster, times(0)).removeExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+    verify(mapOutputTracker,
+      times(0)).removeOutputsOnExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+
+    // Now a fetch failure from the lost executor occurs
+    complete(taskSets(1), Seq(
+      (FetchFailed(BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, "hostA", 12345),
+        shuffleId, 0L, 0, 0, "ignored"), null)
+    ))
+
+    // Verify that we are not removing the executor,
+    // and that we are only removing the outputs on the host
+    verify(blockManagerMaster, times(0)).removeExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+    verify(blockManagerMaster, times(1)).removeShufflePushMergerLocation("hostA")
+    verify(mapOutputTracker,
+      times(1)).removeOutputsOnHost("hostA")
+
+    // There should be no map statuses or merge statuses on the host
+    val shuffleStatuses = mapOutputTracker.shuffleStatuses(shuffleId)
+    val mapStatuses = shuffleStatuses.mapStatuses
+    val mergeStatuses = shuffleStatuses.mergeStatuses
+    assert(mapStatuses.count(_ != null) === 1)
+    assert(mapStatuses.count(s => s != null
+      && s.location.executorId == BlockManagerId.SHUFFLE_MERGER_IDENTIFIER) === 0)
+    assert(mergeStatuses.count(s => s != null
+      && s.location.executorId == BlockManagerId.SHUFFLE_MERGER_IDENTIFIER) === 0)
+    // hostB-exec should still have its shuffle files
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "hostB-exec") === 1)
+  }
+
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
@@ -4402,12 +4501,20 @@ object DAGSchedulerSuite {
   def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2, mapTaskId: Long = -1): MapStatus =
     MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes), mapTaskId)
 
-  def makeBlockManagerId(host: String): BlockManagerId = {
-    BlockManagerId(host + "-exec", host, 12345)
+  def makeBlockManagerId(host: String, execId: Option[String] = None): BlockManagerId = {
+    BlockManagerId(execId.getOrElse(host + "-exec"), host, 12345)
   }
 
-  def makeMergeStatus(host: String, shuffleMergeId: Int, size: Long = 1000): MergeStatus =
-    MergeStatus(makeBlockManagerId(host), shuffleMergeId, mock(classOf[RoaringBitmap]), size)
+  def makeMergeStatus(host: String, shuffleMergeId: Int, size: Long = 1000,
+    isShufflePushMerger: Boolean = false): MergeStatus = {
+    val execId = if (isShufflePushMerger) {
+      Some(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+    } else {
+      None
+    }
+    MergeStatus(makeBlockManagerId(host, execId),
+      shuffleMergeId, mock(classOf[RoaringBitmap]), size)
+  }
 
   def addMergerLocs(locs: Seq[String]): Unit = {
     locs.foreach { loc => mergerLocs.append(makeBlockManagerId(loc)) }
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index e6f05251046..f8fe28c0512 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -1786,4 +1786,32 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       ShuffleBlockId(0, 5, 2), ShuffleBlockId(0, 6, 2)))
   }
 
+  test("SPARK-38987: failure to fetch corrupted shuffle block chunk should " +
+    "throw a FetchFailedException when early detection is unable to catch corruption") {
+    val blockManager = mock(classOf[BlockManager])
+    val localDirs = Array("local-dir")
+    val localHost = "test-local-host"
+    val localBmId = BlockManagerId("test-client", localHost, 1)
+    doReturn(localBmId).when(blockManager).blockManagerId
+    initHostLocalDirManager(blockManager, Map(SHUFFLE_MERGER_IDENTIFIER -> localDirs))
+    // Prepare shuffle block chunks
+    val pushMergedBmId = BlockManagerId(SHUFFLE_MERGER_IDENTIFIER, localHost, 1)
+    val blocksByAddress = Map[BlockManagerId, Seq[(BlockId, Long, Int)]](
+      (localBmId, toBlockList(Seq(ShuffleBlockChunkId(0, 0, 2, 0)), 1L, 1)),
+      (pushMergedBmId, toBlockList(
+        Seq(ShuffleMergedBlockId(0, 0, 2)), 2L, SHUFFLE_PUSH_MAP_ID)))
+
+    val corruptBuffer = createMockManagedBuffer(2)
+    doReturn(Seq({corruptBuffer})).when(blockManager)
+      .getLocalMergedBlockData(ShuffleMergedBlockId(0, 0, 2), localDirs)
+    val corruptStream = mock(classOf[InputStream])
+    when(corruptStream.read(any(), any(), any())).thenThrow(new IOException("corrupt"))
+    doReturn(corruptStream).when(corruptBuffer).createInputStream()
+    // Disable corruption detection in the iterator
+    val iterator = createShuffleBlockIteratorWithDefaults(blocksByAddress,
+      blockManager = Some(blockManager), streamWrapperLimitSize = Some(100),
+      detectCorruptUseExtraMemory = false, detectCorrupt = false)
+    intercept[FetchFailedException] { iterator.next() }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org