You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by zu...@apache.org on 2022/11/15 02:26:48 UTC

[incubator-uniffle] branch master updated: [Feature] Support the estimated number of ShuffleServers required. (#322)

This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 70076fa3 [Feature] Support the estimated number of ShuffleServers required. (#322)
70076fa3 is described below

commit 70076fa359ed3bff445c71c732ab24d84097d1f6
Author: Xianming Lei <31...@users.noreply.github.com>
AuthorDate: Tue Nov 15 10:26:44 2022 +0800

    [Feature] Support the estimated number of ShuffleServers required. (#322)
    
    ### What changes were proposed in this pull request?
    Estimate the number of ShuffleServers required from the estimated task concurrency, for both Spark and MR clients.
    
    ### Why are the changes needed?
    Avoid manually specifying the number of ShuffleServers to improve user experience.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    
    ### How was this patch tested?
    UT.
    
    Co-authored-by: leixianming <le...@didiglobal.com>
---
 .../org/apache/hadoop/mapreduce/RssMRConfig.java   | 18 +++++++++-
 .../org/apache/hadoop/mapreduce/RssMRUtils.java    | 36 +++++++++++++++++++
 .../hadoop/mapreduce/v2/app/RssMRAppMaster.java    |  6 +---
 .../apache/hadoop/mapreduce/RssMRUtilsTest.java    | 42 ++++++++++++++++++++++
 .../org/apache/spark/shuffle/RssSparkConfig.java   | 17 +++++----
 .../apache/spark/shuffle/RssSparkShuffleUtils.java | 11 ++++++
 .../spark/shuffle/RssSparkShuffleUtilsTest.java    | 24 ++++++++++++-
 .../apache/spark/shuffle/RssShuffleManager.java    |  2 +-
 .../apache/spark/shuffle/RssShuffleManager.java    |  6 ++--
 .../uniffle/client/util/RssClientConfig.java       |  8 +++--
 .../org/apache/uniffle/common/util/Constants.java  |  3 ++
 docs/client_guide.md                               |  8 ++---
 .../ContinuousSelectPartitionStrategyTest.java     |  1 -
 13 files changed, 156 insertions(+), 26 deletions(-)

diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
index d89b4f12..96bec024 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
@@ -162,7 +162,23 @@ public class RssMRConfig {
           MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES;
   public static final int RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE =
           RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE;
-  
+  // MR-prefixed keys and defaults used to estimate how many ShuffleServers a job should request.
+  public static final String RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED =
+      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED;
+  public static final boolean RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE =
+      RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE;
+  // Multiplier applied to the estimated task concurrency (see RssMRUtils.estimateTaskConcurrency).
+  public static final String RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR =
+      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR;
+  // Default for the dynamic factor, shared with the common client config.
+  public static final double RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE
+      = RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE;
+  // Divisor of the estimated task concurrency when deriving the ShuffleServer number.
+  public static final String RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER =
+      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER;
+  public static final int RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE =
+      RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE;
+
   public static final String RSS_CONF_FILE = "rss_conf.xml";
 
   public static final Set<String> RSS_MANDATORY_CLUSTER_CONF =
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 53026bbd..347c20b4 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -205,4 +205,40 @@ public class RssMRUtils {
         & MAX_ATTEMPT_ID;
     return (attemptId << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH)) + mapId;
   }
+
+  /** Estimates concurrent MR tasks from map/reduce counts, running limits, slow start and the dynamic factor. */
+  public static int estimateTaskConcurrency(JobConf jobConf) {
+    // A running limit is honoured only when positive; otherwise the full task count is used.
+    int maps = jobConf.getNumMapTasks();
+    int mapCap = jobConf.getInt(Constants.MR_MAP_LIMIT, Constants.MR_MAP_LIMIT_DEFAULT_VALUE);
+    int effectiveMaps = mapCap > 0 ? Math.min(maps, mapCap) : maps;
+    int reduces = jobConf.getNumReduceTasks();
+    int reduceCap = jobConf.getInt(Constants.MR_REDUCE_LIMIT, Constants.MR_REDUCE_LIMIT_DEFAULT_VALUE);
+    int effectiveReduces = reduceCap > 0 ? Math.min(reduces, reduceCap) : reduces;
+    double slowStart = jobConf.getDouble(Constants.MR_SLOW_START, Constants.MR_SLOW_START_DEFAULT_VALUE);
+    double factor = jobConf.getDouble(RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR,
+        RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE);
+    // Slow start of exactly 1 is special-cased: take the larger phase instead of a weighted sum.
+    double concurrentTasks = slowStart == 1 ? Math.max(effectiveMaps, effectiveReduces)
+        : (1 - slowStart) * effectiveMaps + effectiveReduces;
+    return (int) (concurrentTasks * factor);
+  }
+
+  /**
+   * Returns the configured ShuffleServer number when it is positive or estimation is disabled;
+   * otherwise returns ceil(estimated task concurrency / task concurrency per server).
+   */
+  public static int getRequiredShuffleServerNumber(JobConf jobConf) {
+    int configured = jobConf.getInt(RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER,
+        RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE);
+    boolean estimationOn = jobConf.getBoolean(RssMRConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED,
+        RssMRConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE);
+    if (estimationOn && configured <= 0) {
+      // NOTE(review): the per-server value is used as a divisor unchecked - confirm it is always > 0.
+      int perServer = jobConf.getInt(RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER,
+          RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE);
+      return (int) Math.ceil((double) estimateTaskConcurrency(jobConf) / perServer);
+    }
+    return configured;
+  }
 }
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index cd838c75..d891452d 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -190,11 +190,7 @@ public class RssMRAppMaster extends MRAppMaster {
         conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, originalAttempts + inc);
       }
       
-      int requiredAssignmentShuffleServersNum = conf.getInt(
-              RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER,
-              RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE
-      );
-      
+      int requiredAssignmentShuffleServersNum = RssMRUtils.getRequiredShuffleServerNumber(conf);
       // retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
       long retryInterval = conf.getLong(RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL,
               RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java b/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
index de89de3b..385693f3 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
@@ -170,4 +170,46 @@ public class RssMRUtilsTest {
     assertEquals(Integer.toString(RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE),
         conf.get(RssMRConfig.RSS_CLIENT_RETRY_MAX));
   }
+
+  @Test
+  public void testEstimateTaskConcurrency() {
+    JobConf jobConf = new JobConf();
+    jobConf.setInt("mapreduce.job.maps", 500);
+    jobConf.setInt("mapreduce.job.reduces", 20);
+    assertEquals(495, RssMRUtils.estimateTaskConcurrency(jobConf)); // (1 - 0.05) * 500 + 20
+    // Slow start of 1.0 selects the max(maps, reduces) branch: max(500, 20) = 500.
+    jobConf.setDouble(Constants.MR_SLOW_START, 1.0);
+    assertEquals(500, RssMRUtils.estimateTaskConcurrency(jobConf));
+    jobConf.setInt(Constants.MR_MAP_LIMIT, 200);
+    jobConf.setInt(Constants.MR_REDUCE_LIMIT, 200);
+    assertEquals(200, RssMRUtils.estimateTaskConcurrency(jobConf)); // max(min(500, 200), min(20, 200))
+    // The dynamic factor scales the estimate: 200 * 0.5 = 100.
+    jobConf.setDouble("mapreduce.rss.estimate.task.concurrency.dynamic.factor", 0.5);
+    assertEquals(100, RssMRUtils.estimateTaskConcurrency(jobConf));
+  }
+
+  @Test
+  public void testGetRequiredShuffleServerNumber() {
+    JobConf jobConf = new JobConf();
+    jobConf.setInt("mapreduce.job.maps", 500);
+    jobConf.setInt("mapreduce.job.reduces", 20);
+    jobConf.setInt(RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, 10);
+    assertEquals(10, RssMRUtils.getRequiredShuffleServerNumber(jobConf));
+    // Enabling estimation does not override an explicitly configured positive number.
+    jobConf.setBoolean(RssMRConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED, true);
+    assertEquals(10, RssMRUtils.getRequiredShuffleServerNumber(jobConf));
+    // Once the explicit number is unset the estimate applies: ceil(495 / 80) = 7.
+    jobConf.unset(RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);
+    assertEquals(7, RssMRUtils.getRequiredShuffleServerNumber(jobConf));
+    // Slow start of 1.0 gives a concurrency of 500: ceil(500 / 80) = 7.
+    jobConf.setDouble(Constants.MR_SLOW_START, 1.0);
+    assertEquals(7, RssMRUtils.getRequiredShuffleServerNumber(jobConf));
+    // Running limits of 200 cap the concurrency at 200: ceil(200 / 80) = 3.
+    jobConf.setInt(Constants.MR_MAP_LIMIT, 200);
+    jobConf.setInt(Constants.MR_REDUCE_LIMIT, 200);
+    assertEquals(3, RssMRUtils.getRequiredShuffleServerNumber(jobConf));
+    // A dynamic factor of 0.5 halves the concurrency to 100: ceil(100 / 80) = 2.
+    jobConf.setDouble("mapreduce.rss.estimate.task.concurrency.dynamic.factor", 0.5);
+    assertEquals(2, RssMRUtils.getRequiredShuffleServerNumber(jobConf));
+  }
 }
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 2dfca2cd..f14459ca 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -248,12 +248,17 @@ public class RssSparkConfig {
                    + " to be allocated"))
       .createWithDefault(RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE);
 
-  public static final ConfigEntry<Boolean> RSS_ESTIMATE_TASK_CONCURRENCY_ENABLED = createBooleanBuilder(
-      new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_ENABLED)
-          .doc("When the Coordinator enables rss.coordinator.select.partition.strategy,"
-                   + " this configuration item is valid and is used to estimate how many consecutive"
-                   + " PartitionRanges should be allocated to a ShuffleServer"))
-      .createWithDefault(RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DEFAULT_VALUE);
+  public static final ConfigEntry<Boolean> RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED = createBooleanBuilder(
+      new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED)
+          .doc("Whether to estimate the number of ShuffleServers to be allocated based on the number"
+                   + " of concurrent tasks."))
+      .createWithDefault(RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE);
+  // Task concurrency one ShuffleServer is expected to handle; it divides the estimated concurrency.
+  public static final ConfigEntry<Integer> RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER = createIntegerBuilder(
+      new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX +  RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER)
+          .doc("How many tasks concurrency to allocate a ShuffleServer, you need to enable"
+                   + " spark.rss.estimate.server.assignment.enabled"))
+      .createWithDefault(RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE);
 
   public static final Set<String> RSS_MANDATORY_CLUSTER_CONF =
       ImmutableSet.of(RSS_STORAGE_TYPE.key(), RSS_REMOTE_STORAGE_PATH.key());
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
index a260c9a9..2320f55b 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
@@ -161,4 +161,15 @@ public class RssSparkShuffleUtils {
     }
     return taskConcurrency;
   }
+
+  /** Returns the configured server number if positive or estimation is off, else ceil(concurrency / perServer). */
+  public static int getRequiredShuffleServerNumber(SparkConf sparkConf) {
+    int configured = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);
+    boolean estimationOn = sparkConf.get(RssSparkConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED);
+    if (estimationOn && configured <= 0) {
+      int perServer = sparkConf.get(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER);
+      return (int) Math.ceil((double) estimateTaskConcurrency(sparkConf) / perServer);
+    }
+    return configured;
+  }
 }
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java
index 28159be2..856fff8c 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java
@@ -165,7 +165,6 @@ public class RssSparkShuffleUtilsTest {
     sparkConf.set(Constants.SPARK_DYNAMIC_ENABLED, "true");
     sparkConf.set(Constants.SPARK_MAX_DYNAMIC_EXECUTOR, "200");
     sparkConf.set(Constants.SPARK_MIN_DYNAMIC_EXECUTOR, "100");
-    sparkConf.set(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_ENABLED, true);
     sparkConf.set(Constants.SPARK_EXECUTOR_CORES, "2");
     int taskConcurrency;
 
@@ -188,4 +187,27 @@ public class RssSparkShuffleUtilsTest {
     taskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
     assertEquals(70, taskConcurrency);
   }
+
+  @Test
+  public void testGetRequiredShuffleServerNumber() {
+    SparkConf sparkConf = new SparkConf();
+    sparkConf.set(Constants.SPARK_DYNAMIC_ENABLED, "true");
+    sparkConf.set(Constants.SPARK_MAX_DYNAMIC_EXECUTOR, "200");
+    sparkConf.set(Constants.SPARK_MIN_DYNAMIC_EXECUTOR, "100");
+    sparkConf.set(Constants.SPARK_EXECUTOR_CORES, "4");
+    // Estimation is off by default, so the configured server number (-1 here) is returned as-is.
+    assertEquals(-1, RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf));
+    // With estimation on, the result is ceil(estimated task concurrency / 80), 80 being the default.
+    sparkConf.set(RssSparkConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED, true);
+    assertEquals(10, RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf));
+    // Per the expected values, 2 CPUs per task halve the estimate (10 -> 5).
+    sparkConf.set(Constants.SPARK_TASK_CPUS, "2");
+    assertEquals(5, RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf));
+    // Lowering the dynamic factor to 0.5 shrinks the estimate further (5 -> 4).
+    sparkConf.set(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR, 0.5);
+    assertEquals(4, RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf));
+    // A larger per-server concurrency (100 instead of 80) requires fewer servers (4 -> 3).
+    sparkConf.set(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER, 100);
+    assertEquals(3, RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf));
+  }
 }
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 774cd3ca..f195b91e 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -223,7 +223,7 @@ public class RssShuffleManager implements ShuffleManager {
     // get all register info according to coordinator's response
     Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
 
-    int requiredShuffleServerNumber = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);
+    int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
 
     // retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
     long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index e780cb3a..c1cdc1c0 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -277,14 +277,12 @@ public class RssShuffleManager implements ShuffleManager {
 
     Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
 
-    int requiredShuffleServerNumber = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);
+    int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
 
     // retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
     long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
     int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
-    boolean enabledEstimateTaskConcurrency = sparkConf.get(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_ENABLED);
-    int estimateTaskConcurrency = enabledEstimateTaskConcurrency
-                                  ? RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf) : -1;
+    int estimateTaskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
     Map<Integer, List<ShuffleServerInfo>> partitionToServers;
     try {
       partitionToServers = RetryUtils.retry(() -> {
diff --git a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
index 160f5d3d..caf025ce 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
@@ -79,6 +79,10 @@ public class RssClientConfig {
       "rss.estimate.task.concurrency.dynamic.factor";
   public static final double RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE = 1.0;
 
-  public static final String RSS_ESTIMATE_TASK_CONCURRENCY_ENABLED = "rss.estimate.task.concurrency.enabled";
-  public static final boolean RSS_ESTIMATE_TASK_CONCURRENCY_DEFAULT_VALUE = false;
+  public static final String RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED = "rss.estimate.server.assignment.enabled";
+  public static final boolean RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE = false;
+  // Divisor used by the clients: estimated task concurrency / this value, rounded up, is the server number.
+  public static final String RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER = "rss.estimate.task.concurrency.per.server";
+  public static final int RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE = 80;
+
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index 4fd2f12a..1921c312 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -61,5 +61,8 @@ public class Constants {
   public static final String MR_REDUCES = "mapreduce.job.reduces";
   public static final String MR_MAP_LIMIT = "mapreduce.job.running.map.limit";
   public static final String MR_REDUCE_LIMIT = "mapreduce.job.running.reduce.limit";
+  public static final int MR_MAP_LIMIT_DEFAULT_VALUE = 0; // non-positive: no running-map cap is applied
+  public static final int MR_REDUCE_LIMIT_DEFAULT_VALUE = 0; // non-positive: no running-reduce cap is applied
   public static final String MR_SLOW_START = "mapreduce.job.reduce.slowstart.completedmaps";
+  public static final double MR_SLOW_START_DEFAULT_VALUE = 0.05; // fallback when the job sets no slow start
 }
diff --git a/docs/client_guide.md b/docs/client_guide.md
index 71b9f5e1..c2884fa0 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -73,9 +73,6 @@ and Continuous partition assignment mechanism.
       ```bash
         # Default value is ROUND, it will poll to allocate partitions to ShuffleServer
         rss.coordinator.select.partition.strategy CONTINUOUS
-    
-        # Default value is false, the CONTINUOUS allocation mechanism relies on enabling this configuration, and estimates how many consecutive allocations should be allocated based on task concurrency
-        --conf spark.rss.estimate.task.concurrency.enabled=true
         
         # Default value is 1.0, used to estimate task concurrency, how likely is this part of the resource between spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors to be allocated
         --conf spark.rss.estimate.task.concurrency.dynamic.factor=1.0
@@ -118,8 +115,9 @@ These configurations are shared by all types of clients.
 |<client_type>.rss.client.io.compression.codec|lz4|The compression codec is used to compress the shuffle data. Default codec is `lz4`. Other options are`ZSTD` and `SNAPPY`.|
 |<client_type>.rss.client.io.compression.zstd.level|3|The zstd compression level, the default level is 3|
 |<client_type>.rss.client.shuffle.data.distribution.type|NORMAL|The type of partition shuffle data distribution, including normal and local_order. The default value is normal. Now this config is only valid in Spark3.x|
-|<client_type>.rss.estimate.task.concurrency.enabled|false|Only works in spark3, whether to enable task concurrency estimation, only valid if rss.coordinator.select.partition.strategy is CONTINUOUS, this feature can improve performance in AQE scenarios.|
-|<client_type>.rss.estimate.task.concurrency.dynamic.factor|1.0|Between 0 and 1, used to estimate task concurrency, how likely is this part of the resource between spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors to be allocated. Only works in spark3, <client_type>.rss.estimate.task.concurrency.enabled=true, and Coordinator's rss.coordinator.select.partition.strategy is CONTINUOUS.|
+|<client_type>.rss.estimate.task.concurrency.dynamic.factor|1.0|Between 0 and 1, used to estimate task concurrency, when the client is spark, it represents how likely is this part of the resource between spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors to be allocated, when the client is mr, it represents how likely the resources of map and reduce are satisfied. Effective when <client_type>.rss.estimate.server.assignment.enabled=true or Coordinator's rss.coor [...]
+|<client_type>.rss.estimate.server.assignment.enabled|false|Supported by both MR and Spark clients. Whether to estimate the number of ShuffleServers to allocate from the estimated task concurrency. The estimate is used only when no positive ShuffleServer number is explicitly configured.|
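+|<client_type>.rss.estimate.task.concurrency.per.server|80|Takes effect only when <client_type>.rss.estimate.server.assignment.enabled=true. The number of concurrent tasks one ShuffleServer is expected to serve; the number of ShuffleServers requested is the estimated task concurrency divided by this value, rounded up.|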
 Notice:
 
 1. `<client_type>` should be `spark` or `mapreduce`
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
index 02a61012..c7c537a7 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
@@ -131,7 +131,6 @@ public class ContinuousSelectPartitionStrategyTest extends SparkIntegrationTestB
     sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA.key(), String.valueOf(replicateWrite));
     sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA_WRITE.key(), String.valueOf(replicateWrite));
     sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA_READ.key(), String.valueOf(replicateRead));
-    sparkConf.set(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_ENABLED, true);
     sparkConf.set("spark.shuffle.manager",
         "org.apache.uniffle.test.GetShuffleReportForMultiPartTest$RssShuffleManagerWrapper");
   }