You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/10/25 04:47:15 UTC
[spark] branch master updated: [SPARK-37101][CORE] In class
ShuffleBlockPusher, use config instead of key
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new adf9b64 [SPARK-37101][CORE] In class ShuffleBlockPusher, use config instead of key
adf9b64 is described below
commit adf9b64c0be8e6e5bc6042eaaecd53518fbc5e25
Author: jinhai <ji...@gmail.com>
AuthorDate: Mon Oct 25 13:46:42 2021 +0900
[SPARK-37101][CORE] In class ShuffleBlockPusher, use config instead of key
### What changes were proposed in this pull request?
In class ShuffleBlockPusher, We can use config.REDUCER_MAX_SIZE_IN_FLIGHT and config.REDUCER_MAX_REQS_IN_FLIGHT instead of "spark.reducer.maxSizeInFlight" and "spark.reducer.maxReqsInFlight"
### Why are the changes needed?
Unified config instead of hard coding
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing test
Closes #34372 from manbuyun/SPARK-37101.
Lead-authored-by: jinhai <ji...@gmail.com>
Co-authored-by: 漫步云端 <ji...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index 50f9c8c..8790371 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -50,9 +50,8 @@ import org.apache.spark.util.{ThreadUtils, Utils}
private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
private[this] val maxBlockSizeToPush = conf.get(SHUFFLE_MAX_BLOCK_SIZE_TO_PUSH)
private[this] val maxBlockBatchSize = conf.get(SHUFFLE_MAX_BLOCK_BATCH_SIZE_FOR_PUSH)
- private[this] val maxBytesInFlight =
- conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024
- private[this] val maxReqsInFlight = conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue)
+ private[this] val maxBytesInFlight = conf.get(REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024
+ private[this] val maxReqsInFlight = conf.get(REDUCER_MAX_REQS_IN_FLIGHT)
private[this] val maxBlocksInFlightPerAddress = conf.get(REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS)
private[this] var bytesInFlight = 0L
private[this] var reqsInFlight = 0
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org