Posted to commits@uniffle.apache.org by ro...@apache.org on 2024/03/12 11:36:16 UTC

(incubator-uniffle) branch master updated: [#731][FOLLOWUP] feat(Spark): Configure blockIdLayout for Spark based on max partitions (#1566)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 142e3ff1a [#731][FOLLOWUP] feat(Spark): Configure blockIdLayout for Spark based on max partitions (#1566)
142e3ff1a is described below

commit 142e3ff1a706a0dacae867432a949d70858ee18a
Author: Enrico Minack <gi...@enrico.minack.dev>
AuthorDate: Tue Mar 12 12:36:11 2024 +0100

    [#731][FOLLOWUP] feat(Spark): Configure blockIdLayout for Spark based on max partitions (#1566)
    
    ### What changes were proposed in this pull request?
    The configuration of the block id layout for Spark2 and Spark3 can be simplified by providing only the maximum number of partitions. Increments `SHUFFLE_SERVER_VERSION`, as the "new" Spark client must connect to a "new" shuffle server.
    
    ### Why are the changes needed?
    Currently, optimally configuring the block id layout for Spark is quite complex: https://github.com/apache/incubator-uniffle/pull/1528/files#diff-09ce7eaa98815d62ca00b2a8b0a45b0a922b047014c1f91dc17081b3fef8e7a8R106-R112
    
    Three values have to be provided: the number of bits for the sequence number, the partition id, and the task attempt id. The task attempt id has to reserve more bits than the partition id, so that the maximum number of attempts can be stored as well. This also requires accounting for speculative execution.
    
    The RssShuffleManager can perform this computation and derive the optimal block id layout configuration from the maximum number of partitions alone.
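    
    For illustration, a minimal sketch of that derivation (not part of the patch; the values match the defaults exercised by the unit tests below: spark.rss.blockId.maxPartitions=1048576, spark.task.maxFailures=4, spark.speculation=false):
    
        int maxPartitions = 1048576;                                                // 2^20
        int maxAttemptNo = 4 - 1;                                                   // attempt numbers are zero-based
        int attemptIdBits = 32 - Integer.numberOfLeadingZeros(maxAttemptNo);        // 2
        int partitionIdBits = 32 - Integer.numberOfLeadingZeros(maxPartitions - 1); // 20
        int taskAttemptIdBits = partitionIdBits + attemptIdBits;                    // 22
        int sequenceNoBits = 63 - partitionIdBits - taskAttemptIdBits;              // 21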
    
    ### Does this PR introduce _any_ user-facing change?
    Adds optional configuration `spark.rss.blockId.maxPartitions`.
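    
    For example, to support up to 2,097,152 partitions (a hypothetical value), it can be set like any other Spark configuration:
    
        spark-submit --conf spark.rss.blockId.maxPartitions=2097152 ...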
    
    ### How was this patch tested?
    Unit and integration tests.
---
 .../org/apache/spark/shuffle/RssSparkConfig.java   |   9 +
 .../shuffle/manager/RssShuffleManagerBase.java     | 224 +++++++++++++-
 .../shuffle/manager/RssShuffleManagerBaseTest.java | 344 +++++++++++++++++++++
 .../apache/spark/shuffle/RssShuffleManager.java    |  18 +-
 .../apache/spark/shuffle/RssShuffleManager.java    |  27 +-
 .../apache/uniffle/common/util/BlockIdLayout.java  |   2 +
 .../org/apache/uniffle/common/util/Constants.java  |   2 +-
 docs/client_guide/spark_client_guide.md            |  23 +-
 .../apache/uniffle/test/ShuffleServerGrpcTest.java |  15 -
 .../apache/uniffle/test/RssShuffleManagerTest.java |   4 +-
 .../uniffle/server/ShuffleServerGrpcService.java   |  28 +-
 11 files changed, 633 insertions(+), 63 deletions(-)
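
For context (not part of the patch): once configureBlockIdLayout has amended both configs, a BlockIdLayout can be created consistently from either one, as noted in the javadoc below. A minimal usage sketch, assuming the APIs shown in this diff:

    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
    shuffleManager.configureBlockIdLayout(sparkConf, rssConf);
    // both layouts now describe the same bit assignment
    BlockIdLayout fromRssConf = BlockIdLayout.from(rssConf);
    BlockIdLayout fromSparkConf = BlockIdLayout.from(RssSparkConfig.toRssConf(sparkConf));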

diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index f0f25816c..ee1278c87 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -373,6 +373,15 @@ public class RssSparkConfig {
                   .doc("Whether to enable the resubmit stage."))
           .createWithDefault(false);
 
+  public static final ConfigEntry<Integer> RSS_MAX_PARTITIONS =
+      createIntegerBuilder(
+              new ConfigBuilder("spark.rss.blockId.maxPartitions")
+                  .doc(
+                      "Sets the maximum number of partitions to be supported by block ids. "
+                          + "This determines the bits reserved in block ids for the "
+                          + "sequence number, the partition id and the task attempt id."))
+          .createWithDefault(1048576);
+
   // spark2 doesn't have this key defined
   public static final String SPARK_SHUFFLE_COMPRESS_KEY = "spark.shuffle.compress";
 
diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 14c99b8b8..c75207b89 100644
--- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -19,10 +19,14 @@ package org.apache.uniffle.shuffle.manager;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.MapOutputTracker;
@@ -38,6 +42,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.config.ConfigOption;
+import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 
@@ -50,6 +56,213 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
   private Method unregisterAllMapOutputMethod;
   private Method registerShuffleMethod;
 
+  /** See static overload of this method. */
+  public abstract void configureBlockIdLayout(SparkConf sparkConf, RssConf rssConf);
+
+  /**
+   * Derives block id layout config from maximum number of allowed partitions. This value can be set
+   * in either SparkConf or RssConf via RssSparkConfig.RSS_MAX_PARTITIONS, where SparkConf has
+   * precedence.
+   *
+   * <p>Computes the number of required bits for partition id and task attempt id and reserves
+   * remaining bits for sequence number. Adds RssClientConf.BLOCKID_SEQUENCE_NO_BITS,
+   * RssClientConf.BLOCKID_PARTITION_ID_BITS, and RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS to the
+   * given RssConf and adds them prefixed with "spark." to the given SparkConf.
+   *
+   * <p>If RssSparkConfig.RSS_MAX_PARTITIONS is not set, given values for
+   * RssClientConf.BLOCKID_SEQUENCE_NO_BITS, RssClientConf.BLOCKID_PARTITION_ID_BITS, and
+   * RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS are copied from one config to the other.
+   *
+   * <p>Then, BlockIdLayout can consistently be created from both configs:
+   *
+   * <p>BlockIdLayout.from(rssConf) and BlockIdLayout.from(RssSparkConfig.toRssConf(sparkConf))
+   *
+   * @param sparkConf Spark config providing max partitions
+   * @param rssConf Rss config to amend
+   * @param maxFailures Spark max failures
+   * @param speculation Spark speculative execution
+   */
+  @VisibleForTesting
+  protected static void configureBlockIdLayout(
+      SparkConf sparkConf, RssConf rssConf, int maxFailures, boolean speculation) {
+    if (sparkConf.contains(RssSparkConfig.RSS_MAX_PARTITIONS.key())) {
+      configureBlockIdLayoutFromMaxPartitions(sparkConf, rssConf, maxFailures, speculation);
+    } else {
+      configureBlockIdLayoutFromLayoutConfig(sparkConf, rssConf, maxFailures, speculation);
+    }
+  }
+
+  private static void configureBlockIdLayoutFromMaxPartitions(
+      SparkConf sparkConf, RssConf rssConf, int maxFailures, boolean speculation) {
+    int maxPartitions =
+        sparkConf.getInt(
+            RssSparkConfig.RSS_MAX_PARTITIONS.key(),
+            RssSparkConfig.RSS_MAX_PARTITIONS.defaultValue().get());
+    if (maxPartitions <= 1) {
+      throw new IllegalArgumentException(
+          "Value of "
+              + RssSparkConfig.RSS_MAX_PARTITIONS.key()
+              + " must be larger than 1: "
+              + maxPartitions);
+    }
+
+    int attemptIdBits = getAttemptIdBits(getMaxAttemptNo(maxFailures, speculation));
+    int partitionIdBits = 32 - Integer.numberOfLeadingZeros(maxPartitions - 1); // [1..31]
+    int taskAttemptIdBits = partitionIdBits + attemptIdBits; // [1+attemptIdBits..31+attemptIdBits]
+    int sequenceNoBits = 63 - partitionIdBits - taskAttemptIdBits; // [1-attemptIdBits..61-attemptIdBits]
+
+    if (taskAttemptIdBits > 31) {
+      throw new IllegalArgumentException(
+          "Cannot support "
+              + RssSparkConfig.RSS_MAX_PARTITIONS.key()
+              + "="
+              + maxPartitions
+              + " partitions, "
+              + "as this would require to reserve more than 31 bits "
+              + "in the block id for task attempt ids. "
+              + "With spark.maxFailures="
+              + maxFailures
+              + " and spark.speculation="
+              + (speculation ? "true" : "false")
+              + " at most "
+              + (1 << (31 - attemptIdBits))
+              + " partitions can be supported.");
+    }
+
+    // we have to cap the sequence number bits at 31 bits,
+    // because BlockIdLayout imposes an upper bound of 31 bits
+    // which is fine as this allows for over 2bn sequence ids
+    if (sequenceNoBits > 31) {
+      // move spare bits (bits over 31) from sequence number to partition id and task attempt id
+      int spareBits = sequenceNoBits - 31;
+
+      // make spareBits even, so we add the same number of bits to partitionIdBits and taskAttemptIdBits
+      spareBits += spareBits % 2;
+
+      // move spare bits over
+      partitionIdBits += spareBits / 2;
+      taskAttemptIdBits += spareBits / 2;
+      maxPartitions = (1 << partitionIdBits);
+
+      // log with original sequenceNoBits
+      if (LOG.isInfoEnabled()) {
+        LOG.info(
+            "Increasing "
+                + RssSparkConfig.RSS_MAX_PARTITIONS.key()
+                + " to "
+                + maxPartitions
+                + ", "
+                + "otherwise we would have to support 2^"
+                + sequenceNoBits
+                + " (more than 2^31) sequence numbers.");
+      }
+
+      // remove spare bits
+      sequenceNoBits -= spareBits;
+
+      // propagate the changed value back to SparkConf
+      sparkConf.set(RssSparkConfig.RSS_MAX_PARTITIONS.key(), String.valueOf(maxPartitions));
+    }
+
+    // set block id layout config in RssConf
+    rssConf.set(RssClientConf.BLOCKID_SEQUENCE_NO_BITS, sequenceNoBits);
+    rssConf.set(RssClientConf.BLOCKID_PARTITION_ID_BITS, partitionIdBits);
+    rssConf.set(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS, taskAttemptIdBits);
+
+    // materialize these RssConf settings in sparkConf as well
+    // so that RssSparkConfig.toRssConf(sparkConf) provides this configuration
+    sparkConf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key(),
+        String.valueOf(sequenceNoBits));
+    sparkConf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_PARTITION_ID_BITS.key(),
+        String.valueOf(partitionIdBits));
+    sparkConf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key(),
+        String.valueOf(taskAttemptIdBits));
+  }
+
+  private static void configureBlockIdLayoutFromLayoutConfig(
+      SparkConf sparkConf, RssConf rssConf, int maxFailures, boolean speculation) {
+    String sparkPrefix = RssSparkConfig.SPARK_RSS_CONFIG_PREFIX;
+    String sparkSeqNoBitsKey = sparkPrefix + RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key();
+    String sparkPartIdBitsKey = sparkPrefix + RssClientConf.BLOCKID_PARTITION_ID_BITS.key();
+    String sparkTaskIdBitsKey = sparkPrefix + RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key();
+
+    // if one bit field is configured, all three must be given
+    List<String> sparkKeys =
+        Arrays.asList(sparkSeqNoBitsKey, sparkPartIdBitsKey, sparkTaskIdBitsKey);
+    if (sparkKeys.stream().anyMatch(sparkConf::contains)
+        && !sparkKeys.stream().allMatch(sparkConf::contains)) {
+      String allKeys = sparkKeys.stream().collect(Collectors.joining(", "));
+      String existingKeys =
+          Arrays.stream(sparkConf.getAll())
+              .map(t -> t._1)
+              .filter(sparkKeys.stream().collect(Collectors.toSet())::contains)
+              .collect(Collectors.joining(", "));
+      throw new IllegalArgumentException(
+          "All block id bit config keys must be provided ("
+              + allKeys
+              + "), not just a sub-set: "
+              + existingKeys);
+    }
+
+    // if one bit field is configured, all three must be given
+    List<ConfigOption<Integer>> rssKeys =
+        Arrays.asList(
+            RssClientConf.BLOCKID_SEQUENCE_NO_BITS,
+            RssClientConf.BLOCKID_PARTITION_ID_BITS,
+            RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS);
+    if (rssKeys.stream().anyMatch(rssConf::contains)
+        && !rssKeys.stream().allMatch(rssConf::contains)) {
+      String allKeys = rssKeys.stream().map(ConfigOption::key).collect(Collectors.joining(", "));
+      String existingKeys =
+          rssConf.getKeySet().stream()
+              .filter(rssKeys.stream().map(ConfigOption::key).collect(Collectors.toSet())::contains)
+              .collect(Collectors.joining(", "));
+      throw new IllegalArgumentException(
+          "All block id bit config keys must be provided ("
+              + allKeys
+              + "), not just a sub-set: "
+              + existingKeys);
+    }
+
+    if (sparkKeys.stream().allMatch(sparkConf::contains)) {
+      rssConf.set(RssClientConf.BLOCKID_SEQUENCE_NO_BITS, sparkConf.getInt(sparkSeqNoBitsKey, 0));
+      rssConf.set(RssClientConf.BLOCKID_PARTITION_ID_BITS, sparkConf.getInt(sparkPartIdBitsKey, 0));
+      rssConf.set(
+          RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS, sparkConf.getInt(sparkTaskIdBitsKey, 0));
+    } else if (rssKeys.stream().allMatch(rssConf::contains)) {
+      sparkConf.set(sparkSeqNoBitsKey, rssConf.getValue(RssClientConf.BLOCKID_SEQUENCE_NO_BITS));
+      sparkConf.set(sparkPartIdBitsKey, rssConf.getValue(RssClientConf.BLOCKID_PARTITION_ID_BITS));
+      sparkConf.set(
+          sparkTaskIdBitsKey, rssConf.getValue(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS));
+    } else {
+      // use default max partitions
+      sparkConf.set(
+          RssSparkConfig.RSS_MAX_PARTITIONS.key(),
+          RssSparkConfig.RSS_MAX_PARTITIONS.defaultValueString());
+      configureBlockIdLayoutFromMaxPartitions(sparkConf, rssConf, maxFailures, speculation);
+    }
+  }
+
+  protected static int getMaxAttemptNo(int maxFailures, boolean speculation) {
+    // attempt number is zero based: 0, 1, …, maxFailures-1
+    // maxFailures < 1 is not allowed, but for safety we interpret that as maxFailures == 1
+    int maxAttemptNo = maxFailures < 1 ? 0 : maxFailures - 1;
+
+    // with speculative execution enabled we could observe +1 attempts
+    if (speculation) {
+      maxAttemptNo++;
+    }
+
+    return maxAttemptNo;
+  }
+
+  protected static int getAttemptIdBits(int maxAttemptNo) {
+    return 32 - Integer.numberOfLeadingZeros(maxAttemptNo);
+  }
+
   /** See static overload of this method. */
   public abstract long getTaskAttemptIdForBlockId(int mapIndex, int attemptNo);
 
@@ -68,14 +281,8 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
    */
   protected static long getTaskAttemptIdForBlockId(
       int mapIndex, int attemptNo, int maxFailures, boolean speculation, int maxTaskAttemptIdBits) {
-    // attempt number is zero based: 0, 1, …, maxFailures-1
-    // maxFailures < 1 is not allowed, but for safety we interpret that as maxFailures == 1
-    int maxAttemptNo = maxFailures < 1 ? 0 : maxFailures - 1;
-
-    // with speculative execution enabled we could observe +1 attempts
-    if (speculation) {
-      maxAttemptNo++;
-    }
+    int maxAttemptNo = getMaxAttemptNo(maxFailures, speculation);
+    int attemptBits = getAttemptIdBits(maxAttemptNo);
 
     if (attemptNo > maxAttemptNo) {
       // this should never happen, if it does, our assumptions are wrong,
@@ -89,7 +296,6 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
               + ".");
     }
 
-    int attemptBits = 32 - Integer.numberOfLeadingZeros(maxAttemptNo);
     int mapIndexBits = 32 - Integer.numberOfLeadingZeros(mapIndex);
     if (mapIndexBits + attemptBits > maxTaskAttemptIdBits) {
       throw new RssException(
diff --git a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
index 9440a69e7..440c8fb8c 100644
--- a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
+++ b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
@@ -18,15 +18,24 @@
 package org.apache.uniffle.shuffle.manager;
 
 import java.util.Arrays;
+import java.util.stream.Stream;
 
 import org.apache.spark.SparkConf;
+import org.apache.spark.shuffle.RssSparkConfig;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 
 import static org.apache.uniffle.shuffle.manager.RssShuffleManagerBase.getTaskAttemptIdForBlockId;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -45,6 +54,341 @@ public class RssShuffleManagerBaseTest {
     assertEquals(remoteStorageInfo.getConfItems().get("fs.defaultFs"), "hdfs://rbf-xxx/foo");
   }
 
+  private static Stream<Arguments> testConfigureBlockIdLayoutSource() {
+    // test arguments are
+    // - maxPartitions
+    // - maxFailure
+    // - speculation
+    // - expected maxPartitions
+    // - expected sequence number bits
+    // - expected partition id bits
+    // - expected task attempt id bits
+    return Stream.of(
+        // default config values
+        Arguments.of(null, 4, false, "1048576", 21, 20, 22),
+
+        // without speculation
+        Arguments.of("2", 0, false, "65536", 31, 16, 16),
+        Arguments.of("2147483647", 0, false, "2147483647", 1, 31, 31),
+        Arguments.of("1024", 3, false, "32768", 31, 15, 17),
+        Arguments.of("131072", 3, false, "131072", 27, 17, 19),
+        Arguments.of("1048576", 3, false, "1048576", 21, 20, 22),
+        Arguments.of("1048577", 3, false, "1048577", 19, 21, 23),
+        Arguments.of("1024", 4, false, "32768", 31, 15, 17),
+        Arguments.of("131072", 4, false, "131072", 27, 17, 19),
+        Arguments.of("1048576", 4, false, "1048576", 21, 20, 22),
+        Arguments.of("1048577", 4, false, "1048577", 19, 21, 23),
+        Arguments.of("1024", 5, false, "32768", 30, 15, 18),
+        Arguments.of("131072", 5, false, "131072", 26, 17, 20),
+        Arguments.of("1048576", 5, false, "1048576", 20, 20, 23),
+        Arguments.of("1048577", 5, false, "1048577", 18, 21, 24),
+        Arguments.of("2", 1073741824, false, "2", 31, 1, 31),
+
+        // with speculation
+        Arguments.of("2", 0, true, "65536", 30, 16, 17),
+        Arguments.of("1073741824", 0, true, "1073741824", 2, 30, 31),
+        Arguments.of("1024", 3, true, "32768", 31, 15, 17),
+        Arguments.of("131072", 3, true, "131072", 27, 17, 19),
+        Arguments.of("1048576", 3, true, "1048576", 21, 20, 22),
+        Arguments.of("1048577", 3, true, "1048577", 19, 21, 23),
+        Arguments.of("1024", 4, true, "32768", 30, 15, 18),
+        Arguments.of("131072", 4, true, "131072", 26, 17, 20),
+        Arguments.of("1048576", 4, true, "1048576", 20, 20, 23),
+        Arguments.of("1048577", 4, true, "1048577", 18, 21, 24),
+        Arguments.of("1024", 5, true, "32768", 30, 15, 18),
+        Arguments.of("131072", 5, true, "131072", 26, 17, 20),
+        Arguments.of("1048576", 5, true, "1048576", 20, 20, 23),
+        Arguments.of("1048577", 5, true, "1048577", 18, 21, 24),
+        Arguments.of("2", 1073741823, true, "2", 31, 1, 31));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testConfigureBlockIdLayoutSource")
+  public void testConfigureBlockIdLayout(
+      String setMaxPartitions,
+      Integer setMaxFailure,
+      Boolean setSpeculation,
+      String expectedMaxPartitions,
+      int expectedSequenceNoBits,
+      int expectedPartitionIdBits,
+      int expectedTaskAttemptIdBits) {
+    SparkConf sparkConf = new SparkConf();
+    if (setMaxPartitions != null) {
+      sparkConf.set(RssSparkConfig.RSS_MAX_PARTITIONS.key(), setMaxPartitions);
+    }
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+
+    RssShuffleManagerBase.configureBlockIdLayout(sparkConf, rssConf, setMaxFailure, setSpeculation);
+
+    if (expectedMaxPartitions == null) {
+      assertFalse(sparkConf.contains(RssSparkConfig.RSS_MAX_PARTITIONS.key()));
+    } else {
+      assertTrue(sparkConf.contains(RssSparkConfig.RSS_MAX_PARTITIONS.key()));
+      assertEquals(expectedMaxPartitions, sparkConf.get(RssSparkConfig.RSS_MAX_PARTITIONS.key()));
+    }
+
+    String key;
+    key = RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key();
+    assertTrue(sparkConf.contains(key));
+    assertEquals(String.valueOf(expectedSequenceNoBits), sparkConf.get(key));
+    assertEquals(expectedSequenceNoBits, rssConf.get(RssClientConf.BLOCKID_SEQUENCE_NO_BITS));
+
+    key = RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_PARTITION_ID_BITS.key();
+    assertTrue(sparkConf.contains(key));
+    assertEquals(String.valueOf(expectedPartitionIdBits), sparkConf.get(key));
+    assertEquals(expectedPartitionIdBits, rssConf.get(RssClientConf.BLOCKID_PARTITION_ID_BITS));
+
+    key = RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key();
+    assertTrue(sparkConf.contains(key));
+    assertEquals(String.valueOf(expectedTaskAttemptIdBits), sparkConf.get(key));
+    assertEquals(
+        expectedTaskAttemptIdBits, rssConf.get(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS));
+  }
+
+  @Test
+  public void testConfigureBlockIdLayoutOverrides() {
+    SparkConf sparkConf = new SparkConf();
+    RssConf rssConf = new RssConf();
+    int maxFailures = 4;
+    boolean speculation = false;
+
+    String sparkPrefix = RssSparkConfig.SPARK_RSS_CONFIG_PREFIX;
+    @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+    String sparkSeqNoBitsKey = sparkPrefix + RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key();
+    @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+    String sparkPartIdBitsKey = sparkPrefix + RssClientConf.BLOCKID_PARTITION_ID_BITS.key();
+    @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+    String sparkTaskIdBitsKey = sparkPrefix + RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key();
+
+    // SparkConf populates RssConf
+    sparkConf.set(RssSparkConfig.RSS_MAX_PARTITIONS.key(), "131072");
+    RssShuffleManagerBase.configureBlockIdLayout(sparkConf, rssConf, maxFailures, speculation);
+    assertEquals(27, rssConf.get(RssClientConf.BLOCKID_SEQUENCE_NO_BITS));
+    assertEquals(17, rssConf.get(RssClientConf.BLOCKID_PARTITION_ID_BITS));
+    assertEquals(19, rssConf.get(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS));
+    assertEquals(131072, sparkConf.getInt(RssSparkConfig.RSS_MAX_PARTITIONS.key(), -1));
+    assertEquals(27, sparkConf.getInt(sparkSeqNoBitsKey, -1));
+    assertEquals(17, sparkConf.getInt(sparkPartIdBitsKey, -1));
+    assertEquals(19, sparkConf.getInt(sparkTaskIdBitsKey, -1));
+
+    // SparkConf overrides RssConf
+    sparkConf.set(RssSparkConfig.RSS_MAX_PARTITIONS.key(), "131073");
+    RssShuffleManagerBase.configureBlockIdLayout(sparkConf, rssConf, maxFailures, speculation);
+    assertEquals(25, rssConf.get(RssClientConf.BLOCKID_SEQUENCE_NO_BITS));
+    assertEquals(18, rssConf.get(RssClientConf.BLOCKID_PARTITION_ID_BITS));
+    assertEquals(20, rssConf.get(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS));
+    assertEquals(131073, sparkConf.getInt(RssSparkConfig.RSS_MAX_PARTITIONS.key(), -1));
+    assertEquals(25, sparkConf.getInt(sparkSeqNoBitsKey, -1));
+    assertEquals(18, sparkConf.getInt(sparkPartIdBitsKey, -1));
+    assertEquals(20, sparkConf.getInt(sparkTaskIdBitsKey, -1));
+
+    // SparkConf block id config overrides RssConf
+    sparkConf.remove(RssSparkConfig.RSS_MAX_PARTITIONS.key());
+    sparkConf.set(sparkSeqNoBitsKey, "22");
+    sparkConf.set(sparkPartIdBitsKey, "21");
+    sparkConf.set(sparkTaskIdBitsKey, "20");
+    RssShuffleManagerBase.configureBlockIdLayout(sparkConf, rssConf, maxFailures, speculation);
+    assertEquals(22, rssConf.get(RssClientConf.BLOCKID_SEQUENCE_NO_BITS));
+    assertEquals(21, rssConf.get(RssClientConf.BLOCKID_PARTITION_ID_BITS));
+    assertEquals(20, rssConf.get(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS));
+    assertFalse(sparkConf.contains(RssSparkConfig.RSS_MAX_PARTITIONS.key()));
+    assertEquals(22, sparkConf.getInt(sparkSeqNoBitsKey, -1));
+    assertEquals(21, sparkConf.getInt(sparkPartIdBitsKey, -1));
+    assertEquals(20, sparkConf.getInt(sparkTaskIdBitsKey, -1));
+
+    // empty SparkConf preserves RssConf
+    sparkConf = new SparkConf();
+    RssShuffleManagerBase.configureBlockIdLayout(sparkConf, rssConf, maxFailures, speculation);
+    assertEquals(22, rssConf.get(RssClientConf.BLOCKID_SEQUENCE_NO_BITS));
+    assertEquals(21, rssConf.get(RssClientConf.BLOCKID_PARTITION_ID_BITS));
+    assertEquals(20, rssConf.get(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS));
+    assertFalse(sparkConf.contains(RssSparkConfig.RSS_MAX_PARTITIONS.key()));
+    assertEquals(22, sparkConf.getInt(sparkSeqNoBitsKey, -1));
+    assertEquals(21, sparkConf.getInt(sparkPartIdBitsKey, -1));
+    assertEquals(20, sparkConf.getInt(sparkTaskIdBitsKey, -1));
+  }
+
+  private static Stream<Arguments> testConfigureBlockIdLayoutMaxPartitionsValueExceptionSource() {
+    // test arguments are
+    // - maxPartitions
+    // - maxFailure
+    // - speculation
+    return Stream.of(
+        // without speculation
+        Arguments.of("-1", 4, false),
+        Arguments.of("0", 4, false),
+
+        // with speculation
+        Arguments.of("-1", 4, true),
+        Arguments.of("0", 4, true),
+        Arguments.of("1", 4, true));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testConfigureBlockIdLayoutMaxPartitionsValueExceptionSource")
+  public void testConfigureBlockIdLayoutMaxPartitionsValueException(
+      String setMaxPartitions, int setMaxFailure, boolean setSpeculation) {
+    SparkConf sparkConf = new SparkConf();
+    if (setMaxPartitions != null) {
+      sparkConf.set(RssSparkConfig.RSS_MAX_PARTITIONS.key(), setMaxPartitions);
+    }
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+
+    Executable call =
+        () ->
+            RssShuffleManagerBase.configureBlockIdLayout(
+                sparkConf, rssConf, setMaxFailure, setSpeculation);
+    Exception e = assertThrowsExactly(IllegalArgumentException.class, call);
+
+    String expectedMessage =
+        "Value of spark.rss.blockId.maxPartitions must be larger than 1: " + setMaxPartitions;
+    assertEquals(expectedMessage, e.getMessage());
+  }
+
+  private static Stream<Arguments> testConfigureBlockIdLayoutUnsupportedMaxPartitionsSource() {
+    // test arguments are
+    // - maxPartitions
+    // - maxFailure
+    // - speculation
+    // - expected 'at most' partition count quoted in the error message
+    return Stream.of(
+        // without speculation
+        Arguments.of("2097152", 2048, false, "1048576"),
+        Arguments.of("536870913", 3, false, "536870912"),
+        Arguments.of("1073741825", 2, false, "1073741824"),
+
+        // with speculation
+        Arguments.of("2097152", 2048, true, "524288"),
+        Arguments.of("536870913", 3, true, "536870912"),
+        Arguments.of("1073741824", 2, true, "536870912"));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testConfigureBlockIdLayoutUnsupportedMaxPartitionsSource")
+  public void testConfigureBlockIdLayoutUnsupportedMaxPartitions(
+      String setMaxPartitions, int setMaxFailure, boolean setSpeculation, String atMost) {
+    SparkConf sparkConf = new SparkConf();
+    if (setMaxPartitions != null) {
+      sparkConf.set(RssSparkConfig.RSS_MAX_PARTITIONS.key(), setMaxPartitions);
+    }
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+
+    String expectedMessage =
+        "Cannot support spark.rss.blockId.maxPartitions="
+            + setMaxPartitions
+            + " partitions, as this would require to reserve more than 31 bits in the block id for task attempt ids. With spark.maxFailures="
+            + setMaxFailure
+            + " and spark.speculation="
+            + setSpeculation
+            + " at most "
+            + atMost
+            + " partitions can be supported.";
+    Executable call =
+        () ->
+            RssShuffleManagerBase.configureBlockIdLayout(
+                sparkConf, rssConf, setMaxFailure, setSpeculation);
+    Exception e = assertThrowsExactly(IllegalArgumentException.class, call);
+    assertEquals(expectedMessage, e.getMessage());
+  }
+
+  private static Stream<Arguments> testConfigureBlockIdLayoutInsufficientConfigExceptionSource() {
+    // test arguments are
+    // - sequenceNoBits
+    // - partitionIdBits
+    // - taskAttemptIdBits
+    // - config
+    return Stream.of("spark", "rss")
+        .flatMap(
+            config ->
+                Stream.of(
+                    Arguments.of(null, 21, 22, config),
+                    Arguments.of(20, null, 22, config),
+                    Arguments.of(20, 21, null, config)));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testConfigureBlockIdLayoutInsufficientConfigExceptionSource")
+  public void testConfigureBlockIdLayoutInsufficientConfigException(
+      Integer sequenceNoBits, Integer partitionIdBits, Integer taskAttemptIdBits, String config) {
+    SparkConf sparkConf = new SparkConf();
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+
+    if (config.equals("spark")) {
+      String sparkPrefix = RssSparkConfig.SPARK_RSS_CONFIG_PREFIX;
+      String sparkSeqNoBitsKey = sparkPrefix + RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key();
+      String sparkPartIdBitsKey = sparkPrefix + RssClientConf.BLOCKID_PARTITION_ID_BITS.key();
+      String sparkTaskIdBitsKey = sparkPrefix + RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key();
+
+      if (sequenceNoBits != null) {
+        sparkConf.set(sparkSeqNoBitsKey, sequenceNoBits.toString());
+      }
+      if (partitionIdBits != null) {
+        sparkConf.set(sparkPartIdBitsKey, partitionIdBits.toString());
+      }
+      if (taskAttemptIdBits != null) {
+        sparkConf.set(sparkTaskIdBitsKey, taskAttemptIdBits.toString());
+      }
+    } else if (config.equals("rss")) {
+      if (sequenceNoBits != null) {
+        rssConf.set(RssClientConf.BLOCKID_SEQUENCE_NO_BITS, sequenceNoBits);
+      }
+      if (partitionIdBits != null) {
+        rssConf.set(RssClientConf.BLOCKID_PARTITION_ID_BITS, partitionIdBits);
+      }
+      if (taskAttemptIdBits != null) {
+        rssConf.set(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS, taskAttemptIdBits);
+      }
+    } else {
+      throw new IllegalArgumentException(config);
+    }
+
+    Executable call =
+        () -> RssShuffleManagerBase.configureBlockIdLayout(sparkConf, rssConf, 4, false);
+    Exception e = assertThrowsExactly(IllegalArgumentException.class, call);
+
+    assertTrue(e.getMessage().startsWith("All block id bit config keys must be provided "));
+  }
+
+  @Test
+  public void testGetMaxAttemptNo() {
+    // without speculation
+    assertEquals(0, RssShuffleManagerBase.getMaxAttemptNo(-1, false));
+    assertEquals(0, RssShuffleManagerBase.getMaxAttemptNo(0, false));
+    assertEquals(0, RssShuffleManagerBase.getMaxAttemptNo(1, false));
+    assertEquals(1, RssShuffleManagerBase.getMaxAttemptNo(2, false));
+    assertEquals(2, RssShuffleManagerBase.getMaxAttemptNo(3, false));
+    assertEquals(3, RssShuffleManagerBase.getMaxAttemptNo(4, false));
+    assertEquals(4, RssShuffleManagerBase.getMaxAttemptNo(5, false));
+    assertEquals(1023, RssShuffleManagerBase.getMaxAttemptNo(1024, false));
+
+    // with speculation
+    assertEquals(1, RssShuffleManagerBase.getMaxAttemptNo(-1, true));
+    assertEquals(1, RssShuffleManagerBase.getMaxAttemptNo(0, true));
+    assertEquals(1, RssShuffleManagerBase.getMaxAttemptNo(1, true));
+    assertEquals(2, RssShuffleManagerBase.getMaxAttemptNo(2, true));
+    assertEquals(3, RssShuffleManagerBase.getMaxAttemptNo(3, true));
+    assertEquals(4, RssShuffleManagerBase.getMaxAttemptNo(4, true));
+    assertEquals(5, RssShuffleManagerBase.getMaxAttemptNo(5, true));
+    assertEquals(1024, RssShuffleManagerBase.getMaxAttemptNo(1024, true));
+  }
+
+  @Test
+  public void testGetAttemptIdBits() {
+    assertEquals(0, RssShuffleManagerBase.getAttemptIdBits(0));
+    assertEquals(1, RssShuffleManagerBase.getAttemptIdBits(1));
+    assertEquals(2, RssShuffleManagerBase.getAttemptIdBits(2));
+    assertEquals(2, RssShuffleManagerBase.getAttemptIdBits(3));
+    assertEquals(3, RssShuffleManagerBase.getAttemptIdBits(4));
+    assertEquals(3, RssShuffleManagerBase.getAttemptIdBits(5));
+    assertEquals(3, RssShuffleManagerBase.getAttemptIdBits(6));
+    assertEquals(3, RssShuffleManagerBase.getAttemptIdBits(7));
+    assertEquals(4, RssShuffleManagerBase.getAttemptIdBits(8));
+    assertEquals(4, RssShuffleManagerBase.getAttemptIdBits(9));
+    assertEquals(10, RssShuffleManagerBase.getAttemptIdBits(1023));
+    assertEquals(11, RssShuffleManagerBase.getAttemptIdBits(1024));
+    assertEquals(11, RssShuffleManagerBase.getAttemptIdBits(1025));
+  }
+
   private long bits(String string) {
     return Long.parseLong(string.replaceAll("[|]", ""), 2);
   }
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 3cb78ddd3..75cda7810 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -146,7 +146,10 @@ public class RssShuffleManager extends RssShuffleManagerBase {
     this.sparkConf = sparkConf;
     this.maxFailures = sparkConf.getInt("spark.task.maxFailures", 4);
     this.speculation = sparkConf.getBoolean("spark.speculation", false);
-    this.blockIdLayout = BlockIdLayout.from(RssSparkConfig.toRssConf(sparkConf));
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+    // configureBlockIdLayout requires maxFailures and speculation to be initialized
+    configureBlockIdLayout(sparkConf, rssConf);
+    this.blockIdLayout = BlockIdLayout.from(rssConf);
     this.user = sparkConf.get("spark.rss.quota.user", "user");
     this.uuid = sparkConf.get("spark.rss.quota.uuid", Long.toString(System.currentTimeMillis()));
     // set & check replica config
@@ -182,7 +185,6 @@ public class RssShuffleManager extends RssShuffleManagerBase {
         sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE);
     int unregisterRequestTimeoutSec =
         sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC);
-    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
     this.shuffleWriteClient =
         ShuffleClientFactory.getInstance()
             .createShuffleWriteClient(
@@ -496,6 +498,18 @@ public class RssShuffleManager extends RssShuffleManagerBase {
     }
   }
 
+  /**
+   * Derives block id layout config from maximum number of allowed partitions. Computes the number
+   * of required bits for partition id and task attempt id and reserves remaining bits for sequence
+   * number.
+   *
+   * @param sparkConf Spark config providing max partitions
+   * @param rssConf Rss config to amend
+   */
+  public void configureBlockIdLayout(SparkConf sparkConf, RssConf rssConf) {
+    configureBlockIdLayout(sparkConf, rssConf, maxFailures, speculation);
+  }
+
   @Override
   public long getTaskAttemptIdForBlockId(int mapIndex, int attemptNo) {
     return getTaskAttemptIdForBlockId(
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 59e19d192..891ccd65e 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -184,11 +184,13 @@ public class RssShuffleManager extends RssShuffleManagerBase {
     this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
     this.dynamicConfEnabled = sparkConf.get(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED);
     this.dataDistributionType = getDataDistributionType(sparkConf);
-    this.blockIdLayout = BlockIdLayout.from(RssSparkConfig.toRssConf(sparkConf));
-    this.maxConcurrencyPerPartitionToWrite =
-        RssSparkConfig.toRssConf(sparkConf).get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+    this.maxConcurrencyPerPartitionToWrite = rssConf.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
     this.maxFailures = sparkConf.getInt("spark.task.maxFailures", 4);
     this.speculation = sparkConf.getBoolean("spark.speculation", false);
+    // configureBlockIdLayout requires maxFailures and speculation to be initialized
+    configureBlockIdLayout(sparkConf, rssConf);
+    this.blockIdLayout = BlockIdLayout.from(rssConf);
     long retryIntervalMax = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
     int heartBeatThreadNum = sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
     this.dataTransferPoolSize = sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
@@ -197,7 +199,6 @@ public class RssShuffleManager extends RssShuffleManagerBase {
         sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE);
     int unregisterRequestTimeoutSec =
         sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC);
-    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
     shuffleWriteClient =
         ShuffleClientFactory.getInstance()
             .createShuffleWriteClient(
@@ -308,13 +309,14 @@ public class RssShuffleManager extends RssShuffleManagerBase {
       Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker) {
     this.sparkConf = conf;
     this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
-    this.dataDistributionType =
-        RssSparkConfig.toRssConf(sparkConf).get(RssClientConf.DATA_DISTRIBUTION_TYPE);
-    this.blockIdLayout = BlockIdLayout.from(RssSparkConfig.toRssConf(sparkConf));
-    this.maxConcurrencyPerPartitionToWrite =
-        RssSparkConfig.toRssConf(sparkConf).get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+    this.dataDistributionType = rssConf.get(RssClientConf.DATA_DISTRIBUTION_TYPE);
+    this.blockIdLayout = BlockIdLayout.from(rssConf);
+    this.maxConcurrencyPerPartitionToWrite = rssConf.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
     this.maxFailures = sparkConf.getInt("spark.task.maxFailures", 4);
     this.speculation = sparkConf.getBoolean("spark.speculation", false);
+    // configureBlockIdLayout requires maxFailures and speculation to be initialized
+    configureBlockIdLayout(sparkConf, rssConf);
     this.heartbeatInterval = sparkConf.get(RssSparkConfig.RSS_HEARTBEAT_INTERVAL);
     this.heartbeatTimeout =
         sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_TIMEOUT.key(), heartbeatInterval / 2);
@@ -359,7 +361,7 @@ public class RssShuffleManager extends RssShuffleManagerBase {
                     .dataCommitPoolSize(dataCommitPoolSize)
                     .unregisterThreadPoolSize(unregisterThreadPoolSize)
                     .unregisterRequestTimeSec(unregisterRequestTimeoutSec)
-                    .rssConf(RssSparkConfig.toRssConf(sparkConf)));
+                    .rssConf(rssConf));
     this.taskToSuccessBlockIds = taskToSuccessBlockIds;
     this.heartBeatScheduledExecutorService = null;
     this.taskToFailedBlockSendTracker = taskToFailedBlockSendTracker;
@@ -535,6 +537,11 @@ public class RssShuffleManager extends RssShuffleManagerBase {
         shuffleHandleInfo);
   }
 
+  @Override
+  public void configureBlockIdLayout(SparkConf sparkConf, RssConf rssConf) {
+    configureBlockIdLayout(sparkConf, rssConf, maxFailures, speculation);
+  }
+
   @Override
   public long getTaskAttemptIdForBlockId(int mapIndex, int attemptNo) {
     return getTaskAttemptIdForBlockId(
diff --git a/common/src/main/java/org/apache/uniffle/common/util/BlockIdLayout.java b/common/src/main/java/org/apache/uniffle/common/util/BlockIdLayout.java
index 33d8cd6d7..efba2b815 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/BlockIdLayout.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/BlockIdLayout.java
@@ -31,6 +31,8 @@ import org.apache.uniffle.common.config.RssConf;
  */
 public class BlockIdLayout {
 
+  // historic default values, client-specific config defaults may vary
+  // see RssSparkConfig.RSS_MAX_PARTITIONS
   public static final BlockIdLayout DEFAULT = BlockIdLayout.from(18, 24, 21);
 
   public final int sequenceNoBits;
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index 8c769f730..1f4dd0b06 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -22,7 +22,7 @@ public final class Constants {
   private Constants() {}
 
   // the value is used for client/server compatible, eg, online upgrade
-  public static final String SHUFFLE_SERVER_VERSION = "ss_v4";
+  public static final String SHUFFLE_SERVER_VERSION = "ss_v5";
   public static final String METRICS_TAG_LABEL_NAME = "tags";
   public static final String COORDINATOR_TAG = "coordinator";
   public static final String SHUFFLE_DATA_FILE_SUFFIX = ".data";
diff --git a/docs/client_guide/spark_client_guide.md b/docs/client_guide/spark_client_guide.md
index 779d92efa..bbcfe5911 100644
--- a/docs/client_guide/spark_client_guide.md
+++ b/docs/client_guide/spark_client_guide.md
@@ -101,7 +101,22 @@ If you observe an error like
 
 you should consider increasing the bits reserved in the blockId for that number / id (while decreasing the other number of bits).
 
-The bits reserved for sequence number, partition id and task attempt id are best specified for Spark clients as follows:
+With the Spark client, configuring the blockId bits is as easy as defining the maximum number of supported partitions:
+
+| Property Name                   | Default | Description                                                                                                                                                         |
+|---------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| spark.rss.blockId.maxPartitions | 1048576 | Maximum number of partitions supported by the Spark client (`[2..2,147,483,648]`). |
+
+The Spark client derives the optimal values for the following properties.
+Alternatively, these properties can be configured instead of `spark.rss.blockId.maxPartitions`:
+
+| Property Name                       | Default | Description                                                                                                                                                         |
+|-------------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| spark.rss.blockId.sequenceNoBits    | 18      | Number of bits reserved in the blockId for the sequence number (`[1..31]`). Note that `sequenceNoBits + partitionIdBits + taskAttemptIdBits` has to sum up to `63`. |
+| spark.rss.blockId.partitionIdBits   | 24      | Number of bits reserved in the blockId for the partition id (`[1..31]`). Note that `sequenceNoBits + partitionIdBits + taskAttemptIdBits` has to sum up to `63`.    |
+| spark.rss.blockId.taskAttemptIdBits | 21      | Number of bits reserved in the blockId for the task attempt id (`[1..31]`). Note that `sequenceNoBits + partitionIdBits + taskAttemptIdBits` has to sum up to `63`. |
+
+The bits reserved for sequence number, partition id and task attempt id are best specified for Spark clients as follows (done automatically if `spark.rss.blockId.maxPartitions` is set):
 
 1. Reserve the bits required to support the largest number of partitions that you anticipate. Pick `ceil( log(max number of partitions) / log(2) )` bits.
    For instance, `20` bits support `1,048,576` partitions.
@@ -111,12 +126,6 @@ The bits reserved for sequence number, partition id and task attempt id are best
    For example: `22` bits is sufficient for `taskAttemptIdBits` with `partitionIdBits=20`, and Spark conf `spark.task.maxFailures=4` and `spark.speculation=false`.
 3. Reserve the remaining bits to `sequenceNoBits`: `sequenceNoBits = 63 - partitionIdBits - taskAttemptIdBits`.
 
-| Property Name                       | Default | Description                                                                                                                                                         |
-|-------------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| spark.rss.blockId.sequenceNoBits    | 18      | Number of bits reserved in the blockId for the sequence number (`[1..31]`). Note that `sequenceNoBits + partitionIdBits + taskAttemptIdBits` has to sum up to `63`. |
-| spark.rss.blockId.partitionIdBits   | 24      | Number of bits reserved in the blockId for the partition id (`[1..31]`). Note that `sequenceNoBits + partitionIdBits + taskAttemptIdBits` has to sum up to `63`.    |
-| spark.rss.blockId.taskAttemptIdBits | 21      | Number of bits reserved in the blockId for the task attempt id (`[1..31]`). Note that `sequenceNoBits + partitionIdBits + taskAttemptIdBits` has to sum up to `63`. |
-
 ### Adaptive Remote Shuffle Enabling 
 Currently, this feature only supports Spark. 
 
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index ea598e472..39e3deefb 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -75,7 +75,6 @@ import org.apache.uniffle.common.metrics.TestUtils;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.BlockIdLayout;
-import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.proto.RssProtos;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
@@ -420,20 +419,6 @@ public class ShuffleServerGrpcTest extends IntegrationTestBase {
     addExpectedBlockIds(expectedP3, blockIds3);
     assertEquals(expectedP3, blockIdBitmap);
 
-    // get same shuffle result without block id layout (legacy clients)
-    RssProtos.GetShuffleResultRequest rpcRequest =
-        RssProtos.GetShuffleResultRequest.newBuilder()
-            .setAppId("shuffleResultTest")
-            .setShuffleId(4)
-            .setPartitionId(3)
-            // deliberately not setting block id layout through .setBlockIdLayout
-            .build();
-    RssProtos.GetShuffleResultResponse rpcResponse =
-        grpcShuffleServerClient.getBlockingStub().getShuffleResult(rpcRequest);
-    assertEquals(RssProtos.StatusCode.SUCCESS, rpcResponse.getStatus());
-    blockIdBitmap = RssUtils.deserializeBitMap(rpcResponse.getSerializedBitmap().toByteArray());
-    assertEquals(expectedP3, blockIdBitmap);
-
     // wait resources are deleted
     Thread.sleep(12000);
     req = new RssGetShuffleResultRequest("shuffleResultTest", 1, 1, layout);
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
index 6d797a88a..ac6d739dd 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
@@ -103,6 +103,7 @@ public class RssShuffleManagerTest extends SparkIntegrationTestBase {
     return new HashMap();
   }
 
+  private static final BlockIdLayout DEFAULT = BlockIdLayout.from(21, 20, 22);
   private static final BlockIdLayout CUSTOM1 = BlockIdLayout.from(20, 21, 22);
   private static final BlockIdLayout CUSTOM2 = BlockIdLayout.from(22, 18, 23);
 
@@ -113,7 +114,7 @@ public class RssShuffleManagerTest extends SparkIntegrationTestBase {
   @ParameterizedTest
   @ValueSource(booleans = {false, true})
   public void testRssShuffleManager(boolean enableDynamicClientConf) throws Exception {
-    doTestRssShuffleManager(null, null, BlockIdLayout.DEFAULT, enableDynamicClientConf);
+    doTestRssShuffleManager(null, null, DEFAULT, enableDynamicClientConf);
   }
 
   @ParameterizedTest
@@ -187,6 +188,7 @@ public class RssShuffleManagerTest extends SparkIntegrationTestBase {
       // get written block ids (we know there is one shuffle where two task attempts wrote two
       // partitions)
       RssConf rssConf = RssSparkConfig.toRssConf(conf);
+      shuffleManager.configureBlockIdLayout(conf, rssConf);
       ShuffleWriteClient shuffleWriteClient =
           ShuffleClientFactory.newWriteBuilder()
               .clientType(ClientType.GRPC.name())
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 038e03bb3..695c2afc6 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -529,15 +529,11 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
     String appId = request.getAppId();
     int shuffleId = request.getShuffleId();
     int partitionId = request.getPartitionId();
-    BlockIdLayout blockIdLayout = BlockIdLayout.DEFAULT;
-    // legacy clients might send request without block id layout, we fall back to DEFAULT then
-    if (request.hasBlockIdLayout()) {
-      blockIdLayout =
-          BlockIdLayout.from(
-              request.getBlockIdLayout().getSequenceNoBits(),
-              request.getBlockIdLayout().getPartitionIdBits(),
-              request.getBlockIdLayout().getTaskAttemptIdBits());
-    }
+    BlockIdLayout blockIdLayout =
+        BlockIdLayout.from(
+            request.getBlockIdLayout().getSequenceNoBits(),
+            request.getBlockIdLayout().getPartitionIdBits(),
+            request.getBlockIdLayout().getTaskAttemptIdBits());
     StatusCode status = StatusCode.SUCCESS;
     String msg = "OK";
     GetShuffleResultResponse reply;
@@ -581,15 +577,11 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
     String appId = request.getAppId();
     int shuffleId = request.getShuffleId();
     List<Integer> partitionsList = request.getPartitionsList();
-    BlockIdLayout blockIdLayout = BlockIdLayout.DEFAULT;
-    // legacy clients might send request without block id layout, we fall back to DEFAULT then
-    if (request.hasBlockIdLayout()) {
-      blockIdLayout =
-          BlockIdLayout.from(
-              request.getBlockIdLayout().getSequenceNoBits(),
-              request.getBlockIdLayout().getPartitionIdBits(),
-              request.getBlockIdLayout().getTaskAttemptIdBits());
-    }
+    BlockIdLayout blockIdLayout =
+        BlockIdLayout.from(
+            request.getBlockIdLayout().getSequenceNoBits(),
+            request.getBlockIdLayout().getPartitionIdBits(),
+            request.getBlockIdLayout().getTaskAttemptIdBits());
 
     StatusCode status = StatusCode.SUCCESS;
     String msg = "OK";