Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/03/22 21:37:37 UTC

[GitHub] [spark] otterc opened a new pull request #31934: [SPARK-32916][FOLLOW-UP][SHUFFLE] Fixes cases of corruption in merged shuffle …

otterc opened a new pull request #31934:
URL: https://github.com/apache/spark/pull/31934


   
   ### What changes were proposed in this pull request?
   This PR fixes bugs that cause corruption of push-merged blocks when a client terminates while pushing a block. `RemoteBlockPushResolver` was introduced in #30062.
   
   There are 2 scenarios where the merged blocks get corrupted:
   1. `StreamCallback.onFailure()` is called more than once. Initially we assumed that the `onFailure` callback would be called just once per stream. However, we observed that it is called twice when a client connection is reset. When the client connection is reset, 2 events get triggered in this order:
    - `exceptionCaught`. This event is propagated to `StreamInterceptor`. `StreamInterceptor.exceptionCaught()` invokes `callback.onFailure(streamId, cause)`. This is the first time `StreamCallback.onFailure()` is invoked.
    - `channelInactive`. Since the channel closes, the `channelInactive` event gets triggered, which is again propagated to `StreamInterceptor`. `StreamInterceptor.channelInactive()` invokes `callback.onFailure(streamId, new ClosedChannelException())`. This is the second time `StreamCallback.onFailure()` is invoked.
   
   2. The flag `isWriting` is set prematurely to true. This introduces an edge case where a stream that is trying to merge a duplicate block (created because of a speculative task) may interfere with an active stream if the duplicate stream fails. 
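
To illustrate scenario 1, here is a minimal sketch rather than the actual `RemoteBlockPushResolver` code. `PartitionState`, `PushStream`, and `DoubleFailureDemo` are hypothetical names, and the model assumes that a partition admits a new writer whenever its current map index is `-1`. It compares a failure handler that always releases the partition with one that releases it only while the failing stream is the active writer.

```java
// Hypothetical model of one merged shuffle partition; not Spark's actual classes.
final class PartitionState {
  // Map index of the stream that is currently writing, or -1 if none is.
  int currentMapIndex = -1;
}

final class PushStream {
  private final PartitionState partition;
  private final int mapIndex;
  private boolean isWriting = false;

  PushStream(PartitionState partition, int mapIndex) {
    this.partition = partition;
    this.mapIndex = mapIndex;
  }

  /** Returns true if this stream is admitted as the partition's active writer. */
  boolean tryStartWriting() {
    if (partition.currentMapIndex < 0 || partition.currentMapIndex == mapIndex) {
      partition.currentMapIndex = mapIndex;
      isWriting = true;
      return true;
    }
    return false; // the real resolver would defer the buffers instead
  }

  /** Unguarded handler: every invocation releases the partition. */
  void onFailureUnguarded() {
    partition.currentMapIndex = -1;
  }

  /** Guarded handler: releases the partition only while this stream is the writer. */
  void onFailureGuarded() {
    if (isWriting) {
      partition.currentMapIndex = -1;
    }
    isWriting = false;
  }
}

final class DoubleFailureDemo {
  /** Replays scenario 1 and reports whether stream3 is admitted while stream2 is active. */
  static boolean stream3AdmittedWhileStream2Active(boolean guarded) {
    PartitionState partition = new PartitionState();
    PushStream stream1 = new PushStream(partition, 1);
    PushStream stream2 = new PushStream(partition, 2);
    PushStream stream3 = new PushStream(partition, 3);

    stream1.tryStartWriting();
    fail(stream1, guarded);      // first onFailure, from exceptionCaught
    stream2.tryStartWriting();   // stream2 becomes the active writer
    fail(stream1, guarded);      // second onFailure, from channelInactive
    return stream3.tryStartWriting();
  }

  private static void fail(PushStream stream, boolean guarded) {
    if (guarded) {
      stream.onFailureGuarded();
    } else {
      stream.onFailureUnguarded();
    }
  }
}
```

With the unguarded handler, the repeated `onFailure` releases the partition while stream2 is still writing, so stream3 is admitted. With the guarded handler the second call is a no-op and stream3 is not admitted.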
   
   This PR also includes additional changes that improve the code:
   
   1. Using positional writes all the time, because this simplifies the code and microbenchmarking has not shown any performance impact.
   2. Additional minor changes suggested by @mridulm during an internal review.
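
As a rough illustration of what a positional write looks like (a sketch, not the PR's code): `FileChannel.write(ByteBuffer, long)` writes at an explicit file offset and does not modify the channel's own position. The class name, the helper names, and the idea of the caller tracking a "committed" offset are assumptions made for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class PositionalWriteSketch {
  /** Writes all remaining bytes of buf starting at position; returns the next offset. */
  static long writeAt(FileChannel channel, ByteBuffer buf, long position) throws IOException {
    while (buf.hasRemaining()) {
      // Positional write: the channel's own position is not consulted or changed.
      position += channel.write(buf, position);
    }
    return position;
  }

  private static ByteBuffer wrap(String s) {
    return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
  }

  /** Writes one block, a partial block that is abandoned, then a second block. */
  static String demo() {
    try {
      Path file = Files.createTempFile("merged-sketch", ".data");
      try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
        // The caller, not the channel, remembers where the last complete block ended.
        long committed = writeAt(channel, wrap("block-1|"), 0L);
        // A stream that fails midway leaves bytes past the committed offset.
        writeAt(channel, wrap("garb"), committed);
        // The next stream simply starts at the committed offset again.
        committed = writeAt(channel, wrap("block-2|"), committed);
        channel.truncate(committed);
      }
      try {
        return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
      } finally {
        Files.deleteIfExists(file);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because every write names its offset, the sketch needs no separate "seek back" step after the abandoned partial write.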
   
   ### Why are the changes needed?
   These changes fix bugs and simplify the code.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Added unit tests. I have also tested these changes in LinkedIn's internal fork on a cluster.
   
   Co-authored-by: Chandni Singh chsingh@linkedin.com
   Co-authored-by: Min Shen mshen@linkedin.com
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #31934: [SPARK-32916][FOLLOW-UP][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-804427909


   Can one of the admins verify this patch?




[GitHub] [spark] otterc commented on pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
otterc commented on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-805033110


   > Hi, @otterc .
   > [SPARK-32916](https://issues.apache.org/jira/browse/SPARK-32916) is already released with `Fix Version: 3.1.0`. To have an independent `Fix Version`, this should have a new JIRA issue.
   
   I created SPARK-34840 to address this. cc. @dongjoon-hyun 




[GitHub] [spark] otterc commented on pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
otterc commented on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-805061280


   > Hmm, I am trying to understand how those 2 scenarios cause the merged block to be corrupted.
   > 
   > 1. Do you mean that calling `StreamCallback.onFailure()` twice causes the block to be corrupted? It seems that the only thing `onFailure` does is `setCurrentMapIndex(-1)` and `setEncounteredFailure(true)`. Neither touches files, e.g., by resetting the position or truncating.
   > 2. I can see how the duplicate stream may interfere with an active stream, e.g., the active stream may see `getCurrentMapIndex` < 0 and `isEncounteredFailure=true` while it is writing normally. But it seems like the active stream is able to heal itself with the current framework.
   > 
   > I probably missed some details. Could you elaborate more on how the corruption happens? Thanks.
   
   In both scenarios, the `currentMapId` of the shuffle partition is reset to `-1`, which can interfere with an active stream (a stream that is writing). By interfering, I mean that it gives another stream, which is waiting to merge to the same shuffle partition, a chance to start writing before the active stream has completed, whether successfully or with a failure.
   
   Providing examples for both of these:
   1. When `onFailure` is called twice
   - Say stream1 merging `shufflePush_0_1_2` wrote some data and has `isWriting=true`. Now it failed, so it sets `currentMapId` of partition_0_2 to `-1`.
   - Another stream2 which wants to merge `shufflePush_0_2_2` can now start merging its bufs to partition_0_2 and it sets `currentMapId` of partition_0_2 to `2`.
   - Another stream3 which wants to merge `shufflePush_0_3_2` will defer its buffers because `stream2` is the active one right now (`currentMapId` is 2).
   - stream2 has only merged a few bufs, but then `stream1.onFailure()` is invoked again and that changes the `currentMapId` of partition_0_2 to `-1`. This becomes a problem because `stream2` hasn't completed (successfully or with failure) and now `stream3` is `allowedToWrite`. If `stream3` starts writing buffers when `stream2` has not appended all its buffers, then the data of `shufflePush_0_2_2` will be corrupted.
   
   2. Duplicate stream.
   - Say stream1 merging `shufflePush_0_1_2` wrote some data and has `isWriting=true`. It completed successfully and then set `currentMapId` of partition_0_2 to `-1`.
   - Now `stream1duplicate`, which is also trying to merge `shufflePush_0_1_2`, will be `allowedToWrite` because the `currentMapId` of partition_0_2 is `-1`, and it sets `isWriting=true`. However, we identify that it is a duplicate stream and just return without modifying `currentMapId`.
   - stream2, which tries to merge `shufflePush_0_2_2`, will be `allowedToWrite` because `currentMapId=-1`. It sets `currentMapId=2` and starts writing.
   - If `stream2Duplicate` encounters a failure now, it has `isWriting` on and so can reset `currentMapId` of partition_0_2. This again gives another stream, say stream3, a chance to be `allowedToWrite` before stream2 completes.
   
   I have added UTs for both these cases as well with similar examples.
   @Ngone51 
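
The duplicate-stream example (scenario 2) can be replayed with a small model. This is a sketch under stated assumptions, not the resolver's real code: `MergePartition`, `MergeStream`, and `DuplicateStreamDemo` are hypothetical names, and the `markWritingEarly` switch stands in for the difference between setting `isWriting` before the duplicate check and setting it only once the stream actually starts writing.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a merged shuffle partition; not Spark's actual classes.
final class MergePartition {
  int currentMapIndex = -1;                                   // -1: no active writer
  final Set<Integer> mergedMapIndexes = new HashSet<>();      // blocks already merged
}

final class MergeStream {
  private final MergePartition partition;
  private final int mapIndex;
  private final boolean markWritingEarly;
  private boolean isWriting = false;

  MergeStream(MergePartition partition, int mapIndex, boolean markWritingEarly) {
    this.partition = partition;
    this.mapIndex = mapIndex;
    this.markWritingEarly = markWritingEarly;
  }

  /** Returns true if this stream became (or stayed) the partition's active writer. */
  boolean onData() {
    boolean allowedToWrite =
        partition.currentMapIndex < 0 || partition.currentMapIndex == mapIndex;
    if (!allowedToWrite) {
      return false; // the real resolver would defer the buffer
    }
    if (markWritingEarly) {
      isWriting = true; // premature: set before knowing whether anything is written
    }
    if (partition.mergedMapIndexes.contains(mapIndex)) {
      return false; // duplicate block: nothing is written, currentMapIndex untouched
    }
    isWriting = true;
    partition.currentMapIndex = mapIndex;
    return true;
  }

  void onComplete() {
    if (isWriting) {
      partition.mergedMapIndexes.add(mapIndex);
      partition.currentMapIndex = -1;
    }
    isWriting = false;
  }

  void onFailure() {
    if (isWriting) {
      partition.currentMapIndex = -1;
    }
    isWriting = false;
  }
}

final class DuplicateStreamDemo {
  /** Replays scenario 2 and reports whether stream3 is admitted while stream2 is active. */
  static boolean stream3AdmittedWhileStream2Active(boolean markWritingEarly) {
    MergePartition partition = new MergePartition();
    MergeStream stream1 = new MergeStream(partition, 1, markWritingEarly);
    MergeStream stream1Duplicate = new MergeStream(partition, 1, markWritingEarly);
    MergeStream stream2 = new MergeStream(partition, 2, markWritingEarly);
    MergeStream stream3 = new MergeStream(partition, 3, markWritingEarly);

    stream1.onData();
    stream1.onComplete();          // block 1 merged, partition released
    stream1Duplicate.onData();     // recognized as a duplicate, writes nothing
    stream2.onData();              // stream2 becomes the active writer
    stream1Duplicate.onFailure();  // must not release stream2's partition
    return stream3.onData();
  }
}
```

When `isWriting` is set early, the duplicate stream's failure resets the map index and stream3 is admitted alongside stream2. When it is set only on an actual write, the failure is harmless.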
   
   
   




[GitHub] [spark] otterc commented on a change in pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
otterc commented on a change in pull request #31934:
URL: https://github.com/apache/spark/pull/31934#discussion_r600664075



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -738,14 +723,14 @@ public void onFailure(String streamId, Throwable throwable) throws IOException {
           Map<Integer, AppShufflePartitionInfo> shufflePartitions =
             mergeManager.partitions.get(partitionInfo.appShuffleId);
           if (shufflePartitions != null && shufflePartitions.containsKey(partitionInfo.reduceId)) {
-            logger.debug("{} shuffleId {} reduceId {} set encountered failure",
+            logger.debug("{} shuffleId {} reduceId {} encountered failure",
               partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
               partitionInfo.reduceId);
             partitionInfo.setCurrentMapIndex(-1);
-            partitionInfo.setEncounteredFailure(true);
           }
         }
       }
+      isWriting = false;

Review comment:
       I can move this into the `if` scope; that would not change the behavior or cause any issues. The only reason I had it outside is that it is consistent with where this flag is unset in `onComplete`. I understand this is trivial, so I can move it.






[GitHub] [spark] Ngone51 commented on pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-805496338


   Ok, I get it now.




[GitHub] [spark] Ngone51 commented on pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-805415392


   @otterc Thanks for the explanation. Now I understand the cause.
   
   To confirm, for the example 2, I think the first 2 steps are not necessary, right?




[GitHub] [spark] otterc commented on pull request #31934: [SPARK-32916][FOLLOW-UP][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
otterc commented on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-804420882


   > does this cause data corruption if people use it with spark 3.1.1 release? Or are these block somehow caught and shuffle ends up failing?
   
   - In 3.1.1, push-based shuffle is not complete: the changes needed to fetch merged shuffle blocks are not there. So users of 3.1.1 can't use push-based shuffle.
   
   - In our implementation of fetching merged shuffle blocks (which is not in 3.1.1), if the client encounters any issue with a merged block, it falls back to fetching the original unmerged blocks that make up that merged block.




[GitHub] [spark] Ngone51 commented on a change in pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #31934:
URL: https://github.com/apache/spark/pull/31934#discussion_r600994466



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -738,14 +723,14 @@ public void onFailure(String streamId, Throwable throwable) throws IOException {
           Map<Integer, AppShufflePartitionInfo> shufflePartitions =
             mergeManager.partitions.get(partitionInfo.appShuffleId);
           if (shufflePartitions != null && shufflePartitions.containsKey(partitionInfo.reduceId)) {
-            logger.debug("{} shuffleId {} reduceId {} set encountered failure",
+            logger.debug("{} shuffleId {} reduceId {} encountered failure",
               partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
               partitionInfo.reduceId);
             partitionInfo.setCurrentMapIndex(-1);
-            partitionInfo.setEncounteredFailure(true);
           }
         }
       }
+      isWriting = false;

Review comment:
       Ok, keeping it consistent sounds fine. We can leave it as it is since it's trivial.






[GitHub] [spark] tgravescs commented on pull request #31934: [SPARK-32916][FOLLOW-UP][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
tgravescs commented on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-804418379


   Does this cause data corruption if people use it with the Spark 3.1.1 release? Or are these blocks somehow caught so that the shuffle ends up failing?




[GitHub] [spark] otterc edited a comment on pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
otterc edited a comment on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-805488580


   > To confirm, for the example 2, I think the first 2 steps are not necessary, right?
   
   @Ngone51 I think the first 2 steps are necessary, because this edge case can only happen when a stream that is trying to merge a duplicate block (`stream1duplicate` in my example) fails. The problem is that we were setting `isWriting=true` too early in such cases, so when that stream fails it can unset `currentMapId`.
   
   Let me know if I am missing some other cases. I can add UTs for them as well.




[GitHub] [spark] mridulm commented on pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-807172157


   LGTM, thanks @otterc.
   Merging to master and 3.1




[GitHub] [spark] otterc commented on a change in pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
otterc commented on a change in pull request #31934:
URL: https://github.com/apache/spark/pull/31934#discussion_r600693768



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -639,7 +624,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
           // written to disk due to this reason. We thus decide to optimize for server
           // throughput and memory usage.
           if (deferredBufs == null) {
-            deferredBufs = new LinkedList<>();
+            deferredBufs = new ArrayList<>();

Review comment:
       Sure, fixed it. 






[GitHub] [spark] dongjoon-hyun commented on pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-805045621


   Thank you, @otterc !






[GitHub] [spark] otterc commented on pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
otterc commented on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-805493922


   > > * If stream2Duplicate encounters a failure now, it has isWriting on and so can reset currentMapId of partition_0_2. This again gives a chance to another stream say stream3 to allowedToWrite without stream2 to complete.
   > 
   > So, it should be `stream1Duplicate` instead of `stream2Duplicate` here?
   
   Right, that was a typo. Yes, it should be `stream1Duplicate`. Thanks for pointing it out.




[GitHub] [spark] mridulm commented on pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-807181096


   Thanks for the reviews @Ngone51, @dongjoon-hyun !




[GitHub] [spark] otterc edited a comment on pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
otterc edited a comment on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-805061280


   > Hmm, I am trying to understand how those 2 scenarios cause the merged block to be corrupted.
   > 
   > 1. Do you mean that calling `StreamCallback.onFailure()` twice causes the block to be corrupted? It seems that the only thing `onFailure` does is `setCurrentMapIndex(-1)` and `setEncounteredFailure(true)`. Neither touches files, e.g., by resetting the position or truncating.
   > 2. I can see how the duplicate stream may interfere with an active stream, e.g., the active stream may see `getCurrentMapIndex` < 0 and `isEncounteredFailure=true` while it is writing normally. But it seems like the active stream is able to heal itself with the current framework.
   > 
   > I probably missed some details. Could you elaborate more on how the corruption happens? Thanks.
   
   In both scenarios, the `currentMapId` of the shuffle partition is reset to `-1`, which can interfere with an active stream (a stream that is writing). By interfering, I mean that it gives another stream, which is waiting to merge to the same shuffle partition, a chance to start writing before the active stream has completed, whether successfully or with a failure.
   
   Providing examples for both of these:
   1. When `onFailure` is called twice
   - Say stream1 merging `shufflePush_0_1_2` wrote some data and has `isWriting=true`. Now it failed, so it sets `currentMapId` of partition_0_2 to `-1`.
   - Another stream2 which wants to merge `shufflePush_0_2_2` can now start merging its bufs to partition_0_2 and it sets `currentMapId` of partition_0_2 to `2`.
   - Another stream3 which wants to merge `shufflePush_0_3_2` will defer its buffers because `stream2` is the active one right now (`currentMapId` is 2).
   - stream2 has only merged a few bufs, but then `stream1.onFailure()` is invoked again and that changes the `currentMapId` of partition_0_2 to `-1`. This becomes a problem because `stream2` hasn't completed (successfully or with failure) and now `stream3` is `allowedToWrite`. If `stream3` starts writing buffers when `stream2` has not appended all its buffers, then the data of `shufflePush_0_2_2` will be corrupted.
   
   2. Duplicate stream.
   - Say stream1 merging `shufflePush_0_1_2` wrote some data and has `isWriting=true`. It completed successfully and then set `currentMapId` of partition_0_2 to `-1`.
   - Now `stream1Duplicate`, which is also trying to merge `shufflePush_0_1_2`, will be `allowedToWrite` because the `currentMapId` of partition_0_2 is `-1`, and it sets `isWriting=true`. However, we identify that it is a duplicate stream and just return without modifying `currentMapId`.
   - stream2, which tries to merge `shufflePush_0_2_2`, will be `allowedToWrite` because `currentMapId=-1`. It sets `currentMapId=2` and starts writing.
   - If `stream1Duplicate` encounters a failure now, it has `isWriting` on and so can reset `currentMapId` of partition_0_2. This again gives another stream, say stream3, a chance to be `allowedToWrite` before stream2 completes.
   
   I have added UTs for both these cases as well with similar examples.
   @Ngone51 
   
   
   




[GitHub] [spark] otterc commented on pull request #31934: [SPARK-32916][FOLLOW-UP][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
otterc commented on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-804481115


   This test failure is unrelated:
   ```
   [info] *** 1 TEST FAILED ***
   [error] Failed: Total 2998, Failed 1, Errors 0, Passed 2997, Ignored 7, Canceled 1
   [error] Failed tests:
   [error] 	org.apache.spark.storage.FallbackStorageSuite
   ```




[GitHub] [spark] asfgit closed pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #31934:
URL: https://github.com/apache/spark/pull/31934


   




[GitHub] [spark] Ngone51 commented on a change in pull request #31934: [SPARK-32916][FOLLOW-UP][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #31934:
URL: https://github.com/apache/spark/pull/31934#discussion_r599553997



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -639,7 +624,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
           // written to disk due to this reason. We thus decide to optimize for server
           // throughput and memory usage.
           if (deferredBufs == null) {
-            deferredBufs = new LinkedList<>();
+            deferredBufs = new ArrayList<>();

Review comment:
       Why was this changed to `ArrayList`?






[GitHub] [spark] otterc commented on a change in pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
otterc commented on a change in pull request #31934:
URL: https://github.com/apache/spark/pull/31934#discussion_r599761803



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -639,7 +624,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
           // written to disk due to this reason. We thus decide to optimize for server
           // throughput and memory usage.
           if (deferredBufs == null) {
-            deferredBufs = new LinkedList<>();
+            deferredBufs = new ArrayList<>();

Review comment:
       Quoting @mridulm:
   "make this ArrayList instead - LinkedList has bad perf characteristics in comparison, and is preferable only when we have removes."

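A small sketch of the access pattern this argument relies on, with hypothetical names (`DeferredBufsSketch`, `defer`, `drain`) that are not taken from the PR: buffers are only appended and later iterated in arrival order, never removed from the middle. For that pattern `ArrayList` gives amortized constant-time appends over a contiguous array, while `LinkedList` allocates a node per element.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class DeferredBufsSketch {
  // Allocated lazily, since many streams may never need to defer anything.
  private List<ByteBuffer> deferredBufs;

  /** Keeps a private copy of buf until the stream is allowed to write. */
  void defer(ByteBuffer buf) {
    if (deferredBufs == null) {
      deferredBufs = new ArrayList<>();
    }
    // Copy the remaining bytes, assuming the caller may reuse its buffer.
    ByteBuffer copy = ByteBuffer.allocate(buf.remaining());
    copy.put(buf);
    copy.flip();
    deferredBufs.add(copy); // append only
  }

  /** Visits the deferred buffers in arrival order and returns their total size in bytes. */
  int drain() {
    int total = 0;
    if (deferredBufs != null) {
      for (ByteBuffer buf : deferredBufs) {
        total += buf.remaining(); // a real writer would write buf to the merged file here
      }
      deferredBufs = null; // drop the whole list at once; no per-element removes
    }
    return total;
  }
}
```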





[GitHub] [spark] otterc edited a comment on pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
otterc edited a comment on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-805493922


   > > * If stream2Duplicate encounters a failure now, it has isWriting on and so can reset currentMapId of partition_0_2. This again gives a chance to another stream say stream3 to allowedToWrite without stream2 to complete.
   > 
   > So, it should be `stream1Duplicate` instead of `stream2Duplicate` here?
   
   Right, that was a typo; it should be `stream1Duplicate`. Thanks for pointing it out.
   I have also edited the example so that others reading it see the corrected name.




[GitHub] [spark] Ngone51 commented on a change in pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #31934:
URL: https://github.com/apache/spark/pull/31934#discussion_r600213006



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -738,14 +723,14 @@ public void onFailure(String streamId, Throwable throwable) throws IOException {
           Map<Integer, AppShufflePartitionInfo> shufflePartitions =
             mergeManager.partitions.get(partitionInfo.appShuffleId);
           if (shufflePartitions != null && shufflePartitions.containsKey(partitionInfo.reduceId)) {
-            logger.debug("{} shuffleId {} reduceId {} set encountered failure",
+            logger.debug("{} shuffleId {} reduceId {} encountered failure",
               partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
               partitionInfo.reduceId);
             partitionInfo.setCurrentMapIndex(-1);
-            partitionInfo.setEncounteredFailure(true);
           }
         }
       }
+      isWriting = false;

Review comment:
       Move this into the `if` condition scope?






[GitHub] [spark] otterc commented on pull request #31934: [SPARK-32916][FOLLOW-UP][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
otterc commented on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-804415668


   @tgravescs @Ngone51 @attilapiros @mridulm @Victsm
   Please help review these bug fixes.




[GitHub] [spark] Ngone51 commented on pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-805492402


   > * If stream2Duplicate encounters a failure now, it has isWriting on and so can reset currentMapId of partition_0_2. This again gives a chance to another stream say stream3 to allowedToWrite without stream2 to complete.
   
   So, it should be `stream1Duplicate` instead of `stream2Duplicate` here?




[GitHub] [spark] otterc commented on pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
otterc commented on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-805488580


   
   > To confirm, for the example 2, I think the first 2 steps are not necessary, right?
   
   @Ngone51 I think the first 2 steps are necessary, because this edge case can only occur when a stream that is trying to merge a duplicate block (`stream1Duplicate` in my example) fails. The problem was that we were setting `isWriting=true` too early in such cases, so when that stream failed it could unset `currentMapId`.
   
   Let me know if I am missing any other cases. I can add UTs for them as well.
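
The edge case described above can be modeled roughly as follows. This is a simplified sketch, not the resolver's code: `PartitionSketch`, `PushStreamSketch`, and `mergedMaps` are hypothetical names, and deferral is reduced to returning `false`. It only illustrates the idea that `isWriting` should be set after the duplicate and ownership checks, so that a failing duplicate stream cannot release a partition it never owned.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of one merged shuffle partition (not Spark's class). */
final class PartitionSketch {
  int currentMapIndex = -1;                        // -1: no stream is writing
  final Set<Integer> mergedMaps = new HashSet<>(); // map indices already merged
}

/** Hypothetical per-stream state; isWriting is set only once a write is allowed. */
final class PushStreamSketch {
  private final PartitionSketch partition;
  private final int mapIndex;
  private boolean isWriting = false;

  PushStreamSketch(PartitionSketch partition, int mapIndex) {
    this.partition = partition;
    this.mapIndex = mapIndex;
  }

  /** Returns true if this stream was allowed to write the chunk. */
  boolean onData() {
    if (partition.mergedMaps.contains(mapIndex)) {
      return false; // duplicate block: nothing to write, isWriting stays false
    }
    if (partition.currentMapIndex != -1 && partition.currentMapIndex != mapIndex) {
      return false; // another stream owns the partition; a real server would defer
    }
    partition.currentMapIndex = mapIndex;
    isWriting = true; // set only after both checks above have passed
    return true;
  }

  void onComplete() {
    if (isWriting) {
      partition.mergedMaps.add(mapIndex);
      partition.currentMapIndex = -1;
    }
    isWriting = false;
  }

  void onFailure() {
    if (isWriting) {
      partition.currentMapIndex = -1; // only the active writer may release
    }
    isWriting = false;
  }
}
```

In this model, if `stream1` completes, `stream2` starts writing, and a duplicate of `stream1` then fails, `currentMapIndex` still points at `stream2`, so a later `stream3` is not allowed to write until `stream2` finishes.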




[GitHub] [spark] dongjoon-hyun commented on a change in pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #31934:
URL: https://github.com/apache/spark/pull/31934#discussion_r600681920



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -639,7 +624,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
           // written to disk due to this reason. We thus decide to optimize for server
           // throughput and memory usage.
           if (deferredBufs == null) {
-            deferredBufs = new LinkedList<>();
+            deferredBufs = new ArrayList<>();

Review comment:
       Could you fix the Java linter error? GitHub Actions is complaining about the following:
   ```
   src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:[33,8] (imports) UnusedImports: Unused import - java.util.LinkedList.
   ```






[GitHub] [spark] otterc commented on a change in pull request #31934: [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …

Posted by GitBox <gi...@apache.org>.
otterc commented on a change in pull request #31934:
URL: https://github.com/apache/spark/pull/31934#discussion_r600664075



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -738,14 +723,14 @@ public void onFailure(String streamId, Throwable throwable) throws IOException {
           Map<Integer, AppShufflePartitionInfo> shufflePartitions =
             mergeManager.partitions.get(partitionInfo.appShuffleId);
           if (shufflePartitions != null && shufflePartitions.containsKey(partitionInfo.reduceId)) {
-            logger.debug("{} shuffleId {} reduceId {} set encountered failure",
+            logger.debug("{} shuffleId {} reduceId {} encountered failure",
               partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
               partitionInfo.reduceId);
             partitionInfo.setCurrentMapIndex(-1);
-            partitionInfo.setEncounteredFailure(true);
           }
         }
       }
+      isWriting = false;

Review comment:
       I can move this into the `if` scope; that would not change the behavior or cause any issues. The only reason I had it outside was to stay consistent with where this flag is unset in `onComplete`. I understand that is a trivial cosmetic reason, so I can move it.
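
A small sketch of why the placement does not matter, under the assumption (not visible in the quoted hunk) that the enclosing `if` tests `isWriting`. `SharedPartitionSketch` and `FailureCallbackSketch` are hypothetical names. The sketch also shows the effect the PR description relies on: once the flag is cleared, a second `onFailure` call on the same stream becomes a no-op.

```java
/** Hypothetical shared partition state; -1 means no stream is writing. */
final class SharedPartitionSketch {
  int currentMapIndex = -1;
}

/** Hypothetical callback whose onFailure may be invoked more than once. */
final class FailureCallbackSketch {
  private final SharedPartitionSketch partition;
  private boolean isWriting = false;

  FailureCallbackSketch(SharedPartitionSketch partition) {
    this.partition = partition;
  }

  void startWriting(int mapIndex) {
    partition.currentMapIndex = mapIndex;
    isWriting = true;
  }

  void onFailure() {
    if (isWriting) {
      partition.currentMapIndex = -1;
      // Clearing the flag here instead would behave identically: when
      // isWriting is already false, the assignment below changes nothing.
    }
    isWriting = false;
  }
}
```

If stream `a` fails, stream `b` then takes over the partition, and `a.onFailure()` fires a second time, the partition still belongs to `b` because `a` no longer has `isWriting` set.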



