You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/10/25 04:47:15 UTC

[spark] branch master updated: [SPARK-37101][CORE] In class ShuffleBlockPusher, use config instead of key

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new adf9b64  [SPARK-37101][CORE] In class ShuffleBlockPusher, use config instead of key
adf9b64 is described below

commit adf9b64c0be8e6e5bc6042eaaecd53518fbc5e25
Author: jinhai <ji...@gmail.com>
AuthorDate: Mon Oct 25 13:46:42 2021 +0900

    [SPARK-37101][CORE] In class ShuffleBlockPusher, use config instead of key
    
    ### What changes were proposed in this pull request?
    
    In class ShuffleBlockPusher, We can use config.REDUCER_MAX_SIZE_IN_FLIGHT and config.REDUCER_MAX_REQS_IN_FLIGHT instead of "spark.reducer.maxSizeInFlight" and "spark.reducer.maxReqsInFlight"
    
    ### Why are the changes needed?
    
    Unified config instead of hard coding
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing test
    
    Closes #34372 from manbuyun/SPARK-37101.
    
    Lead-authored-by: jinhai <ji...@gmail.com>
    Co-authored-by: 漫步云端 <ji...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index 50f9c8c..8790371 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -50,9 +50,8 @@ import org.apache.spark.util.{ThreadUtils, Utils}
 private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
   private[this] val maxBlockSizeToPush = conf.get(SHUFFLE_MAX_BLOCK_SIZE_TO_PUSH)
   private[this] val maxBlockBatchSize = conf.get(SHUFFLE_MAX_BLOCK_BATCH_SIZE_FOR_PUSH)
-  private[this] val maxBytesInFlight =
-    conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024
-  private[this] val maxReqsInFlight = conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue)
+  private[this] val maxBytesInFlight = conf.get(REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024
+  private[this] val maxReqsInFlight = conf.get(REDUCER_MAX_REQS_IN_FLIGHT)
   private[this] val maxBlocksInFlightPerAddress = conf.get(REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS)
   private[this] var bytesInFlight = 0L
   private[this] var reqsInFlight = 0

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org