You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/11/27 01:57:56 UTC

[spark] branch branch-3.3 updated: [SPARK-40872][3.3] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new a483106c83d [SPARK-40872][3.3] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size
a483106c83d is described below

commit a483106c83dbd07134fac888906c826f9d43613f
Author: gaoyajun02 <ga...@meituan.com>
AuthorDate: Sun Nov 27 10:57:42 2022 +0900

    [SPARK-40872][3.3] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size
    
    ### What changes were proposed in this pull request?
    This is a backport PR of #38333.
    When push-based shuffle is enabled, a zero-size buffer error may occur when fetching shuffle chunks from faulty nodes, especially when memory is full. In this case, we can fall back to the original shuffle blocks.
    
    ### Why are the changes needed?
    When a reduce task obtains a shuffle chunk with a zero-size buffer, we let it fall back to the original shuffle blocks. We verified that these blocks can be read successfully without a shuffle retry.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    UT
    
    Closes #38751 from gaoyajun02/SPARK-40872-backport.
    
    Authored-by: gaoyajun02 <ga...@meituan.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../spark/storage/PushBasedFetchHelper.scala       |  2 +
 .../storage/ShuffleBlockFetcherIterator.scala      | 70 +++++++++++++---------
 .../storage/ShuffleBlockFetcherIteratorSuite.scala | 13 ++++
 3 files changed, 57 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
index d83d9018ade..23c0117b7ea 100644
--- a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
+++ b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
@@ -284,6 +284,8 @@ private class PushBasedFetchHelper(
    * 2. There is a failure when fetching remote shuffle chunks.
    * 3. There is a failure when processing SuccessFetchResult which is for a shuffle chunk
    *    (local or remote).
+   * 4. There is a zero-size buffer when processing SuccessFetchResult for a shuffle chunk
+   *    (local or remote).
    */
   def initiateFallbackFetchForPushMergedBlock(
       blockId: BlockId,
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index e2fc5389091..91bd3ab6d14 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -778,7 +778,7 @@ final class ShuffleBlockFetcherIterator(
             logDebug("Number of requests in flight " + reqsInFlight)
           }
 
-          if (buf.size == 0) {
+          val in = if (buf.size == 0) {
             // We will never legitimately receive a zero-size block. All blocks with zero records
             // have zero size and all zero-size blocks have no records (and hence should never
             // have been requested in the first place). This statement relies on behaviors of the
@@ -794,38 +794,52 @@ final class ShuffleBlockFetcherIterator(
             // since the last call.
             val msg = s"Received a zero-size buffer for block $blockId from $address " +
               s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
-            throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
-          }
-
-          val in = try {
-            val bufIn = buf.createInputStream()
-            if (checksumEnabled) {
-              val checksum = ShuffleChecksumHelper.getChecksumByAlgorithm(checksumAlgorithm)
-              checkedIn = new CheckedInputStream(bufIn, checksum)
-              checkedIn
+            if (blockId.isShuffleChunk) {
+              // Zero-size block may come from nodes with hardware failures, For shuffle chunks,
+              // the original shuffle blocks that belong to that zero-size shuffle chunk is
+              // available and we can opt to fallback immediately.
+              logWarning(msg)
+              pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+              // Set result to null to trigger another iteration of the while loop to get either.
+              result = null
+              null
             } else {
-              bufIn
+              throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
             }
-          } catch {
-            // The exception could only be throwed by local shuffle block
-            case e: IOException =>
-              assert(buf.isInstanceOf[FileSegmentManagedBuffer])
-              e match {
-                case ce: ClosedByInterruptException =>
-                  logError("Failed to create input stream from local block, " +
-                    ce.getMessage)
-                case e: IOException => logError("Failed to create input stream from local block", e)
-              }
-              buf.release()
-              if (blockId.isShuffleChunk) {
-                pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
-                // Set result to null to trigger another iteration of the while loop to get either.
-                result = null
-                null
+          } else {
+            try {
+              val bufIn = buf.createInputStream()
+              if (checksumEnabled) {
+                val checksum = ShuffleChecksumHelper.getChecksumByAlgorithm(checksumAlgorithm)
+                checkedIn = new CheckedInputStream(bufIn, checksum)
+                checkedIn
               } else {
-                throwFetchFailedException(blockId, mapIndex, address, e)
+                bufIn
               }
+            } catch {
+              // The exception could only be throwed by local shuffle block
+              case e: IOException =>
+                assert(buf.isInstanceOf[FileSegmentManagedBuffer])
+                e match {
+                  case ce: ClosedByInterruptException =>
+                    logError("Failed to create input stream from local block, " +
+                      ce.getMessage)
+                  case e: IOException =>
+                    logError("Failed to create input stream from local block", e)
+                }
+                buf.release()
+                if (blockId.isShuffleChunk) {
+                  pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+                  // Set result to null to trigger another iteration of the while loop to get
+                  // either.
+                  result = null
+                  null
+                } else {
+                  throwFetchFailedException(blockId, mapIndex, address, e)
+                }
+            }
           }
+
           if (in != null) {
             try {
               input = streamWrapper(blockId, in)
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index e6f05251046..78d25c8f29c 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -1786,4 +1786,17 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       ShuffleBlockId(0, 5, 2), ShuffleBlockId(0, 6, 2)))
   }
 
+  test("SPARK-40872: fallback to original shuffle block when a push-merged shuffle chunk " +
+    "is zero-size") {
+    val blockManager = mock(classOf[BlockManager])
+    val localDirs = Array("local-dir")
+    val blocksByAddress = prepareForFallbackToLocalBlocks(
+      blockManager, Map(SHUFFLE_MERGER_IDENTIFIER -> localDirs))
+    val zeroSizeBuffer = createMockManagedBuffer(0)
+    doReturn(Seq({zeroSizeBuffer})).when(blockManager)
+      .getLocalMergedBlockData(ShuffleMergedBlockId(0, 0, 2), localDirs)
+    val iterator = createShuffleBlockIteratorWithDefaults(blocksByAddress,
+      blockManager = Some(blockManager), streamWrapperLimitSize = Some(100))
+    verifyLocalBlocksFromFallback(iterator)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org