You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/03/26 16:13:23 UTC
[spark] branch master updated: [SPARK-31259][CORE] Fix log message
about fetch request size in ShuffleBlockFetcherIterator
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 33f532a [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator
33f532a is described below
commit 33f532a9f201fb9c7895d685b3dce82cf042dc61
Author: yi.wu <yi...@databricks.com>
AuthorDate: Thu Mar 26 09:11:13 2020 -0700
[SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator
### What changes were proposed in this pull request?
Fix the incorrect log of `curRequestSize`.
### Why are the changes needed?
In batch mode, `curRequestSize` can be the total size of several block groups, but each group should log its own request size instead of the total size.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
It only affects the log message.
Closes #28028 from Ngone51/fix_curRequestSize.
Authored-by: yi.wu <yi...@databricks.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 14 ++++++--------
1 file changed, 6 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index f1a7d88..404e055 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -329,9 +329,8 @@ final class ShuffleBlockFetcherIterator(
private def createFetchRequest(
blocks: Seq[FetchBlockInfo],
- address: BlockManagerId,
- curRequestSize: Long): FetchRequest = {
- logDebug(s"Creating fetch request of $curRequestSize at $address "
+ address: BlockManagerId): FetchRequest = {
+ logDebug(s"Creating fetch request of ${blocks.map(_.size).sum} at $address "
+ s"with ${blocks.size} blocks")
FetchRequest(address, blocks)
}
@@ -339,17 +338,16 @@ final class ShuffleBlockFetcherIterator(
private def createFetchRequests(
curBlocks: Seq[FetchBlockInfo],
address: BlockManagerId,
- curRequestSize: Long,
isLast: Boolean,
collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo] = {
val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks)
var retBlocks = Seq.empty[FetchBlockInfo]
if (mergedBlocks.length <= maxBlocksInFlightPerAddress) {
- collectedRemoteRequests += createFetchRequest(mergedBlocks, address, curRequestSize)
+ collectedRemoteRequests += createFetchRequest(mergedBlocks, address)
} else {
mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks =>
if (blocks.length == maxBlocksInFlightPerAddress || isLast) {
- collectedRemoteRequests += createFetchRequest(blocks, address, curRequestSize)
+ collectedRemoteRequests += createFetchRequest(blocks, address)
} else {
// The last group does not exceed `maxBlocksInFlightPerAddress`. Put it back
// to `curBlocks`.
@@ -377,14 +375,14 @@ final class ShuffleBlockFetcherIterator(
// For batch fetch, the actual block in flight should count for merged block.
val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
- curBlocks = createFetchRequests(curBlocks, address, curRequestSize, isLast = false,
+ curBlocks = createFetchRequests(curBlocks, address, isLast = false,
collectedRemoteRequests).to[ArrayBuffer]
curRequestSize = curBlocks.map(_.size).sum
}
}
// Add in the final request
if (curBlocks.nonEmpty) {
- curBlocks = createFetchRequests(curBlocks, address, curRequestSize, isLast = true,
+ curBlocks = createFetchRequests(curBlocks, address, isLast = true,
collectedRemoteRequests).to[ArrayBuffer]
curRequestSize = curBlocks.map(_.size).sum
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org