Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/21 12:37:05 UTC

[GitHub] [spark] gaoyajun02 opened a new pull request, #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

gaoyajun02 opened a new pull request, #38333:
URL: https://github.com/apache/spark/pull/38333

   ### What changes were proposed in this pull request?
   A large number of shuffle tests in our cluster show that bad nodes with chunk corruption sometimes return zero-size shuffle chunks. In this case, we can fall back to the original shuffle blocks.
   
   
   ### Why are the changes needed?
   When a reduce task fetches push-merged data and hits a zero-size buffer error, we let it fall back to the original shuffle blocks. We verified that these blocks can then be read successfully without a shuffle retry.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   UT


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gaoyajun02 commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
gaoyajun02 commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1298452168

   We have located the cause of the zero-size chunk problem on the shuffle service node, and the system log (`dmesg -T`) contains the following:
   ```
   [Tue Nov  1 19:40:04 2022] EXT4-fs (sde1): Delayed block allocation failed for inode 25755946 at logical offset 0 with max blocks 15 with error 117
   [Tue Nov  1 20:01:04 2022] EXT4-fs (sde1): Delayed block allocation failed for inode 23266116 at logical offset 0 with max blocks 15 with error 117
   [Tue Nov  1 20:01:04 2022] EXT4-fs (sde1): Delayed block allocation failed for inode 23266116 at logical offset 0 with max blocks 15 with error 117
   ```
   Although this does not come from the software layer, and the number of bad nodes that lose data is very low, I think it makes sense to support the fallback here.
   
   cc  @otterc
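   
   As an aside for readers of the archive, the check described above is easy to script. The helper below is illustrative only (the function name and the idea of saving `dmesg -T` output to a file are mine, not part of the PR); it simply greps a captured dump for the EXT4 errors quoted above:
   ```scala
   import scala.io.Source
   
   object Ext4ErrorScan {
     // Illustrative only: scan a saved `dmesg -T` dump for the EXT4 delayed-block-allocation
     // failures quoted above, which is how the bad disks were spotted on shuffle service nodes.
     def ext4AllocationFailures(dmesgDumpPath: String): Seq[String] = {
       val src = Source.fromFile(dmesgDumpPath)
       try {
         src.getLines()
           .filter(l => l.contains("EXT4-fs") && l.contains("Delayed block allocation failed"))
           .toList
       } finally {
         src.close()
       }
     }
   }
   ```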


[GitHub] [spark] mridulm commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1318228713

   The test failure looks unrelated, can you retrigger the tests @gaoyajun02 ...


[GitHub] [spark] otterc commented on a diff in pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
otterc commented on code in PR #38333:
URL: https://github.com/apache/spark/pull/38333#discussion_r1012336511


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -794,7 +794,15 @@ final class ShuffleBlockFetcherIterator(
             // since the last call.
             val msg = s"Received a zero-size buffer for block $blockId from $address " +
               s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
-            throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+            if (blockId.isShuffleChunk) {
+              logWarning(msg)
+              pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)

Review Comment:
   What is the size that is returned in the meta response? 



[GitHub] [spark] mridulm commented on a diff in pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38333:
URL: https://github.com/apache/spark/pull/38333#discussion_r1024857218


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -794,7 +794,18 @@ final class ShuffleBlockFetcherIterator(
             // since the last call.
             val msg = s"Received a zero-size buffer for block $blockId from $address " +
               s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
-            throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+            if (blockId.isShuffleChunk) {
+              // Zero-size block may come from nodes with hardware failures, For shuffle chunks,
+              // the original shuffle blocks that belong to that zero-size shuffle chunk is
+              // available and we can opt to fallback immediately.
+              logWarning(msg)
+              pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+              // Set result to null to trigger another iteration of the while loop to get either.
+              result = null
+              null
+            } else {
+              throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+            }
           }
 
           val in = try {

Review Comment:
   This does not look fixed.
   If `buf.size == 0`, even though we set `result = null`, we will enter the `val in = try {` block anyway.
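   
   To make the control-flow concern concrete, here is a minimal, self-contained Scala sketch. It is not the Spark code: `FetchedBuffer`, `initiateFallback`, and `handleSuccess` are hypothetical stand-ins. The only point it illustrates is that the zero-size shuffle-chunk branch must short-circuit so that the stream-opening step is reached for non-empty buffers alone:
   ```scala
   import java.io.{ByteArrayInputStream, IOException, InputStream}
   
   object ZeroSizeChunkSketch {
     // Hypothetical stand-ins, only to make the sketch compile.
     final case class FetchedBuffer(bytes: Array[Byte]) { def size: Int = bytes.length }
     def initiateFallback(): Unit = println("falling back to the original shuffle blocks")
   
     // A zero-size chunk triggers the fallback and returns None so the caller loops again
     // for the next fetch result; only non-empty buffers reach the stream-opening branch.
     def handleSuccess(buf: FetchedBuffer, isShuffleChunk: Boolean): Option[InputStream] =
       if (buf.size == 0) {
         if (isShuffleChunk) {
           initiateFallback()
           None
         } else {
           throw new IOException("Received a zero-size buffer for a non-chunk block")
         }
       } else {
         Some(new ByteArrayInputStream(buf.bytes)) // only non-empty buffers get a stream
       }
   }
   ```
   Whether the real fix uses an early match, a guard, or restructured branches is an implementation detail; the invariant this review asks for is simply that the fallback path and stream creation are mutually exclusive.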



[GitHub] [spark] otterc commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
otterc commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1302475841

   > If there are hardware issues which are causing failures - it is better to move the nodes to deny list and prevent them from getting used: we will keep seeing more failures, including for vanilla shuffle.
   > 
   > On other hand, I can also look at this as a data corruption issue - @otterc what was the plan around how we support shuffle corruption diagnosis for push based shuffle ([SPARK-36206](https://issues.apache.org/jira/browse/SPARK-36206), etc). Is the expectation that we fallback ? Or we diagnose + fail ?
   > 
   I think it is more efficient to fall back and fetch the map outputs instead of failing the stage and regenerating the data of the partition. When the corrupt blocks are merged shuffle blocks or chunks, we don't retry fetching them anyway and fall back immediately.


[GitHub] [spark] gaoyajun02 commented on a diff in pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
gaoyajun02 commented on code in PR #38333:
URL: https://github.com/apache/spark/pull/38333#discussion_r1020753726


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -794,7 +794,18 @@ final class ShuffleBlockFetcherIterator(
             // since the last call.
             val msg = s"Received a zero-size buffer for block $blockId from $address " +
               s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
-            throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+            if (blockId.isShuffleChunk) {
+              // Zero-size block may come from nodes with hardware failures, For shuffle chunks,
+              // the original shuffle blocks that belong to that zero-size shuffle chunk is
+              // available and we can opt to fallback immediately.
+              logWarning(msg)
+              pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+              // Set result to null to trigger another iteration of the while loop to get either.
+              result = null
+              null

Review Comment:
   Updated, thanks.



[GitHub] [spark] gaoyajun02 commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
gaoyajun02 commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1316974655

   > Couple of changes ...
   
   done


[GitHub] [spark] gaoyajun02 commented on a diff in pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
gaoyajun02 commented on code in PR #38333:
URL: https://github.com/apache/spark/pull/38333#discussion_r1013969510


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -794,7 +794,15 @@ final class ShuffleBlockFetcherIterator(
             // since the last call.
             val msg = s"Received a zero-size buffer for block $blockId from $address " +
               s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
-            throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+            if (blockId.isShuffleChunk) {
+              logWarning(msg)
+              pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)

Review Comment:
   ok, updated



[GitHub] [spark] gaoyajun02 commented on a diff in pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
gaoyajun02 commented on code in PR #38333:
URL: https://github.com/apache/spark/pull/38333#discussion_r1023946842


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -794,7 +794,18 @@ final class ShuffleBlockFetcherIterator(
             // since the last call.
             val msg = s"Received a zero-size buffer for block $blockId from $address " +
               s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
-            throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+            if (blockId.isShuffleChunk) {
+              // Zero-size block may come from nodes with hardware failures, For shuffle chunks,
+              // the original shuffle blocks that belong to that zero-size shuffle chunk is
+              // available and we can opt to fallback immediately.
+              logWarning(msg)
+              pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+              // Set result to null to trigger another iteration of the while loop to get either.
+              result = null
+              null
+            } else {
+              throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+            }
           }
 
           val in = try {

Review Comment:
   resolved



[GitHub] [spark] mridulm commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1321511612

   Merged to master.
   Thanks for fixing this @gaoyajun02 !
   Thanks for the review @otterc :-)


[GitHub] [spark] mridulm commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1315692121

   There is a pending [comment](https://github.com/apache/spark/pull/38333/files#r1019735633), can you take a look at it @gaoyajun02 ? Thx 


[GitHub] [spark] mridulm commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1311084796

   > I think it is more efficient to fallback and fetch map outputs instead of failing the stage and regenerating the data of the partition. When the corrupt blocks are merged shuffle blocks or chunks we don't retry to fetch them anyways and fallback immediately.
   
   Sounds good.


[GitHub] [spark] mridulm commented on a diff in pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38333:
URL: https://github.com/apache/spark/pull/38333#discussion_r1019735633


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -794,7 +794,18 @@ final class ShuffleBlockFetcherIterator(
             // since the last call.
             val msg = s"Received a zero-size buffer for block $blockId from $address " +
               s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
-            throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+            if (blockId.isShuffleChunk) {
+              // Zero-size block may come from nodes with hardware failures, For shuffle chunks,
+              // the original shuffle blocks that belong to that zero-size shuffle chunk is
+              // available and we can opt to fallback immediately.
+              logWarning(msg)
+              pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+              // Set result to null to trigger another iteration of the while loop to get either.
+              result = null
+              null
+            } else {
+              throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+            }
           }
 
           val in = try {

Review Comment:
   We want to enter this block only when `result != null`



[GitHub] [spark] gaoyajun02 commented on a diff in pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
gaoyajun02 commented on code in PR #38333:
URL: https://github.com/apache/spark/pull/38333#discussion_r1012466373


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -794,7 +794,15 @@ final class ShuffleBlockFetcherIterator(
             // since the last call.
             val msg = s"Received a zero-size buffer for block $blockId from $address " +
               s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
-            throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+            if (blockId.isShuffleChunk) {
+              logWarning(msg)
+              pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)

Review Comment:
   Did you mean PushMergedRemoteMetaFetchResult?
   The size of the push-merged block is not zero. Since the size of each chunk cannot be obtained on the reduce side, we log the zero-size case at the following server-side code, and we confirmed that the index file contains consecutive entries with the same offset. https://github.com/apache/spark/blob/9a7596e1dde0f1dd596aa6d3b2efbcb5d1ef70ea/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala#L500
   
   Then, based on the hardware-layer error information, we basically determined that the data loss happens while the data is being written.
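   
   For readers unfamiliar with the index layout, the observation above can be illustrated with a small stand-alone sketch (not Spark code; the helper and its assumptions are mine). It assumes the merged index file is a flat sequence of 8-byte offsets where chunk i spans offsets(i) to offsets(i + 1), so two identical consecutive offsets imply a zero-size chunk:
   ```scala
   import java.io.{DataInputStream, EOFException, FileInputStream}
   import scala.collection.mutable.ArrayBuffer
   
   object MergedIndexCheck {
     // Illustrative sketch: list every chunk whose start and end offsets in a merged-shuffle
     // index file are identical, i.e. the zero-size chunks described above.
     def zeroSizeChunkIndices(indexFilePath: String): Seq[Int] = {
       val in = new DataInputStream(new FileInputStream(indexFilePath))
       val offsets = ArrayBuffer.empty[Long]
       try {
         while (true) offsets += in.readLong()
       } catch {
         case _: EOFException => // reached the end of the index file
       } finally {
         in.close()
       }
       offsets.toSeq.sliding(2).zipWithIndex.collect {
         case (Seq(start, end), i) if start == end => i
       }.toSeq
     }
   }
   ```
   A check along these lines is only a diagnostic aid; the PR itself handles the symptom on the read path by falling back to the original shuffle blocks.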
   



[GitHub] [spark] AmplabJenkins commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1287088925

   Can one of the admins verify this patch?


[GitHub] [spark] otterc commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
otterc commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1320389162

   Looks good to me


[GitHub] [spark] dongjoon-hyun commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1321531047

   Thank you, @mridulm !


[GitHub] [spark] gaoyajun02 commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
gaoyajun02 commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1323136879

   > Thank you, @gaoyajun02 , @mridulm , @otterc .
   > 
   > * Do we need to backport this to branch-3.3?
   > * According to the previous failure description, what happens in branch-3.3 in case of failure?
   
   Since branch-3.3 does not contain the PR for SPARK-38987, if a merged chunk is zero-size, throwFetchFailedException actually throws a SparkException, which eventually causes the application to fail after the task fails 4 times.
   
   @dongjoon-hyun 
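   
   For context, the "4 times" above corresponds to Spark's default task retry budget, `spark.task.maxFailures`, whose default value is 4. A minimal illustration of that setting (unrelated to the fix itself):
   ```scala
   import org.apache.spark.SparkConf
   
   object TaskRetryBudget {
     // Context only: without the fallback, the repeated SparkException exhausts the task
     // retry budget, which is governed by spark.task.maxFailures (default 4).
     val conf: SparkConf = new SparkConf()
       .setAppName("example")
       .set("spark.task.maxFailures", "8") // raise the retry budget from the default of 4
   }
   ```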


[GitHub] [spark] asfgit closed pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size
URL: https://github.com/apache/spark/pull/38333


[GitHub] [spark] mridulm commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1321529913

   I was in two minds about whether to fix this in 3.3 as well ...
   Yes, 3.3 is affected by it.


[GitHub] [spark] gaoyajun02 commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
gaoyajun02 commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1300369485

   > For cases like this, it might actually be better to fail the task (and recompute the parent stage) - and leverage deny list to prevent tasks from running on the problematic node ?
   
   I think it is not necessary to recompute the parent stage. This situation is similar to chunk corruption: we can fall back to fetching the original shuffle blocks. The reasons are:
   1. The original shuffle blocks are still available.
   2. Recomputing the parent stage is very expensive for some large jobs and makes the application execution time longer.
   3. We observed that the chance of these bad nodes losing data is very low; it may only happen once every few days.


[GitHub] [spark] mridulm commented on a diff in pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38333:
URL: https://github.com/apache/spark/pull/38333#discussion_r1019734725


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -794,7 +794,18 @@ final class ShuffleBlockFetcherIterator(
             // since the last call.
             val msg = s"Received a zero-size buffer for block $blockId from $address " +
               s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
-            throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+            if (blockId.isShuffleChunk) {
+              // Zero-size block may come from nodes with hardware failures, For shuffle chunks,
+              // the original shuffle blocks that belong to that zero-size shuffle chunk is
+              // available and we can opt to fallback immediately.
+              logWarning(msg)
+              pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+              // Set result to null to trigger another iteration of the while loop to get either.
+              result = null
+              null

Review Comment:
   remove `null` ?



[GitHub] [spark] otterc commented on a diff in pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
otterc commented on code in PR #38333:
URL: https://github.com/apache/spark/pull/38333#discussion_r1013227504


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -794,7 +794,15 @@ final class ShuffleBlockFetcherIterator(
             // since the last call.
             val msg = s"Received a zero-size buffer for block $blockId from $address " +
               s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
-            throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+            if (blockId.isShuffleChunk) {
+              logWarning(msg)
+              pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)

Review Comment:
   Can you update the comment on `initiateFallbackFetchForPushMergedBlock` to cover this scenario as well?



[GitHub] [spark] otterc commented on a diff in pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
otterc commented on code in PR #38333:
URL: https://github.com/apache/spark/pull/38333#discussion_r1013219402


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -794,7 +794,15 @@ final class ShuffleBlockFetcherIterator(
             // since the last call.
             val msg = s"Received a zero-size buffer for block $blockId from $address " +
               s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
-            throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+            if (blockId.isShuffleChunk) {
+              logWarning(msg)
+              pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)

Review Comment:
   So we can read the meta and index files without any issues, but when we read the data there are problems and the size of the chunk is 0. Yeah, I think the change looks good to me.



[GitHub] [spark] gaoyajun02 commented on a diff in pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
gaoyajun02 commented on code in PR #38333:
URL: https://github.com/apache/spark/pull/38333#discussion_r1012468152


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -794,7 +794,15 @@ final class ShuffleBlockFetcherIterator(
             // since the last call.
             val msg = s"Received a zero-size buffer for block $blockId from $address " +
               s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
-            throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+            if (blockId.isShuffleChunk) {
+              logWarning(msg)
+              pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)

Review Comment:
   And the size of the original shuffle block is not 0, because there was no FetchFailedException in the stage.



[GitHub] [spark] gaoyajun02 commented on a diff in pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
gaoyajun02 commented on code in PR #38333:
URL: https://github.com/apache/spark/pull/38333#discussion_r1026206654


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -794,7 +794,18 @@ final class ShuffleBlockFetcherIterator(
             // since the last call.
             val msg = s"Received a zero-size buffer for block $blockId from $address " +
               s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
-            throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+            if (blockId.isShuffleChunk) {
+              // Zero-size block may come from nodes with hardware failures, For shuffle chunks,
+              // the original shuffle blocks that belong to that zero-size shuffle chunk is
+              // available and we can opt to fallback immediately.
+              logWarning(msg)
+              pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+              // Set result to null to trigger another iteration of the while loop to get either.
+              result = null
+              null
+            } else {
+              throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+            }
           }
 
           val in = try {

Review Comment:
   oh got it, can you take a look again? Thx



[GitHub] [spark] gaoyajun02 commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
gaoyajun02 commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1316957282

   > Couple of changes ...
   
   done


[GitHub] [spark] mridulm commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1315695999

   Also, can you please update to latest master @gaoyajun02 ? Not sure why we are seeing the linter failure in build


[GitHub] [spark] mridulm commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1299631344

   For cases like this, it might actually be better to add the node to the deny list and fail the task so the parent stage is recomputed?


[GitHub] [spark] mridulm commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1301464492

   If there are hardware issues that are causing failures, it is better to move the nodes to the deny list and prevent them from getting used: we will keep seeing more failures, including for vanilla shuffle.
   
   On the other hand, I can also look at this as a data corruption issue - @otterc what was the plan around how we support shuffle corruption diagnosis for push-based shuffle (SPARK-36206, etc)? Is the expectation that we fall back? Or that we diagnose + fail?
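   
   As a side note for readers, the "deny list" here refers to Spark's failure-based executor/node exclusion. Assuming the post-3.1 configuration name (older releases used the `spark.blacklist.*` prefix), it is enabled roughly as below; the snippet is illustrative and not part of this PR:
   ```scala
   import org.apache.spark.SparkConf
   
   object DenyListConfig {
     // Illustrative only: exclude executors/nodes that keep failing, so problematic hardware
     // stops being scheduled on.
     val conf: SparkConf = new SparkConf()
       .set("spark.excludeOnFailure.enabled", "true")
   }
   ```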


[GitHub] [spark] mridulm commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1287007388

   +CC @otterc 


[GitHub] [spark] dongjoon-hyun commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1321516325

   Thank you, @gaoyajun02 , @mridulm , @otterc .
   - Do we need to backport this to branch-3.3?
   - According to the previous failure description, what happens in branch-3.3 in case of failure?


[GitHub] [spark] gaoyajun02 commented on pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

Posted by GitBox <gi...@apache.org>.
gaoyajun02 commented on PR #38333:
URL: https://github.com/apache/spark/pull/38333#issuecomment-1323119482

   > I was on two minds whether to fix this in 3.3 as well ... Yes, 3.3 is affected by it.
   > 
   > But agree, a backport to branch-3.3 would be helpful. Can you get it a shot @gaoyajun02 ? Might need to fix some minor nits to get a patch
   
   ok, can you take a look @mridulm cc @otterc @dongjoon-hyun 

