Posted to commits@spark.apache.org by gu...@apache.org on 2022/11/27 01:57:56 UTC
[spark] branch branch-3.3 updated: [SPARK-40872][3.3] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new a483106c83d [SPARK-40872][3.3] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size
a483106c83d is described below
commit a483106c83dbd07134fac888906c826f9d43613f
Author: gaoyajun02 <ga...@meituan.com>
AuthorDate: Sun Nov 27 10:57:42 2022 +0900
[SPARK-40872][3.3] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size
### What changes were proposed in this pull request?
This is a backport PR of #38333.
When push-based shuffle is enabled, a zero-size buffer error may occur when fetching shuffle chunks from bad nodes, especially when memory is full. In this case, we can fall back to the original shuffle blocks.
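Conceptually, the change boils down to the decision below, made when the fetcher turns a fetched buffer into an input stream. This is a minimal, self-contained sketch; `FetchedBlock`, `fallBackToOriginalBlocks`, and `openStream` are hypothetical stand-ins rather than the real Spark internals (the actual change is in the `ShuffleBlockFetcherIterator` diff below):

```scala
import java.io.{ByteArrayInputStream, IOException, InputStream}

// Hypothetical simplified model of a fetched block; not a Spark type.
final case class FetchedBlock(id: String, isShuffleChunk: Boolean, bytes: Array[Byte])

object ZeroSizeChunkFallback {
  // Hypothetical stand-in for initiateFallbackFetchForPushMergedBlock.
  def fallBackToOriginalBlocks(id: String): Unit =
    println(s"falling back to original shuffle blocks for $id")

  // Returns Some(stream) when the buffer is usable, None when we fell back.
  def openStream(block: FetchedBlock): Option[InputStream] =
    if (block.bytes.isEmpty) {
      if (block.isShuffleChunk) {
        // A zero-size push-merged chunk: the original shuffle blocks are still
        // available, so fall back to them instead of failing the whole fetch.
        fallBackToOriginalBlocks(block.id)
        None
      } else {
        // A zero-size regular shuffle block is never legitimate.
        throw new IOException(s"Received a zero-size buffer for block ${block.id}")
      }
    } else {
      Some(new ByteArrayInputStream(block.bytes))
    }

  def main(args: Array[String]): Unit = {
    // A zero-size chunk triggers the fallback path instead of a fetch failure.
    println(openStream(FetchedBlock("shuffleChunk_0_0_2_0", isShuffleChunk = true, Array.emptyByteArray)))
    // A non-empty buffer is turned into an input stream as before.
    println(openStream(FetchedBlock("shuffle_0_1_2", isShuffleChunk = false, Array[Byte](1, 2, 3))))
  }
}
```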
### Why are the changes needed?
When a reduce task receives a shuffle chunk with a zero-size buffer, we let it fall back to the original shuffle blocks. After verification, these blocks can be read successfully without a shuffle retry.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
A unit test added in ShuffleBlockFetcherIteratorSuite.
Closes #38751 from gaoyajun02/SPARK-40872-backport.
Authored-by: gaoyajun02 <ga...@meituan.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../spark/storage/PushBasedFetchHelper.scala | 2 +
.../storage/ShuffleBlockFetcherIterator.scala | 70 +++++++++++++---------
.../storage/ShuffleBlockFetcherIteratorSuite.scala | 13 ++++
3 files changed, 57 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
index d83d9018ade..23c0117b7ea 100644
--- a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
+++ b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
@@ -284,6 +284,8 @@ private class PushBasedFetchHelper(
* 2. There is a failure when fetching remote shuffle chunks.
* 3. There is a failure when processing SuccessFetchResult which is for a shuffle chunk
* (local or remote).
+ * 4. There is a zero-size buffer when processing SuccessFetchResult for a shuffle chunk
+ * (local or remote).
*/
def initiateFallbackFetchForPushMergedBlock(
blockId: BlockId,
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index e2fc5389091..91bd3ab6d14 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -778,7 +778,7 @@ final class ShuffleBlockFetcherIterator(
logDebug("Number of requests in flight " + reqsInFlight)
}
- if (buf.size == 0) {
+ val in = if (buf.size == 0) {
// We will never legitimately receive a zero-size block. All blocks with zero records
// have zero size and all zero-size blocks have no records (and hence should never
// have been requested in the first place). This statement relies on behaviors of the
@@ -794,38 +794,52 @@ final class ShuffleBlockFetcherIterator(
// since the last call.
val msg = s"Received a zero-size buffer for block $blockId from $address " +
s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
- throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
- }
-
- val in = try {
- val bufIn = buf.createInputStream()
- if (checksumEnabled) {
- val checksum = ShuffleChecksumHelper.getChecksumByAlgorithm(checksumAlgorithm)
- checkedIn = new CheckedInputStream(bufIn, checksum)
- checkedIn
+ if (blockId.isShuffleChunk) {
+ // A zero-size block may come from nodes with hardware failures. For shuffle chunks,
+ // the original shuffle blocks that belong to that zero-size shuffle chunk are still
+ // available, so we can opt to fall back immediately.
+ logWarning(msg)
+ pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+ // Set result to null to trigger another iteration of the while loop to get either.
+ result = null
+ null
} else {
- bufIn
+ throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
}
- } catch {
- // The exception could only be throwed by local shuffle block
- case e: IOException =>
- assert(buf.isInstanceOf[FileSegmentManagedBuffer])
- e match {
- case ce: ClosedByInterruptException =>
- logError("Failed to create input stream from local block, " +
- ce.getMessage)
- case e: IOException => logError("Failed to create input stream from local block", e)
- }
- buf.release()
- if (blockId.isShuffleChunk) {
- pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
- // Set result to null to trigger another iteration of the while loop to get either.
- result = null
- null
+ } else {
+ try {
+ val bufIn = buf.createInputStream()
+ if (checksumEnabled) {
+ val checksum = ShuffleChecksumHelper.getChecksumByAlgorithm(checksumAlgorithm)
+ checkedIn = new CheckedInputStream(bufIn, checksum)
+ checkedIn
} else {
- throwFetchFailedException(blockId, mapIndex, address, e)
+ bufIn
}
+ } catch {
+ // The exception can only be thrown by a local shuffle block
+ case e: IOException =>
+ assert(buf.isInstanceOf[FileSegmentManagedBuffer])
+ e match {
+ case ce: ClosedByInterruptException =>
+ logError("Failed to create input stream from local block, " +
+ ce.getMessage)
+ case e: IOException =>
+ logError("Failed to create input stream from local block", e)
+ }
+ buf.release()
+ if (blockId.isShuffleChunk) {
+ pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+ // Set result to null to trigger another iteration of the while loop to get
+ // either.
+ result = null
+ null
+ } else {
+ throwFetchFailedException(blockId, mapIndex, address, e)
+ }
+ }
}
+
if (in != null) {
try {
input = streamWrapper(blockId, in)
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index e6f05251046..78d25c8f29c 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -1786,4 +1786,17 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
ShuffleBlockId(0, 5, 2), ShuffleBlockId(0, 6, 2)))
}
+ test("SPARK-40872: fallback to original shuffle block when a push-merged shuffle chunk " +
+ "is zero-size") {
+ val blockManager = mock(classOf[BlockManager])
+ val localDirs = Array("local-dir")
+ val blocksByAddress = prepareForFallbackToLocalBlocks(
+ blockManager, Map(SHUFFLE_MERGER_IDENTIFIER -> localDirs))
+ val zeroSizeBuffer = createMockManagedBuffer(0)
+ doReturn(Seq({zeroSizeBuffer})).when(blockManager)
+ .getLocalMergedBlockData(ShuffleMergedBlockId(0, 0, 2), localDirs)
+ val iterator = createShuffleBlockIteratorWithDefaults(blocksByAddress,
+ blockManager = Some(blockManager), streamWrapperLimitSize = Some(100))
+ verifyLocalBlocksFromFallback(iterator)
+ }
}