You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2021/08/06 14:48:09 UTC

[spark] branch master updated: [SPARK-36423][SHUFFLE] Randomize order of blocks in a push request to improve block merge ratio for push-based shuffle

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e72951  [SPARK-36423][SHUFFLE] Randomize order of blocks in a push request to improve block merge ratio for push-based shuffle
6e72951 is described below

commit 6e729515fd2bb228afed964b50f0d02329684934
Author: Min Shen <ms...@linkedin.com>
AuthorDate: Fri Aug 6 09:47:42 2021 -0500

    [SPARK-36423][SHUFFLE] Randomize order of blocks in a push request to improve block merge ratio for push-based shuffle
    
    ### What changes were proposed in this pull request?
    
    On the client side, we are currently randomizing the order of push requests before processing each request. In addition we can further randomize the order of blocks within each push request before pushing them.
    In our benchmark, this has resulted in a 60%-70% reduction of blocks that fail to be merged due to bock collision (the existing block merge ratio is already pretty good in general, and this further improves it).
    
    ### Why are the changes needed?
    
    Improve block merge ratio for push-based shuffle
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Straightforward small change, no additional test needed.
    
    Closes #33649 from Victsm/SPARK-36423.
    
    Lead-authored-by: Min Shen <ms...@linkedin.com>
    Co-authored-by: Min Shen <vi...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala  | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index 56f915b..ecaa4f0 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -242,10 +242,16 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         handleResult(PushResult(blockId, exception))
       }
     }
+    // In addition to randomizing the order of the push requests, further randomize the order
+    // of blocks within the push request to further reduce the likelihood of shuffle server side
+    // collision of pushed blocks. This does not increase the cost of reading unmerged shuffle
+    // files on the executor side, because we are still reading MB-size chunks and only randomize
+    // the in-memory sliced buffers post reading.
+    val (blockPushIds, blockPushBuffers) = Utils.randomize(blockIds.zip(
+      sliceReqBufferIntoBlockBuffers(request.reqBuffer, request.blocks.map(_._2)))).unzip
     SparkEnv.get.blockManager.blockStoreClient.pushBlocks(
-      address.host, address.port, blockIds.toArray,
-      sliceReqBufferIntoBlockBuffers(request.reqBuffer, request.blocks.map(_._2)),
-      blockPushListener)
+      address.host, address.port, blockPushIds.toArray,
+      blockPushBuffers.toArray, blockPushListener)
   }
 
   /**

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org