You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/03/26 16:13:23 UTC

[spark] branch master updated: [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 33f532a  [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator
33f532a is described below

commit 33f532a9f201fb9c7895d685b3dce82cf042dc61
Author: yi.wu <yi...@databricks.com>
AuthorDate: Thu Mar 26 09:11:13 2020 -0700

    [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator
    
    ### What changes were proposed in this pull request?
    
    Fix the incorrect log message for `curRequestSize`.
    
    ### Why are the changes needed?
    
    In batch mode, `curRequestSize` can be the total size of several block groups, but each group's fetch request should log its own size rather than that total.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No new tests; the change only affects a debug log message.
    
    Closes #28028 from Ngone51/fix_curRequestSize.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index f1a7d88..404e055 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -329,9 +329,8 @@ final class ShuffleBlockFetcherIterator(
 
   private def createFetchRequest(
       blocks: Seq[FetchBlockInfo],
-      address: BlockManagerId,
-      curRequestSize: Long): FetchRequest = {
-    logDebug(s"Creating fetch request of $curRequestSize at $address "
+      address: BlockManagerId): FetchRequest = {
+    logDebug(s"Creating fetch request of ${blocks.map(_.size).sum} at $address "
       + s"with ${blocks.size} blocks")
     FetchRequest(address, blocks)
   }
@@ -339,17 +338,16 @@ final class ShuffleBlockFetcherIterator(
   private def createFetchRequests(
       curBlocks: Seq[FetchBlockInfo],
       address: BlockManagerId,
-      curRequestSize: Long,
       isLast: Boolean,
       collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo] = {
     val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks)
     var retBlocks = Seq.empty[FetchBlockInfo]
     if (mergedBlocks.length <= maxBlocksInFlightPerAddress) {
-      collectedRemoteRequests += createFetchRequest(mergedBlocks, address, curRequestSize)
+      collectedRemoteRequests += createFetchRequest(mergedBlocks, address)
     } else {
       mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks =>
         if (blocks.length == maxBlocksInFlightPerAddress || isLast) {
-          collectedRemoteRequests += createFetchRequest(blocks, address, curRequestSize)
+          collectedRemoteRequests += createFetchRequest(blocks, address)
         } else {
           // The last group does not exceed `maxBlocksInFlightPerAddress`. Put it back
           // to `curBlocks`.
@@ -377,14 +375,14 @@ final class ShuffleBlockFetcherIterator(
       // For batch fetch, the actual block in flight should count for merged block.
       val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
       if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
-        curBlocks = createFetchRequests(curBlocks, address, curRequestSize, isLast = false,
+        curBlocks = createFetchRequests(curBlocks, address, isLast = false,
           collectedRemoteRequests).to[ArrayBuffer]
         curRequestSize = curBlocks.map(_.size).sum
       }
     }
     // Add in the final request
     if (curBlocks.nonEmpty) {
-      curBlocks = createFetchRequests(curBlocks, address, curRequestSize, isLast = true,
+      curBlocks = createFetchRequests(curBlocks, address, isLast = true,
         collectedRemoteRequests).to[ArrayBuffer]
       curRequestSize = curBlocks.map(_.size).sum
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org