Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/10/16 01:55:35 UTC

[incubator-uniffle] branch master updated: [#1090] refactor: Refactor the reader code with builder pattern (#1232)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new f49b9de0d [#1090] refactor: Refactor the reader code with builder pattern (#1232)
f49b9de0d is described below

commit f49b9de0d8d6504f7d943cea9325f4c137d58ec9
Author: summaryzb <su...@gmail.com>
AuthorDate: Sun Oct 15 20:55:29 2023 -0500

    [#1090] refactor: Refactor the reader code with builder pattern (#1232)
    
    ### What changes were proposed in this pull request?
    As the title describes: replace the positional CreateShuffleReadClientRequest constructors with a fluent ShuffleClientFactory.ReadClientBuilder when creating shuffle read clients (see the sketch after the diffstat below).
    
    ### Why are the changes needed?
    https://github.com/apache/incubator-uniffle/issues/1090
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Unit tests.
---
 .../hadoop/mapreduce/task/reduce/RssShuffle.java   |  33 +-
 .../shuffle/reader/RssShuffleDataIteratorTest.java |  32 +-
 .../spark/shuffle/reader/RssShuffleReader.java     |  32 +-
 .../spark/shuffle/reader/RssShuffleReader.java     |  33 +-
 .../common/shuffle/impl/RssTezFetcherTask.java     |  34 +-
 .../orderedgrouped/RssShuffleScheduler.java        |  34 +-
 .../client/factory/ShuffleClientFactory.java       | 222 +++++++++-
 .../uniffle/client/impl/ShuffleReadClientImpl.java | 213 +++------
 .../request/CreateShuffleReadClientRequest.java    | 221 ----------
 .../client/impl/ShuffleReadClientImplTest.java     | 477 ++++++---------------
 .../uniffle/test/DiskErrorToleranceTest.java       |  62 +--
 .../test/HybridStorageFaultToleranceBase.java      |  33 +-
 .../java/org/apache/uniffle/test/QuorumTest.java   | 401 ++++++-----------
 .../ShuffleServerConcurrentWriteOfHadoopTest.java  |  33 +-
 .../uniffle/test/ShuffleServerWithHadoopTest.java  | 128 +++---
 .../ShuffleServerWithKerberizedHadoopTest.java     | 126 +++---
 .../uniffle/test/ShuffleWithRssClientTest.java     |  59 ++-
 .../uniffle/test/SparkClientWithLocalTest.java     | 289 ++++---------
 18 files changed, 919 insertions(+), 1543 deletions(-)
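
The core of the change is easiest to see at a single call site. Below is a minimal before/after sketch with placeholder variables, using the request constructor this commit removes and the builder it introduces:

    // Before: positional CreateShuffleReadClientRequest, arguments easy to mis-order
    ShuffleReadClient shuffleReadClient =
        ShuffleClientFactory.getInstance()
            .createShuffleReadClient(
                new CreateShuffleReadClientRequest(
                    appId, shuffleId, partitionId, basePath,
                    partitionNumPerRange, partitionNum,
                    blockIdBitmap, taskIdBitmap, shuffleServerInfoList,
                    hadoopConf, expectedTaskIdsBitmapFilterEnable, rssConf));

    // After: a fluent builder that names every argument at the call site
    ShuffleReadClient shuffleReadClient =
        ShuffleClientFactory.getInstance()
            .createShuffleReadClient(
                ShuffleClientFactory.newReadBuilder()
                    .appId(appId)
                    .shuffleId(shuffleId)
                    .partitionId(partitionId)
                    .basePath(basePath)
                    .partitionNumPerRange(partitionNumPerRange)
                    .partitionNum(partitionNum)
                    .blockIdBitmap(blockIdBitmap)
                    .taskIdBitmap(taskIdBitmap)
                    .shuffleServerInfoList(shuffleServerInfoList)
                    .hadoopConf(hadoopConf)
                    .expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
                    .rssConf(rssConf));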

diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
index 47d061546..1f7aae942 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
@@ -42,7 +42,6 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
-import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.hadoop.shim.HadoopShimImpl;
@@ -217,23 +216,23 @@ public class RssShuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionR
       LOG.info("In reduce: " + reduceId + ", Rss MR client starts to fetch blocks from RSS server");
       JobConf readerJobConf = getRemoteConf();
       boolean expectedTaskIdsBitmapFilterEnable = serverInfoList.size() > 1;
-      CreateShuffleReadClientRequest request =
-          new CreateShuffleReadClientRequest(
-              appId,
-              0,
-              reduceId.getTaskID().getId(),
-              basePath,
-              partitionNumPerRange,
-              partitionNum,
-              blockIdBitmap,
-              taskIdBitmap,
-              serverInfoList,
-              readerJobConf,
-              new MRIdHelper(),
-              expectedTaskIdsBitmapFilterEnable,
-              RssMRConfig.toRssConf(rssJobConf));
       ShuffleReadClient shuffleReadClient =
-          ShuffleClientFactory.getInstance().createShuffleReadClient(request);
+          ShuffleClientFactory.getInstance()
+              .createShuffleReadClient(
+                  ShuffleClientFactory.newReadBuilder()
+                      .appId(appId)
+                      .shuffleId(0)
+                      .partitionId(reduceId.getTaskID().getId())
+                      .basePath(basePath)
+                      .partitionNumPerRange(partitionNumPerRange)
+                      .partitionNum(partitionNum)
+                      .blockIdBitmap(blockIdBitmap)
+                      .taskIdBitmap(taskIdBitmap)
+                      .shuffleServerInfoList(serverInfoList)
+                      .hadoopConf(readerJobConf)
+                      .idHelper(new MRIdHelper())
+                      .expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
+                      .rssConf(RssMRConfig.toRssConf(rssJobConf)));
       RssFetcher fetcher =
           new RssFetcher(
               mrJobConf,
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
index 748a2a58b..0dea95147 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
@@ -39,9 +38,9 @@ import org.mockito.Mockito;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.client.api.ShuffleReadClient;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
 import org.apache.uniffle.client.util.ClientUtils;
-import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.ChecksumUtils;
@@ -114,21 +113,20 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest {
       List<ShuffleServerInfo> serverInfos,
       boolean compress) {
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            100,
-            2,
-            10,
-            10000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(serverInfos),
-            new Configuration(),
-            new DefaultIdHelper());
+        ShuffleClientFactory.newReadBuilder()
+            .storageType(StorageType.HDFS.name())
+            .appId("appId")
+            .shuffleId(0)
+            .partitionId(1)
+            .indexReadLimit(100)
+            .partitionNumPerRange(2)
+            .partitionNum(10)
+            .readBufferSize(10000)
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(serverInfos))
+            .build();
     RssConf rc;
     if (!compress) {
       SparkConf sc = new SparkConf();
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 6f9f3186d..9130bc587 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
-import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
 import org.apache.uniffle.client.util.RssClientConfig;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssClientConf;
@@ -108,23 +107,22 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
   @Override
   public Iterator<Product2<K, C>> read() {
     LOG.info("Shuffle read started:" + getReadInfo());
-
-    CreateShuffleReadClientRequest request =
-        new CreateShuffleReadClientRequest(
-            appId,
-            shuffleId,
-            startPartition,
-            basePath,
-            partitionNumPerRange,
-            partitionNum,
-            blockIdBitmap,
-            taskIdBitmap,
-            shuffleServerInfoList,
-            hadoopConf,
-            expectedTaskIdsBitmapFilterEnable,
-            rssConf);
     ShuffleReadClient shuffleReadClient =
-        ShuffleClientFactory.getInstance().createShuffleReadClient(request);
+        ShuffleClientFactory.getInstance()
+            .createShuffleReadClient(
+                ShuffleClientFactory.newReadBuilder()
+                    .appId(appId)
+                    .shuffleId(shuffleId)
+                    .partitionId(startPartition)
+                    .basePath(basePath)
+                    .partitionNumPerRange(partitionNumPerRange)
+                    .partitionNum(partitionNum)
+                    .blockIdBitmap(blockIdBitmap)
+                    .taskIdBitmap(taskIdBitmap)
+                    .shuffleServerInfoList(shuffleServerInfoList)
+                    .hadoopConf(hadoopConf)
+                    .expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
+                    .rssConf(rssConf));
     RssShuffleDataIterator rssShuffleDataIterator =
         new RssShuffleDataIterator<K, C>(
             shuffleDependency.serializer(),
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 4cd4aaa2b..f8ed76f73 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -49,7 +49,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
-import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
 import org.apache.uniffle.client.util.RssClientConfig;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
@@ -246,23 +245,23 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
         boolean expectedTaskIdsBitmapFilterEnable =
             !(mapStartIndex == 0 && mapEndIndex == Integer.MAX_VALUE)
                 || shuffleServerInfoList.size() > 1;
-        CreateShuffleReadClientRequest request =
-            new CreateShuffleReadClientRequest(
-                appId,
-                shuffleId,
-                partition,
-                basePath,
-                1,
-                partitionNum,
-                partitionToExpectBlocks.get(partition),
-                taskIdBitmap,
-                shuffleServerInfoList,
-                hadoopConf,
-                dataDistributionType,
-                expectedTaskIdsBitmapFilterEnable,
-                rssConf);
         ShuffleReadClient shuffleReadClient =
-            ShuffleClientFactory.getInstance().createShuffleReadClient(request);
+            ShuffleClientFactory.getInstance()
+                .createShuffleReadClient(
+                    ShuffleClientFactory.newReadBuilder()
+                        .appId(appId)
+                        .shuffleId(shuffleId)
+                        .partitionId(partition)
+                        .basePath(basePath)
+                        .partitionNumPerRange(1)
+                        .partitionNum(partitionNum)
+                        .blockIdBitmap(partitionToExpectBlocks.get(partition))
+                        .taskIdBitmap(taskIdBitmap)
+                        .shuffleServerInfoList(shuffleServerInfoList)
+                        .hadoopConf(hadoopConf)
+                        .shuffleDataDistributionType(dataDistributionType)
+                        .expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
+                        .rssConf(rssConf));
         RssShuffleDataIterator<K, C> iterator =
             new RssShuffleDataIterator<>(
                 shuffleDependency.serializer(), shuffleReadClient, readMetrics, rssConf);
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
index 65ee99b4b..f1dd85b8b 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
@@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
-import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.util.UnitConverter;
@@ -179,23 +178,24 @@ public class RssTezFetcherTask extends CallableWithNdc<FetchResult> {
       Configuration hadoopConf = getRemoteConf();
       LOG.info("RssTezFetcherTask storageType:{}", storageType);
       boolean expectedTaskIdsBitmapFilterEnable = serverInfoSet.size() > 1;
-      CreateShuffleReadClientRequest request =
-          new CreateShuffleReadClientRequest(
-              applicationAttemptId.toString(),
-              shuffleId,
-              partition,
-              basePath,
-              partitionNumPerRange,
-              partitionNum,
-              blockIdBitmap,
-              taskIdBitmap,
-              new ArrayList<>(serverInfoSet),
-              hadoopConf,
-              new TezIdHelper(),
-              expectedTaskIdsBitmapFilterEnable,
-              RssTezConfig.toRssConf(this.conf));
+
       ShuffleReadClient shuffleReadClient =
-          ShuffleClientFactory.getInstance().createShuffleReadClient(request);
+          ShuffleClientFactory.getInstance()
+              .createShuffleReadClient(
+                  ShuffleClientFactory.newReadBuilder()
+                      .appId(applicationAttemptId.toString())
+                      .shuffleId(shuffleId)
+                      .partitionId(partition)
+                      .basePath(basePath)
+                      .partitionNumPerRange(partitionNumPerRange)
+                      .partitionNum(partitionNum)
+                      .blockIdBitmap(blockIdBitmap)
+                      .taskIdBitmap(taskIdBitmap)
+                      .shuffleServerInfoList(new ArrayList<>(serverInfoSet))
+                      .hadoopConf(hadoopConf)
+                      .idHelper(new TezIdHelper())
+                      .expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
+                      .rssConf(RssTezConfig.toRssConf(this.conf)));
       RssTezFetcher fetcher =
           new RssTezFetcher(
               fetcherCallback,
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
index 89146ae94..0528037d3 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
@@ -95,7 +95,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
-import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
@@ -1851,24 +1850,23 @@ class RssShuffleScheduler extends ShuffleScheduler {
       int partitionNum = partitionToServers.size();
       boolean expectedTaskIdsBitmapFilterEnable = shuffleServerInfoSet.size() > 1;
 
-      CreateShuffleReadClientRequest request =
-          new CreateShuffleReadClientRequest(
-              applicationAttemptId.toString(),
-              shuffleId,
-              mapHost.getPartitionId(),
-              basePath,
-              partitionNumPerRange,
-              partitionNum,
-              blockIdBitmap,
-              taskIdBitmap,
-              shuffleServerInfoList,
-              hadoopConf,
-              new TezIdHelper(),
-              expectedTaskIdsBitmapFilterEnable,
-              RssTezConfig.toRssConf(conf));
-
       ShuffleReadClient shuffleReadClient =
-          ShuffleClientFactory.getInstance().createShuffleReadClient(request);
+          ShuffleClientFactory.getInstance()
+              .createShuffleReadClient(
+                  ShuffleClientFactory.newReadBuilder()
+                      .appId(applicationAttemptId.toString())
+                      .shuffleId(shuffleId)
+                      .partitionId(mapHost.getPartitionId())
+                      .basePath(basePath)
+                      .partitionNumPerRange(partitionNumPerRange)
+                      .partitionNum(partitionNum)
+                      .blockIdBitmap(blockIdBitmap)
+                      .taskIdBitmap(taskIdBitmap)
+                      .shuffleServerInfoList(shuffleServerInfoList)
+                      .hadoopConf(hadoopConf)
+                      .idHelper(new TezIdHelper())
+                      .expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
+                      .rssConf(RssTezConfig.toRssConf(conf)));
       RssTezShuffleDataFetcher fetcher =
           new RssTezShuffleDataFetcher(
               partitionIdToSuccessMapTaskAttempts.get(mapHost.getPartitionId()).iterator().next(),
diff --git a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index cdabf0533..6baccc05b 100644
--- a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++ b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -17,12 +17,19 @@
 
 package org.apache.uniffle.client.factory;
 
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
 import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
-import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.IdHelper;
 
 public class ShuffleClientFactory {
 
@@ -41,22 +48,8 @@ public class ShuffleClientFactory {
     return builder.build();
   }
 
-  public ShuffleReadClient createShuffleReadClient(CreateShuffleReadClientRequest request) {
-    return new ShuffleReadClientImpl(
-        request.getAppId(),
-        request.getShuffleId(),
-        request.getPartitionId(),
-        request.getPartitionNumPerRange(),
-        request.getPartitionNum(),
-        request.getBasePath(),
-        request.getBlockIdBitmap(),
-        request.getTaskIdBitmap(),
-        request.getShuffleServerInfoList(),
-        request.getHadoopConf(),
-        request.getIdHelper(),
-        request.getShuffleDataDistributionType(),
-        request.isExpectedTaskIdsBitmapFilterEnable(),
-        request.getRssConf());
+  public ShuffleReadClient createShuffleReadClient(ReadClientBuilder builder) {
+    return builder.build();
   }
 
   public static class WriteClientBuilder {
@@ -198,7 +191,202 @@ public class ShuffleClientFactory {
     }
   }
 
+  public static class ReadClientBuilder {
+    private String appId;
+    private int shuffleId;
+    private int partitionId;
+    private String basePath;
+    private int partitionNumPerRange;
+    private int partitionNum;
+    private Roaring64NavigableMap blockIdBitmap;
+    private Roaring64NavigableMap taskIdBitmap;
+    private List<ShuffleServerInfo> shuffleServerInfoList;
+    private Configuration hadoopConf;
+    private IdHelper idHelper;
+    private ShuffleDataDistributionType shuffleDataDistributionType;
+    private boolean expectedTaskIdsBitmapFilterEnable;
+    private RssConf rssConf;
+    private boolean offHeapEnable;
+    private String storageType;
+    private int indexReadLimit;
+    private long readBufferSize;
+
+    public ReadClientBuilder appId(String appId) {
+      this.appId = appId;
+      return this;
+    }
+
+    public ReadClientBuilder shuffleId(int shuffleId) {
+      this.shuffleId = shuffleId;
+      return this;
+    }
+
+    public ReadClientBuilder partitionId(int partitionId) {
+      this.partitionId = partitionId;
+      return this;
+    }
+
+    public ReadClientBuilder basePath(String basePath) {
+      this.basePath = basePath;
+      return this;
+    }
+
+    public ReadClientBuilder partitionNumPerRange(int partitionNumPerRange) {
+      this.partitionNumPerRange = partitionNumPerRange;
+      return this;
+    }
+
+    public ReadClientBuilder partitionNum(int partitionNum) {
+      this.partitionNum = partitionNum;
+      return this;
+    }
+
+    public ReadClientBuilder blockIdBitmap(Roaring64NavigableMap blockIdBitmap) {
+      this.blockIdBitmap = blockIdBitmap;
+      return this;
+    }
+
+    public ReadClientBuilder taskIdBitmap(Roaring64NavigableMap taskIdBitmap) {
+      this.taskIdBitmap = taskIdBitmap;
+      return this;
+    }
+
+    public ReadClientBuilder shuffleServerInfoList(List<ShuffleServerInfo> shuffleServerInfoList) {
+      this.shuffleServerInfoList = shuffleServerInfoList;
+      return this;
+    }
+
+    public ReadClientBuilder hadoopConf(Configuration hadoopConf) {
+      this.hadoopConf = hadoopConf;
+      return this;
+    }
+
+    public ReadClientBuilder idHelper(IdHelper idHelper) {
+      this.idHelper = idHelper;
+      return this;
+    }
+
+    public ReadClientBuilder shuffleDataDistributionType(
+        ShuffleDataDistributionType shuffleDataDistributionType) {
+      this.shuffleDataDistributionType = shuffleDataDistributionType;
+      return this;
+    }
+
+    public ReadClientBuilder expectedTaskIdsBitmapFilterEnable(
+        boolean expectedTaskIdsBitmapFilterEnable) {
+      this.expectedTaskIdsBitmapFilterEnable = expectedTaskIdsBitmapFilterEnable;
+      return this;
+    }
+
+    public ReadClientBuilder rssConf(RssConf rssConf) {
+      this.rssConf = rssConf;
+      return this;
+    }
+
+    public ReadClientBuilder offHeapEnable(boolean offHeapEnable) {
+      this.offHeapEnable = offHeapEnable;
+      return this;
+    }
+
+    public ReadClientBuilder storageType(String storageType) {
+      this.storageType = storageType;
+      return this;
+    }
+
+    public ReadClientBuilder indexReadLimit(int indexReadLimit) {
+      this.indexReadLimit = indexReadLimit;
+      return this;
+    }
+
+    public ReadClientBuilder readBufferSize(long readBufferSize) {
+      this.readBufferSize = readBufferSize;
+      return this;
+    }
+
+    public ReadClientBuilder() {}
+
+    public String getAppId() {
+      return appId;
+    }
+
+    public int getShuffleId() {
+      return shuffleId;
+    }
+
+    public int getPartitionId() {
+      return partitionId;
+    }
+
+    public int getPartitionNumPerRange() {
+      return partitionNumPerRange;
+    }
+
+    public int getPartitionNum() {
+      return partitionNum;
+    }
+
+    public String getBasePath() {
+      return basePath;
+    }
+
+    public Roaring64NavigableMap getBlockIdBitmap() {
+      return blockIdBitmap;
+    }
+
+    public Roaring64NavigableMap getTaskIdBitmap() {
+      return taskIdBitmap;
+    }
+
+    public List<ShuffleServerInfo> getShuffleServerInfoList() {
+      return shuffleServerInfoList;
+    }
+
+    public Configuration getHadoopConf() {
+      return hadoopConf;
+    }
+
+    public IdHelper getIdHelper() {
+      return idHelper;
+    }
+
+    public ShuffleDataDistributionType getShuffleDataDistributionType() {
+      return shuffleDataDistributionType;
+    }
+
+    public boolean isExpectedTaskIdsBitmapFilterEnable() {
+      return expectedTaskIdsBitmapFilterEnable;
+    }
+
+    public RssConf getRssConf() {
+      return rssConf;
+    }
+
+    public boolean isOffHeapEnable() {
+      return offHeapEnable;
+    }
+
+    public String getStorageType() {
+      return storageType;
+    }
+
+    public int getIndexReadLimit() {
+      return indexReadLimit;
+    }
+
+    public long getReadBufferSize() {
+      return readBufferSize;
+    }
+
+    public ShuffleReadClientImpl build() {
+      return new ShuffleReadClientImpl(this);
+    }
+  }
+
   public static WriteClientBuilder newWriteBuilder() {
     return new WriteClientBuilder();
   }
+
+  public static ReadClientBuilder newReadBuilder() {
+    return new ReadClientBuilder();
+  }
 }
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index f7986d574..24fb20609 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -31,7 +31,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleReadClient;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
@@ -64,131 +66,82 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
   private ClientReadHandler clientReadHandler;
   private IdHelper idHelper;
 
-  public ShuffleReadClientImpl(
-      String appId,
-      int shuffleId,
-      int partitionId,
-      int partitionNumPerRange,
-      int partitionNum,
-      String storageBasePath,
-      Roaring64NavigableMap blockIdBitmap,
-      Roaring64NavigableMap taskIdBitmap,
-      List<ShuffleServerInfo> shuffleServerInfoList,
-      Configuration hadoopConf,
-      IdHelper idHelper,
-      ShuffleDataDistributionType dataDistributionType,
-      boolean expectedTaskIdsBitmapFilterEnable,
-      RssConf rssConf) {
-    final int indexReadLimit = rssConf.get(RssClientConf.RSS_INDEX_READ_LIMIT);
-    final String storageType = rssConf.get(RssClientConf.RSS_STORAGE_TYPE);
-    long readBufferSize =
-        rssConf.getSizeAsBytes(
-            RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.key(),
-            RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.defaultValue());
-    if (readBufferSize > Integer.MAX_VALUE) {
-      LOG.warn(RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.key() + " can support 2g as max");
-      readBufferSize = Integer.MAX_VALUE;
+  public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder builder) {
+    // fill in defaults for options the caller did not set
+    if (builder.getIdHelper() == null) {
+      builder.idHelper(new DefaultIdHelper());
     }
-    boolean offHeapEnabled = rssConf.get(RssClientConf.OFF_HEAP_MEMORY_ENABLE);
-    init(
-        storageType,
-        appId,
-        shuffleId,
-        partitionId,
-        indexReadLimit,
-        partitionNumPerRange,
-        partitionNum,
-        (int) readBufferSize,
-        storageBasePath,
-        blockIdBitmap,
-        taskIdBitmap,
-        shuffleServerInfoList,
-        hadoopConf,
-        idHelper,
-        dataDistributionType,
-        expectedTaskIdsBitmapFilterEnable,
-        offHeapEnabled,
-        rssConf);
-  }
+    if (builder.getShuffleDataDistributionType() == null) {
+      builder.shuffleDataDistributionType(ShuffleDataDistributionType.NORMAL);
+    }
+    if (builder.getHadoopConf() == null) {
+      builder.hadoopConf(new Configuration());
+    }
+    if (builder.getRssConf() != null) {
+      final int indexReadLimit = builder.getRssConf().get(RssClientConf.RSS_INDEX_READ_LIMIT);
+      final String storageType = builder.getRssConf().get(RssClientConf.RSS_STORAGE_TYPE);
+      long readBufferSize =
+          builder
+              .getRssConf()
+              .getSizeAsBytes(
+                  RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.key(),
+                  RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.defaultValue());
+      if (readBufferSize > Integer.MAX_VALUE) {
+        LOG.warn(RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.key() + " can support 2g as max");
+        readBufferSize = Integer.MAX_VALUE;
+      }
+      boolean offHeapEnabled = builder.getRssConf().get(RssClientConf.OFF_HEAP_MEMORY_ENABLE);
+
+      builder.indexReadLimit(indexReadLimit);
+      builder.storageType(storageType);
+      builder.readBufferSize(readBufferSize);
+      builder.offHeapEnable(offHeapEnabled);
+    } else {
+      // no RssConf supplied (mostly the test path): build one from explicit values
+      RssConf rssConf = new RssConf();
+      rssConf.set(RssClientConf.RSS_STORAGE_TYPE, builder.getStorageType());
+      rssConf.set(RssClientConf.RSS_INDEX_READ_LIMIT, builder.getIndexReadLimit());
+      rssConf.set(
+          RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE, String.valueOf(builder.getReadBufferSize()));
 
-  public ShuffleReadClientImpl(
-      String appId,
-      int shuffleId,
-      int partitionId,
-      int partitionNumPerRange,
-      int partitionNum,
-      String storageBasePath,
-      Roaring64NavigableMap blockIdBitmap,
-      Roaring64NavigableMap taskIdBitmap,
-      List<ShuffleServerInfo> shuffleServerInfoList,
-      Configuration hadoopConf,
-      IdHelper idHelper,
-      RssConf rssConf) {
-    this(
-        appId,
-        shuffleId,
-        partitionId,
-        partitionNumPerRange,
-        partitionNum,
-        storageBasePath,
-        blockIdBitmap,
-        taskIdBitmap,
-        shuffleServerInfoList,
-        hadoopConf,
-        idHelper,
-        ShuffleDataDistributionType.NORMAL,
-        false,
-        rssConf);
+      builder.rssConf(rssConf);
+      builder.offHeapEnable(false);
+      builder.expectedTaskIdsBitmapFilterEnable(false);
+    }
+
+    init(builder);
   }
 
-  private void init(
-      String storageType,
-      String appId,
-      int shuffleId,
-      int partitionId,
-      int indexReadLimit,
-      int partitionNumPerRange,
-      int partitionNum,
-      int readBufferSize,
-      String storageBasePath,
-      Roaring64NavigableMap blockIdBitmap,
-      Roaring64NavigableMap taskIdBitmap,
-      List<ShuffleServerInfo> shuffleServerInfoList,
-      Configuration hadoopConf,
-      IdHelper idHelper,
-      ShuffleDataDistributionType dataDistributionType,
-      boolean expectedTaskIdsBitmapFilterEnable,
-      boolean offHeapEnabled,
-      RssConf rssConf) {
-    this.shuffleId = shuffleId;
-    this.partitionId = partitionId;
-    this.blockIdBitmap = blockIdBitmap;
-    this.taskIdBitmap = taskIdBitmap;
-    this.idHelper = idHelper;
-    this.shuffleServerInfoList = shuffleServerInfoList;
+  private void init(ShuffleClientFactory.ReadClientBuilder builder) {
+    this.shuffleId = builder.getShuffleId();
+    this.partitionId = builder.getPartitionId();
+    this.blockIdBitmap = builder.getBlockIdBitmap();
+    this.taskIdBitmap = builder.getTaskIdBitmap();
+    this.idHelper = builder.getIdHelper();
+    this.shuffleServerInfoList = builder.getShuffleServerInfoList();
 
     CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest();
-    request.setStorageType(storageType);
-    request.setAppId(appId);
+    request.setStorageType(builder.getStorageType());
+    request.setAppId(builder.getAppId());
     request.setShuffleId(shuffleId);
     request.setPartitionId(partitionId);
-    request.setIndexReadLimit(indexReadLimit);
-    request.setPartitionNumPerRange(partitionNumPerRange);
-    request.setPartitionNum(partitionNum);
-    request.setReadBufferSize((int) readBufferSize);
-    request.setStorageBasePath(storageBasePath);
+    request.setIndexReadLimit(builder.getIndexReadLimit());
+    request.setPartitionNumPerRange(builder.getPartitionNumPerRange());
+    request.setPartitionNum(builder.getPartitionNum());
+    request.setReadBufferSize((int) builder.getReadBufferSize());
+    request.setStorageBasePath(builder.getBasePath());
     request.setShuffleServerInfoList(shuffleServerInfoList);
-    request.setHadoopConf(hadoopConf);
+    request.setHadoopConf(builder.getHadoopConf());
     request.setExpectBlockIds(blockIdBitmap);
     request.setProcessBlockIds(processedBlockIds);
-    request.setDistributionType(dataDistributionType);
+    request.setDistributionType(builder.getShuffleDataDistributionType());
     request.setIdHelper(idHelper);
     request.setExpectTaskIds(taskIdBitmap);
-    request.setClientConf(rssConf);
-    if (expectedTaskIdsBitmapFilterEnable) {
+    request.setClientConf(builder.getRssConf());
+    if (builder.isExpectedTaskIdsBitmapFilterEnable()) {
       request.useExpectedTaskIdsBitmapFilter();
     }
-    if (offHeapEnabled) {
+    if (builder.isOffHeapEnable()) {
       request.enableOffHeap();
     }
 
@@ -210,46 +163,6 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
     clientReadHandler = ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
   }
 
-  public ShuffleReadClientImpl(
-      String storageType,
-      String appId,
-      int shuffleId,
-      int partitionId,
-      int indexReadLimit,
-      int partitionNumPerRange,
-      int partitionNum,
-      int readBufferSize,
-      String storageBasePath,
-      Roaring64NavigableMap blockIdBitmap,
-      Roaring64NavigableMap taskIdBitmap,
-      List<ShuffleServerInfo> shuffleServerInfoList,
-      Configuration hadoopConf,
-      IdHelper idHelper) {
-    RssConf rssConf = new RssConf();
-    rssConf.set(RssClientConf.RSS_STORAGE_TYPE, storageType);
-    rssConf.set(RssClientConf.RSS_INDEX_READ_LIMIT, indexReadLimit);
-    rssConf.set(RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE, String.valueOf(readBufferSize));
-    init(
-        storageType,
-        appId,
-        shuffleId,
-        partitionId,
-        indexReadLimit,
-        partitionNumPerRange,
-        partitionNum,
-        readBufferSize,
-        storageBasePath,
-        blockIdBitmap,
-        taskIdBitmap,
-        shuffleServerInfoList,
-        hadoopConf,
-        idHelper,
-        ShuffleDataDistributionType.NORMAL,
-        false,
-        false,
-        rssConf);
-  }
-
   @Override
   public CompressedShuffleBlock readShuffleBlockData() {
     // empty data expected, just return null
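
A note on the new constructor's defaulting, per the hunks above: idHelper, the data distribution type, and the Hadoop conf fall back to DefaultIdHelper, NORMAL, and a fresh Configuration when unset; with an RssConf present, storageType, indexReadLimit, readBufferSize (capped at Integer.MAX_VALUE), and off-heap mode are derived from it; without one, those options must be set explicitly on the builder. A minimal sketch of that explicit path, mirroring the updated tests further below (values are placeholders):

    // Explicit, test-style construction: no rssConf(), so the constructor
    // assembles an RssConf internally from the values set here.
    ShuffleReadClientImpl readClient =
        ShuffleClientFactory.newReadBuilder()
            .storageType(StorageType.HDFS.name())
            .appId("appId")
            .shuffleId(0)
            .partitionId(1)
            .indexReadLimit(100)
            .partitionNumPerRange(1)
            .partitionNum(10)
            .readBufferSize(1000)
            .basePath("hdfs://namenode/base")
            .blockIdBitmap(Roaring64NavigableMap.bitmapOf())
            .taskIdBitmap(Roaring64NavigableMap.bitmapOf(0))
            .shuffleServerInfoList(
                Lists.newArrayList(new ShuffleServerInfo("host1-0", "host1", 0)))
            .build();
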
diff --git a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java b/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
deleted file mode 100644
index 97a568179..000000000
--- a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.client.request;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
-
-import org.apache.uniffle.client.util.DefaultIdHelper;
-import org.apache.uniffle.common.ShuffleDataDistributionType;
-import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.config.RssConf;
-import org.apache.uniffle.common.util.IdHelper;
-
-public class CreateShuffleReadClientRequest {
-
-  private String appId;
-  private int shuffleId;
-  private int partitionId;
-  private String basePath;
-  private int partitionNumPerRange;
-  private int partitionNum;
-  private Roaring64NavigableMap blockIdBitmap;
-  private Roaring64NavigableMap taskIdBitmap;
-  private List<ShuffleServerInfo> shuffleServerInfoList;
-  private Configuration hadoopConf;
-  private IdHelper idHelper;
-  private ShuffleDataDistributionType shuffleDataDistributionType =
-      ShuffleDataDistributionType.NORMAL;
-  private boolean expectedTaskIdsBitmapFilterEnable = false;
-  private RssConf rssConf;
-
-  public CreateShuffleReadClientRequest(
-      String appId,
-      int shuffleId,
-      int partitionId,
-      String basePath,
-      int partitionNumPerRange,
-      int partitionNum,
-      Roaring64NavigableMap blockIdBitmap,
-      Roaring64NavigableMap taskIdBitmap,
-      List<ShuffleServerInfo> shuffleServerInfoList,
-      Configuration hadoopConf,
-      ShuffleDataDistributionType dataDistributionType,
-      boolean expectedTaskIdsBitmapFilterEnable,
-      RssConf rssConf) {
-    this(
-        appId,
-        shuffleId,
-        partitionId,
-        basePath,
-        partitionNumPerRange,
-        partitionNum,
-        blockIdBitmap,
-        taskIdBitmap,
-        shuffleServerInfoList,
-        hadoopConf,
-        new DefaultIdHelper(),
-        expectedTaskIdsBitmapFilterEnable,
-        rssConf);
-    this.shuffleDataDistributionType = dataDistributionType;
-  }
-
-  public CreateShuffleReadClientRequest(
-      String appId,
-      int shuffleId,
-      int partitionId,
-      String basePath,
-      int partitionNumPerRange,
-      int partitionNum,
-      Roaring64NavigableMap blockIdBitmap,
-      Roaring64NavigableMap taskIdBitmap,
-      List<ShuffleServerInfo> shuffleServerInfoList,
-      Configuration hadoopConf,
-      IdHelper idHelper,
-      boolean expectedTaskIdsBitmapFilterEnable,
-      RssConf rssConf) {
-    this(
-        appId,
-        shuffleId,
-        partitionId,
-        basePath,
-        partitionNumPerRange,
-        partitionNum,
-        blockIdBitmap,
-        taskIdBitmap,
-        shuffleServerInfoList,
-        hadoopConf,
-        idHelper,
-        expectedTaskIdsBitmapFilterEnable);
-    this.rssConf = rssConf;
-  }
-
-  public CreateShuffleReadClientRequest(
-      String appId,
-      int shuffleId,
-      int partitionId,
-      String basePath,
-      int partitionNumPerRange,
-      int partitionNum,
-      Roaring64NavigableMap blockIdBitmap,
-      Roaring64NavigableMap taskIdBitmap,
-      List<ShuffleServerInfo> shuffleServerInfoList,
-      Configuration hadoopConf,
-      IdHelper idHelper,
-      boolean expectedTaskIdsBitmapFilterEnable) {
-    this.appId = appId;
-    this.shuffleId = shuffleId;
-    this.partitionId = partitionId;
-    this.basePath = basePath;
-    this.partitionNumPerRange = partitionNumPerRange;
-    this.partitionNum = partitionNum;
-    this.blockIdBitmap = blockIdBitmap;
-    this.taskIdBitmap = taskIdBitmap;
-    this.shuffleServerInfoList = shuffleServerInfoList;
-    this.hadoopConf = hadoopConf;
-    this.idHelper = idHelper;
-    this.expectedTaskIdsBitmapFilterEnable = expectedTaskIdsBitmapFilterEnable;
-  }
-
-  public CreateShuffleReadClientRequest(
-      String appId,
-      int shuffleId,
-      int partitionId,
-      String basePath,
-      int partitionNumPerRange,
-      int partitionNum,
-      Roaring64NavigableMap blockIdBitmap,
-      Roaring64NavigableMap taskIdBitmap,
-      List<ShuffleServerInfo> shuffleServerInfoList,
-      Configuration hadoopConf,
-      boolean expectedTaskIdsBitmapFilterEnable,
-      RssConf rssConf) {
-    this(
-        appId,
-        shuffleId,
-        partitionId,
-        basePath,
-        partitionNumPerRange,
-        partitionNum,
-        blockIdBitmap,
-        taskIdBitmap,
-        shuffleServerInfoList,
-        hadoopConf,
-        new DefaultIdHelper(),
-        expectedTaskIdsBitmapFilterEnable);
-    this.rssConf = rssConf;
-  }
-
-  public String getAppId() {
-    return appId;
-  }
-
-  public int getShuffleId() {
-    return shuffleId;
-  }
-
-  public int getPartitionId() {
-    return partitionId;
-  }
-
-  public int getPartitionNumPerRange() {
-    return partitionNumPerRange;
-  }
-
-  public int getPartitionNum() {
-    return partitionNum;
-  }
-
-  public String getBasePath() {
-    return basePath;
-  }
-
-  public Roaring64NavigableMap getBlockIdBitmap() {
-    return blockIdBitmap;
-  }
-
-  public Roaring64NavigableMap getTaskIdBitmap() {
-    return taskIdBitmap;
-  }
-
-  public List<ShuffleServerInfo> getShuffleServerInfoList() {
-    return shuffleServerInfoList;
-  }
-
-  public Configuration getHadoopConf() {
-    return hadoopConf;
-  }
-
-  public IdHelper getIdHelper() {
-    return idHelper;
-  }
-
-  public ShuffleDataDistributionType getShuffleDataDistributionType() {
-    return shuffleDataDistributionType;
-  }
-
-  public boolean isExpectedTaskIdsBitmapFilterEnable() {
-    return expectedTaskIdsBitmapFilterEnable;
-  }
-
-  public RssConf getRssConf() {
-    return rssConf;
-  }
-}
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
index 8ee0e9407..1b19ef084 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
@@ -35,8 +34,8 @@ import org.roaringbitmap.longlong.LongIterator;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.client.TestUtils;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.util.ChecksumUtils;
@@ -59,6 +58,19 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
   private ShuffleServerInfo ssi1 = new ShuffleServerInfo("host1-0", "host1", 0);
   private ShuffleServerInfo ssi2 = new ShuffleServerInfo("host2-0", "host2", 0);
 
+  private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
+    return ShuffleClientFactory.newReadBuilder()
+        .storageType(StorageType.HDFS.name())
+        .appId("appId")
+        .shuffleId(0)
+        .partitionId(1)
+        .indexReadLimit(100)
+        .partitionNumPerRange(1)
+        .partitionNum(10)
+        .readBufferSize(1000)
+        .shuffleServerInfoList(Lists.newArrayList(ssi1));
+  }
+
   @Test
   public void readTest1() throws Exception {
     String basePath = HDFS_URI + "clientReadTest1";
@@ -69,24 +81,12 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 2, 30, 0, expectedData, blockIdBitmap);
-
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            100,
-            1,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
-
+        baseReadBuilder()
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     TestUtils.validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
     readClient.close();
@@ -94,21 +94,14 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
     blockIdBitmap.addLong(Constants.MAX_TASK_ATTEMPT_ID - 1);
     taskIdBitmap.addLong(Constants.MAX_TASK_ATTEMPT_ID - 1);
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            100,
-            1,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     TestUtils.validateResult(readClient, expectedData);
     try {
       // can't find all expected block id, data loss
@@ -136,22 +129,13 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
     writeTestData(writeHandler2, 2, 30, 0, expectedData, blockIdBitmap);
 
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            100,
-            2,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1, ssi2),
-            new Configuration(),
-            new DefaultIdHelper());
-
+        baseReadBuilder()
+            .partitionNumPerRange(2)
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(ssi1, ssi2))
+            .build();
     TestUtils.validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
     readClient.close();
@@ -203,22 +187,13 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
         conf);
 
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            100,
-            2,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1, ssi2),
-            new Configuration(),
-            new DefaultIdHelper());
-
+        baseReadBuilder()
+            .partitionNumPerRange(2)
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(ssi1, ssi2))
+            .build();
     TestUtils.validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
     readClient.close();
@@ -236,21 +211,12 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
     writeTestData(writeHandler, 2, 30, 0, expectedData, blockIdBitmap);
 
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            100,
-            2,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .partitionNumPerRange(2)
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     Path dataFile = new Path(basePath + "/appId/0/0-1/" + ssi1.getId() + "_0.data");
     // data file is deleted after readClient checkExpectedBlockIds
     fs.delete(dataFile, true);
@@ -282,23 +248,13 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 2, 30, 0, expectedData, blockIdBitmap);
-
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            100,
-            2,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .partitionNumPerRange(2)
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     // index file is deleted after iterator initialization, it should be ok, all index infos are
     // read already
     Path indexFile = new Path(basePath + "/appId/0/0-1/" + ssi1.getId() + "_0.index");
@@ -324,39 +280,22 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
     writeTestData(writeHandler, 10, 30, 0, expectedData2, blockIdBitmap2);
 
     writeTestData(writeHandler, 10, 30, 0, expectedData1, blockIdBitmap1);
-
     ShuffleReadClientImpl readClient1 =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            0,
-            100,
-            2,
-            10,
-            100,
-            basePath,
-            blockIdBitmap1,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .partitionId(0)
+            .partitionNumPerRange(2)
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap1)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
+
     final ShuffleReadClientImpl readClient2 =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            100,
-            2,
-            10,
-            100,
-            basePath,
-            blockIdBitmap2,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .partitionNumPerRange(2)
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap2)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     TestUtils.validateResult(readClient1, expectedData1);
     readClient1.checkProcessedBlockIds();
     readClient1.close();
@@ -376,39 +315,21 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 2, 30, 0, expectedData, blockIdBitmap);
-
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            100,
-            2,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .partitionNumPerRange(2)
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     ShuffleReadClientImpl readClient2 =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            100,
-            2,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1, ssi2),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .partitionNumPerRange(2)
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(ssi1, ssi2))
+            .build();
     // crc32 is incorrect
     try (MockedStatic<ChecksumUtils> checksumUtilsMock = Mockito.mockStatic(ChecksumUtils.class)) {
       checksumUtilsMock.when(() -> ChecksumUtils.getCrc32((ByteBuffer) any())).thenReturn(-1L);
@@ -433,21 +354,12 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
   public void readTest9() {
     // empty data
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            100,
-            2,
-            10,
-            1000,
-            "basePath",
-            Roaring64NavigableMap.bitmapOf(),
-            Roaring64NavigableMap.bitmapOf(),
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .partitionNumPerRange(2)
+            .basePath("basePath")
+            .blockIdBitmap(Roaring64NavigableMap.bitmapOf())
+            .taskIdBitmap(Roaring64NavigableMap.bitmapOf())
+            .build();
     assertNull(readClient.readShuffleBlockData());
     readClient.checkProcessedBlockIds();
   }
@@ -469,21 +381,13 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
     }
 
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            0,
-            100,
-            2,
-            10,
-            100,
-            basePath,
-            wrongBlockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .partitionId(0)
+            .partitionNumPerRange(2)
+            .basePath(basePath)
+            .blockIdBitmap(wrongBlockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     assertNull(readClient.readShuffleBlockData());
     try {
       readClient.checkProcessedBlockIds();
@@ -503,109 +407,58 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 10, 30, 0, expectedData, blockIdBitmap);
-
     // test with different indexReadLimit to validate result
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            1,
-            1,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
-
+        baseReadBuilder()
+            .indexReadLimit(1)
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     TestUtils.validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
     readClient.close();
 
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            2,
-            1,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
-
+        baseReadBuilder()
+            .indexReadLimit(2)
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     TestUtils.validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
     readClient.close();
 
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            3,
-            1,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
-
+        baseReadBuilder()
+            .indexReadLimit(3)
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     TestUtils.validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
     readClient.close();
 
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            10,
-            1,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
-
+        baseReadBuilder()
+            .indexReadLimit(10)
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     TestUtils.validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
     readClient.close();
 
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            11,
-            1,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
-
+        baseReadBuilder()
+            .indexReadLimit(11)
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     TestUtils.validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
     readClient.close();
@@ -626,22 +479,11 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
 
     // unexpected taskAttemptId should be filtered
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            100,
-            1,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
-
+        baseReadBuilder()
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     TestUtils.validateResult(readClient, expectedData);
     assertEquals(15, readClient.getProcessedBlockIds().getLongCardinality());
     readClient.checkProcessedBlockIds();
@@ -666,22 +508,11 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
 
     // unexpected taskAttemptId should be filtered
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            100,
-            1,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
-
+        baseReadBuilder()
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     TestUtils.validateResult(readClient, expectedData);
     assertEquals(20, readClient.getProcessedBlockIds().getLongCardinality());
     readClient.checkProcessedBlockIds();
@@ -703,22 +534,11 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
 
     // unexpected taskAttemptId should be filtered
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            100,
-            1,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
-
+        baseReadBuilder()
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     TestUtils.validateResult(readClient, expectedData);
     assertEquals(15, readClient.getProcessedBlockIds().getLongCardinality());
     readClient.checkProcessedBlockIds();
@@ -742,22 +562,11 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
     writeTestData(writeHandler, 5, 30, 0, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
     // unexpected taskAttemptId should be filtered
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            "appId",
-            0,
-            1,
-            100,
-            1,
-            10,
-            1000,
-            basePath,
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(ssi1),
-            new Configuration(),
-            new DefaultIdHelper());
-
+        baseReadBuilder()
+            .basePath(basePath)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     TestUtils.validateResult(readClient, expectedData);
     assertEquals(25, readClient.getProcessedBlockIds().getLongCardinality());
     readClient.checkProcessedBlockIds();
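
The ShuffleReadClientImplTest hunks above call a baseReadBuilder() helper whose definition falls outside this excerpt. A minimal sketch of what such a helper plausibly looks like, with every default inferred from the deleted positional ShuffleReadClientImpl arguments rather than copied from the commit:

    private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
      // Defaults reconstructed from the removed constructor calls; individual
      // tests override fields such as partitionId, partitionNumPerRange, or
      // indexReadLimit before calling build().
      return ShuffleClientFactory.newReadBuilder()
          .storageType(StorageType.HDFS.name())
          .appId("appId")
          .shuffleId(0)
          .partitionId(1)
          .indexReadLimit(100)
          .partitionNumPerRange(1)
          .partitionNum(10)
          .readBufferSize(1000)
          .shuffleServerInfoList(Lists.newArrayList(ssi1))
          .hadoopConf(new Configuration());
    }
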
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
index 70697d511..d55317cf7 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
@@ -36,13 +36,13 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
 import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
 import org.apache.uniffle.client.request.RssFinishShuffleRequest;
 import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
 import org.apache.uniffle.client.request.RssSendCommitRequest;
 import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
-import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
@@ -112,21 +112,21 @@ public class DiskErrorToleranceTest extends ShuffleReadWriteBase {
     RssFinishShuffleRequest rf1 = new RssFinishShuffleRequest(appId, 0);
     shuffleServerClient.finishShuffle(rf1);
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            appId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            null,
-            blockIdBitmap1,
-            Roaring64NavigableMap.bitmapOf(1),
-            shuffleServerInfo,
-            conf,
-            new DefaultIdHelper());
+        ShuffleClientFactory.newReadBuilder()
+            .storageType(StorageType.LOCALFILE.name())
+            .appId(appId)
+            .shuffleId(0)
+            .partitionId(0)
+            .indexReadLimit(100)
+            .partitionNumPerRange(1)
+            .partitionNum(10)
+            .readBufferSize(1000)
+            .basePath(null)
+            .blockIdBitmap(blockIdBitmap1)
+            .taskIdBitmap(Roaring64NavigableMap.bitmapOf(1))
+            .shuffleServerInfoList(shuffleServerInfo)
+            .hadoopConf(conf)
+            .build();
     validateResult(readClient, expectedData);
 
     File shuffleData = new File(data2, appId);
@@ -153,21 +153,21 @@ public class DiskErrorToleranceTest extends ShuffleReadWriteBase {
     shuffleServerClient.finishShuffle(rf1);
 
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            appId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            null,
-            blockIdBitmap2,
-            Roaring64NavigableMap.bitmapOf(2),
-            shuffleServerInfo,
-            conf,
-            new DefaultIdHelper());
+        ShuffleClientFactory.newReadBuilder()
+            .storageType(StorageType.LOCALFILE.name())
+            .appId(appId)
+            .shuffleId(0)
+            .partitionId(0)
+            .indexReadLimit(100)
+            .partitionNumPerRange(1)
+            .partitionNum(10)
+            .readBufferSize(1000)
+            .basePath(null)
+            .blockIdBitmap(blockIdBitmap2)
+            .taskIdBitmap(Roaring64NavigableMap.bitmapOf(2))
+            .shuffleServerInfoList(shuffleServerInfo)
+            .hadoopConf(conf)
+            .build();
     validateResult(readClient, expectedData);
     shuffleData = new File(data1, appId);
     assertTrue(shuffleData.exists());
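
Set against the DiskErrorToleranceTest hunk just above, the mapping from the old positional constructor to the named builder fields is one-to-one; only the trailing new DefaultIdHelper() argument disappears, presumably because the builder supplies an id helper internally. As a sketch, with free variables standing in for the old arguments:

    // Old: new ShuffleReadClientImpl(storageType, appId, shuffleId, partitionId,
    //          indexReadLimit, partitionNumPerRange, partitionNum, readBufferSize,
    //          basePath, blockIdBitmap, taskIdBitmap, shuffleServerInfoList,
    //          hadoopConf, new DefaultIdHelper())
    // New: each argument becomes a named setter; no idHelper is passed explicitly.
    ShuffleReadClientImpl readClient =
        ShuffleClientFactory.newReadBuilder()
            .storageType(storageType)
            .appId(appId)
            .shuffleId(shuffleId)
            .partitionId(partitionId)
            .indexReadLimit(indexReadLimit)
            .partitionNumPerRange(partitionNumPerRange)
            .partitionNum(partitionNum)
            .readBufferSize(readBufferSize)
            .basePath(basePath)
            .blockIdBitmap(blockIdBitmap)
            .taskIdBitmap(taskIdBitmap)
            .shuffleServerInfoList(shuffleServerInfoList)
            .hadoopConf(hadoopConf)
            .build();
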
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java
index bde7222d2..d2b27b952 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java
@@ -30,6 +30,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
 import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
 import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
@@ -39,7 +40,6 @@ import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
 import org.apache.uniffle.client.request.RssSendCommitRequest;
 import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
@@ -122,21 +122,22 @@ public abstract class HybridStorageFaultToleranceBase extends ShuffleReadWriteBa
       Roaring64NavigableMap taskBitmap,
       Map<Long, byte[]> expectedData) {
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE_HDFS.name(),
-            appId,
-            shuffleId,
-            partitionId,
-            100,
-            1,
-            10,
-            1000,
-            REMOTE_STORAGE,
-            blockBitmap,
-            taskBitmap,
-            Lists.newArrayList(new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT)),
-            conf,
-            new DefaultIdHelper());
+        ShuffleClientFactory.newReadBuilder()
+            .storageType(StorageType.LOCALFILE_HDFS.name())
+            .appId(appId)
+            .shuffleId(shuffleId)
+            .partitionId(partitionId)
+            .indexReadLimit(100)
+            .partitionNumPerRange(1)
+            .partitionNum(10)
+            .readBufferSize(1000)
+            .basePath(REMOTE_STORAGE)
+            .blockIdBitmap(blockBitmap)
+            .taskIdBitmap(taskBitmap)
+            .shuffleServerInfoList(
+                Lists.newArrayList(new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT)))
+            .hadoopConf(conf)
+            .build();
     CompressedShuffleBlock csb = readClient.readShuffleBlockData();
     Roaring64NavigableMap matched = Roaring64NavigableMap.bitmapOf();
     while (csb != null && csb.getByteBuffer() != null) {
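
The HybridStorageFaultToleranceBase hunk above ends where the read loop begins; the consumption pattern itself is untouched by the refactor. A condensed sketch of that drain-and-verify loop, using only the calls visible in these tests (the byte-buffer validation is elided):

    CompressedShuffleBlock csb = readClient.readShuffleBlockData();
    while (csb != null && csb.getByteBuffer() != null) {
      // decompress / validate csb.getByteBuffer() against the expected data ...
      csb = readClient.readShuffleBlockData();
    }
    // Throws when some expected block id was never read, as the
    // wrongBlockIdBitmap cases above exercise.
    readClient.checkProcessedBlockIds();
    readClient.close();
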
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index f653103fd..03acbc74f 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -37,7 +37,6 @@ import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
 import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
 import org.apache.uniffle.client.response.SendShuffleDataResult;
-import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
@@ -72,6 +71,17 @@ public class QuorumTest extends ShuffleReadWriteBase {
   private static ShuffleServerInfo fakedShuffleServerInfo4;
   private MockedShuffleWriteClientImpl shuffleWriteClientImpl;
 
+  private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
+    return ShuffleClientFactory.newReadBuilder()
+        .storageType(StorageType.MEMORY_LOCALFILE.name())
+        .shuffleId(0)
+        .partitionId(0)
+        .indexReadLimit(100)
+        .partitionNumPerRange(1)
+        .partitionNum(10)
+        .readBufferSize(1000);
+  }
+
   public static MockedShuffleServer createServer(int id, File tmpDir) throws Exception {
     ShuffleServerConf shuffleServerConf = getShuffleServerConf();
     shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000);
@@ -244,22 +254,15 @@ public class QuorumTest extends ShuffleReadWriteBase {
     assertEquals(blockIdBitmap, succBlockIdBitmap);
 
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, fakedShuffleServerInfo2),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(
+                Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, fakedShuffleServerInfo2))
+            .build();
     // The data should be read
     validateResult(readClient, expectedData);
 
@@ -398,21 +401,13 @@ public class QuorumTest extends ShuffleReadWriteBase {
 
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(
+                Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2))
+            .build();
     validateResult(readClient, expectedData);
   }
 
@@ -427,7 +422,6 @@ public class QuorumTest extends ShuffleReadWriteBase {
     // When 2 servers time out, the block sending should fail
     enableTimeout((MockedShuffleServer) shuffleServers.get(1), 500);
     enableTimeout((MockedShuffleServer) shuffleServers.get(2), 500);
-
     List<ShuffleBlockInfo> blocks =
         createShuffleBlockList(
             0,
@@ -579,21 +573,13 @@ public class QuorumTest extends ShuffleReadWriteBase {
     }
 
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(
+                Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2))
+            .build();
     validateResult(readClient, expectedData);
   }
 
@@ -768,59 +754,33 @@ public class QuorumTest extends ShuffleReadWriteBase {
 
     // we cannot read any blocks from server 2
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo2),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo2))
+            .build();
     assertTrue(readClient.readShuffleBlockData() == null);
 
     // we can read blocks from server 0,1
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1))
+            .build();
     validateResult(readClient, expectedData);
 
     // we can also read blocks from server 0,1,2
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(
+                Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2))
+            .build();
     validateResult(readClient, expectedData);
   }
 
@@ -854,59 +814,33 @@ public class QuorumTest extends ShuffleReadWriteBase {
 
     // we cannot read any blocks from server 1
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo1),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo1))
+            .build();
     assertTrue(readClient.readShuffleBlockData() == null);
 
     // we can read blocks from server 2, which was sent in the secondary round
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo2),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo2))
+            .build();
     validateResult(readClient, expectedData);
 
     // we can read blocks from server 0,1,2
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo0, fakedShuffleServerInfo1, shuffleServerInfo2),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(
+                Lists.newArrayList(shuffleServerInfo0, fakedShuffleServerInfo1, shuffleServerInfo2))
+            .build();
     validateResult(readClient, expectedData);
   }
 
@@ -944,40 +878,23 @@ public class QuorumTest extends ShuffleReadWriteBase {
 
     // we cannot read any blocks from server 3, 4
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo3, shuffleServerInfo4),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo3, shuffleServerInfo4))
+            .build();
     assertTrue(readClient.readShuffleBlockData() == null);
 
     // we can also read blocks from server 0,1,2
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(
+                Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2))
+            .build();
     validateResult(readClient, expectedData);
   }
 
@@ -1016,40 +933,22 @@ public class QuorumTest extends ShuffleReadWriteBase {
 
     // we cannot read any blocks from server 1 due to failures
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo1),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo1))
+            .build();
     assertTrue(readClient.readShuffleBlockData() == null);
 
     // we can also read blocks from server 3,4
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo3, shuffleServerInfo4),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo3, shuffleServerInfo4))
+            .build();
     validateResult(readClient, expectedData);
   }
 
@@ -1087,41 +986,24 @@ public class QuorumTest extends ShuffleReadWriteBase {
 
     // we cannot read any blocks from server 4 because the secondary round is skipped
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo4),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo4))
+            .build();
     assertTrue(readClient.readShuffleBlockData() == null);
 
     // we can read blocks from server 0,1,2,3
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(
-                shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2, shuffleServerInfo3),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(
+                Lists.newArrayList(
+                    shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2, shuffleServerInfo3))
+            .build();
     validateResult(readClient, expectedData);
   }
 
@@ -1153,59 +1035,32 @@ public class QuorumTest extends ShuffleReadWriteBase {
 
     // we can read blocks from server 0
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo0),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo0))
+            .build();
     validateResult(readClient, expectedData);
 
     // we can also read blocks from server 1
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo1),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo1))
+            .build();
     validateResult(readClient, expectedData);
 
     // we can also read blocks from server 2
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.MEMORY_LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo2),
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo2))
+            .build();
     validateResult(readClient, expectedData);
   }
 
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
index e46e87447..eee24a3a2 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
@@ -30,7 +30,6 @@ import java.util.stream.Stream;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.BeforeAll;
@@ -39,12 +38,12 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
 import org.apache.uniffle.client.request.RssFinishShuffleRequest;
 import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
 import org.apache.uniffle.client.request.RssSendCommitRequest;
 import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
-import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
@@ -144,23 +143,21 @@ public class ShuffleServerConcurrentWriteOfHadoopTest extends ShuffleServerWithH
                 blocksBitmap.add(iterator.next());
               }
             });
-
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            appId,
-            0,
-            0,
-            100,
-            2,
-            10,
-            1000,
-            dataBasePath,
-            blocksBitmap,
-            Roaring64NavigableMap.bitmapOf(0),
-            Lists.newArrayList(ssi),
-            new Configuration(),
-            new DefaultIdHelper());
+        ShuffleClientFactory.newReadBuilder()
+            .storageType(StorageType.HDFS.name())
+            .appId(appId)
+            .shuffleId(0)
+            .partitionId(0)
+            .indexReadLimit(100)
+            .partitionNumPerRange(2)
+            .partitionNum(10)
+            .readBufferSize(1000)
+            .basePath(dataBasePath)
+            .blockIdBitmap(blocksBitmap)
+            .taskIdBitmap(Roaring64NavigableMap.bitmapOf(0))
+            .shuffleServerInfoList(Lists.newArrayList(ssi))
+            .build();
 
     validateResult(readClient, expectedDataList, blocksBitmap);
   }
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java
index 4ae09ba1e..590355ab1 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java
@@ -23,13 +23,13 @@ import java.util.Map.Entry;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
 import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
 import org.apache.uniffle.client.request.RssFinishShuffleRequest;
@@ -37,7 +37,6 @@ import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
 import org.apache.uniffle.client.request.RssSendCommitRequest;
 import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
@@ -73,6 +72,17 @@ public class ShuffleServerWithHadoopTest extends ShuffleReadWriteBase {
     shuffleServerClient.close();
   }
 
+  private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
+    return ShuffleClientFactory.newReadBuilder()
+        .storageType(StorageType.HDFS.name())
+        .shuffleId(0)
+        .partitionId(0)
+        .indexReadLimit(100)
+        .partitionNumPerRange(2)
+        .partitionNum(10)
+        .readBufferSize(1000);
+  }
+
   @Test
   public void hadoopWriteReadTest() {
     String appId = "app_hdfs_read_write";
@@ -106,22 +116,15 @@ public class ShuffleServerWithHadoopTest extends ShuffleReadWriteBase {
     RssFinishShuffleRequest rfsr = new RssFinishShuffleRequest(appId, 0);
 
     ShuffleServerInfo ssi = new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT);
+
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            appId,
-            0,
-            0,
-            100,
-            2,
-            10,
-            1000,
-            dataBasePath,
-            bitmaps[0],
-            Roaring64NavigableMap.bitmapOf(0),
-            Lists.newArrayList(ssi),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(appId)
+            .basePath(dataBasePath)
+            .blockIdBitmap(bitmaps[0])
+            .taskIdBitmap(Roaring64NavigableMap.bitmapOf(0))
+            .shuffleServerInfoList(Lists.newArrayList(ssi))
+            .build();
     assertNull(readClient.readShuffleBlockData());
     shuffleServerClient.finishShuffle(rfsr);
 
@@ -149,75 +152,46 @@ public class ShuffleServerWithHadoopTest extends ShuffleReadWriteBase {
     shuffleServerClient.finishShuffle(rfsr);
 
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            appId,
-            0,
-            0,
-            100,
-            2,
-            10,
-            1000,
-            dataBasePath,
-            bitmaps[0],
-            Roaring64NavigableMap.bitmapOf(0),
-            Lists.newArrayList(ssi),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(appId)
+            .basePath(dataBasePath)
+            .blockIdBitmap(bitmaps[0])
+            .taskIdBitmap(Roaring64NavigableMap.bitmapOf(0))
+            .shuffleServerInfoList(Lists.newArrayList(ssi))
+            .build();
     validateResult(readClient, expectedData, bitmaps[0]);
 
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            appId,
-            0,
-            1,
-            100,
-            2,
-            10,
-            1000,
-            dataBasePath,
-            bitmaps[1],
-            Roaring64NavigableMap.bitmapOf(1),
-            Lists.newArrayList(ssi),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(appId)
+            .partitionId(1)
+            .basePath(dataBasePath)
+            .blockIdBitmap(bitmaps[1])
+            .taskIdBitmap(Roaring64NavigableMap.bitmapOf(1))
+            .shuffleServerInfoList(Lists.newArrayList(ssi))
+            .build();
     validateResult(readClient, expectedData, bitmaps[1]);
 
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            appId,
-            0,
-            2,
-            100,
-            2,
-            10,
-            1000,
-            dataBasePath,
-            bitmaps[2],
-            Roaring64NavigableMap.bitmapOf(2),
-            Lists.newArrayList(ssi),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(appId)
+            .partitionId(2)
+            .basePath(dataBasePath)
+            .blockIdBitmap(bitmaps[2])
+            .taskIdBitmap(Roaring64NavigableMap.bitmapOf(2))
+            .shuffleServerInfoList(Lists.newArrayList(ssi))
+            .build();
     validateResult(readClient, expectedData, bitmaps[2]);
 
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            appId,
-            0,
-            3,
-            100,
-            2,
-            10,
-            1000,
-            dataBasePath,
-            bitmaps[3],
-            Roaring64NavigableMap.bitmapOf(3),
-            Lists.newArrayList(ssi),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(appId)
+            .partitionId(3)
+            .basePath(dataBasePath)
+            .blockIdBitmap(bitmaps[3])
+            .taskIdBitmap(Roaring64NavigableMap.bitmapOf(3))
+            .shuffleServerInfoList(Lists.newArrayList(ssi))
+            .build();
     validateResult(readClient, expectedData, bitmaps[3]);
   }
 
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
index 60ad05358..69c11db53 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.io.TempDir;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.client.TestUtils;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
 import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
 import org.apache.uniffle.client.request.RssFinishShuffleRequest;
@@ -41,7 +42,6 @@ import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
 import org.apache.uniffle.client.request.RssSendCommitRequest;
 import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.KerberizedHadoopBase;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
@@ -177,6 +177,17 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase
     return partitionToBlocks;
   }
 
+  private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
+    return ShuffleClientFactory.newReadBuilder()
+        .storageType(StorageType.HDFS.name())
+        .shuffleId(0)
+        .partitionId(0)
+        .indexReadLimit(100)
+        .partitionNumPerRange(2)
+        .partitionNum(10)
+        .readBufferSize(1000);
+  }
+
   @Test
   public void hadoopWriteReadTest() throws Exception {
     String alexDir = kerberizedHadoop.getSchemeAndAuthorityPrefix() + "/alex/";
@@ -229,21 +240,13 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase
 
     ShuffleServerInfo ssi = new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT);
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            appId,
-            0,
-            0,
-            100,
-            2,
-            10,
-            1000,
-            dataBasePath,
-            bitmaps[0],
-            Roaring64NavigableMap.bitmapOf(0),
-            Lists.newArrayList(ssi),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(appId)
+            .basePath(dataBasePath)
+            .blockIdBitmap(bitmaps[0])
+            .taskIdBitmap(Roaring64NavigableMap.bitmapOf(0))
+            .shuffleServerInfoList(Lists.newArrayList(ssi))
+            .build();
     assertNull(readClient.readShuffleBlockData());
     shuffleServerClient.finishShuffle(rfsr);
 
@@ -271,75 +274,46 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase
     shuffleServerClient.finishShuffle(rfsr);
 
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            appId,
-            0,
-            0,
-            100,
-            2,
-            10,
-            1000,
-            dataBasePath,
-            bitmaps[0],
-            Roaring64NavigableMap.bitmapOf(0),
-            Lists.newArrayList(ssi),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(appId)
+            .basePath(dataBasePath)
+            .blockIdBitmap(bitmaps[0])
+            .taskIdBitmap(Roaring64NavigableMap.bitmapOf(0))
+            .shuffleServerInfoList(Lists.newArrayList(ssi))
+            .build();
     validateResult(readClient, expectedData, bitmaps[0]);
 
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            appId,
-            0,
-            1,
-            100,
-            2,
-            10,
-            1000,
-            dataBasePath,
-            bitmaps[1],
-            Roaring64NavigableMap.bitmapOf(1),
-            Lists.newArrayList(ssi),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(appId)
+            .partitionId(1)
+            .basePath(dataBasePath)
+            .blockIdBitmap(bitmaps[1])
+            .taskIdBitmap(Roaring64NavigableMap.bitmapOf(1))
+            .shuffleServerInfoList(Lists.newArrayList(ssi))
+            .build();
     validateResult(readClient, expectedData, bitmaps[1]);
 
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            appId,
-            0,
-            2,
-            100,
-            2,
-            10,
-            1000,
-            dataBasePath,
-            bitmaps[2],
-            Roaring64NavigableMap.bitmapOf(2),
-            Lists.newArrayList(ssi),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(appId)
+            .partitionId(2)
+            .basePath(dataBasePath)
+            .blockIdBitmap(bitmaps[2])
+            .taskIdBitmap(Roaring64NavigableMap.bitmapOf(2))
+            .shuffleServerInfoList(Lists.newArrayList(ssi))
+            .build();
     validateResult(readClient, expectedData, bitmaps[2]);
 
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.HDFS.name(),
-            appId,
-            0,
-            3,
-            100,
-            2,
-            10,
-            1000,
-            dataBasePath,
-            bitmaps[3],
-            Roaring64NavigableMap.bitmapOf(3),
-            Lists.newArrayList(ssi),
-            new Configuration(),
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(appId)
+            .partitionId(3)
+            .basePath(dataBasePath)
+            .blockIdBitmap(bitmaps[3])
+            .taskIdBitmap(Roaring64NavigableMap.bitmapOf(3))
+            .shuffleServerInfoList(Lists.newArrayList(ssi))
+            .build();
     validateResult(readClient, expectedData, bitmaps[3]);
   }
 
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 2ad1e0492..eee83cb34 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -37,7 +37,6 @@ import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
 import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
 import org.apache.uniffle.client.response.SendShuffleDataResult;
 import org.apache.uniffle.client.util.ClientUtils;
-import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
@@ -344,21 +343,20 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
     assertTrue(commitResult);
 
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2),
-            null,
-            new DefaultIdHelper());
+        ShuffleClientFactory.newReadBuilder()
+            .storageType(StorageType.LOCALFILE.name())
+            .appId(testAppId)
+            .shuffleId(0)
+            .partitionId(0)
+            .indexReadLimit(100)
+            .partitionNumPerRange(1)
+            .partitionNum(10)
+            .readBufferSize(1000)
+            .basePath("")
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2))
+            .build();
     assertNull(readClient.readShuffleBlockData());
     readClient.close();
 
@@ -368,21 +366,20 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
             Sets.newHashSet(shuffleServerInfo1, shuffleServerInfo2), testAppId, 0, 2);
     assertTrue(commitResult);
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2),
-            null,
-            new DefaultIdHelper());
+        ShuffleClientFactory.newReadBuilder()
+            .storageType(StorageType.LOCALFILE.name())
+            .appId(testAppId)
+            .shuffleId(0)
+            .partitionId(0)
+            .indexReadLimit(100)
+            .partitionNumPerRange(1)
+            .partitionNum(10)
+            .readBufferSize(1000)
+            .basePath("")
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2))
+            .build();
     validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
     readClient.close();
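
Two flavors of helper emerge from this refactor: QuorumTest leaves shuffleServerInfoList out of its baseReadBuilder() so each case can target a different replica set, while SparkClientWithLocalTest (in the diff below) bakes the server list into the base because every case reads from the same servers. Condensed, with the shared fields abridged and local variable names chosen for illustration:

    // QuorumTest style: the replica set varies per call site.
    ShuffleReadClientImpl quorumRead =
        baseReadBuilder()
            .appId(testAppId)
            .blockIdBitmap(blockIdBitmap)
            .taskIdBitmap(taskIdBitmap)
            .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1))
            .build();

    // SparkClientWithLocalTest style: the server list is preset in the base builder.
    ShuffleReadClientImpl localRead =
        baseReadBuilder()
            .appId(testAppId)
            .blockIdBitmap(blockIdBitmap)
            .taskIdBitmap(taskIdBitmap)
            .build();
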
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index c876c937d..2eb2ffad7 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -32,13 +32,13 @@ import org.junit.jupiter.api.io.TempDir;
 import org.roaringbitmap.longlong.LongIterator;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
 import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
 import org.apache.uniffle.client.request.RssFinishShuffleRequest;
 import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
 import org.apache.uniffle.client.request.RssSendCommitRequest;
 import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
-import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
@@ -85,6 +85,18 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
     shuffleServerClient.close();
   }
 
+  private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
+    return ShuffleClientFactory.newReadBuilder()
+        .storageType(StorageType.LOCALFILE.name())
+        .shuffleId(0)
+        .partitionId(0)
+        .indexReadLimit(100)
+        .partitionNumPerRange(1)
+        .partitionNum(10)
+        .readBufferSize(1000)
+        .shuffleServerInfoList(shuffleServerInfo);
+  }
+
   @Test
   public void readTest1() {
     String testAppId = "localReadTest1";
@@ -96,21 +108,11 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
     blockIdBitmap.addLong((1 << Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
     ShuffleReadClientImpl readClient;
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            shuffleServerInfo,
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     validateResult(readClient, expectedData);
     try {
       // can't find all expected block ids, data loss
@@ -138,21 +140,11 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
     sendTestData(testAppId, blocks);
 
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            shuffleServerInfo,
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
 
     validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
@@ -177,21 +169,11 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
 
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            shuffleServerInfo,
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     assertNull(readClient.readShuffleBlockData());
     readClient.close();
   }
@@ -217,37 +199,20 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
     sendTestData(testAppId, blocks);
 
     ShuffleReadClientImpl readClient1 =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            2,
-            10,
-            100,
-            "",
-            blockIdBitmap1,
-            taskIdBitmap,
-            shuffleServerInfo,
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .partitionNumPerRange(2)
+            .blockIdBitmap(blockIdBitmap1)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     final ShuffleReadClientImpl readClient2 =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            testAppId,
-            0,
-            1,
-            100,
-            2,
-            10,
-            100,
-            "",
-            blockIdBitmap2,
-            taskIdBitmap,
-            shuffleServerInfo,
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .partitionId(1)
+            .partitionNumPerRange(2)
+            .blockIdBitmap(blockIdBitmap2)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     validateResult(readClient1, expectedData1);
     readClient1.checkProcessedBlockIds();
     readClient1.close();
@@ -261,21 +226,13 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
   public void readTest5() {
     String testAppId = "localReadTest5";
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            testAppId,
-            0,
-            1,
-            100,
-            2,
-            10,
-            1000,
-            "",
-            Roaring64NavigableMap.bitmapOf(),
-            Roaring64NavigableMap.bitmapOf(),
-            shuffleServerInfo,
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .partitionId(1)
+            .partitionNumPerRange(2)
+            .blockIdBitmap(Roaring64NavigableMap.bitmapOf())
+            .taskIdBitmap(Roaring64NavigableMap.bitmapOf())
+            .build();
     assertNull(readClient.readShuffleBlockData());
     readClient.checkProcessedBlockIds();
   }
@@ -299,21 +256,11 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
     }
 
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            100,
-            "",
-            wrongBlockIdBitmap,
-            taskIdBitmap,
-            shuffleServerInfo,
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(wrongBlockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     assertNull(readClient.readShuffleBlockData());
     try {
       readClient.checkProcessedBlockIds();
@@ -344,21 +291,11 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
 
     // unexpected taskAttemptId should be filtered
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            shuffleServerInfo,
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
 
     validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
@@ -393,21 +330,11 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
 
     // unexpected taskAttemptId should be filtered
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            shuffleServerInfo,
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
 
     validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
@@ -431,43 +358,23 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
     sendTestData(testAppId, blocks);
     // test with un-changed expected blockId
     ShuffleReadClientImpl readClient;
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            beforeAdded,
-            taskIdBitmap,
-            shuffleServerInfo,
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(beforeAdded)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
     readClient.close();
 
     // test with changed expected blockId
     readClient =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            shuffleServerInfo,
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
     readClient.close();
@@ -492,22 +399,12 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
     // send some expected data
     blocks = createShuffleBlockList(0, 0, 1, 2, 30, expectedBlockIds, expectedData, mockSSI);
     sendTestData(testAppId, blocks);
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            expectedBlockIds,
-            taskIdBitmap,
-            shuffleServerInfo,
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(expectedBlockIds)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
 
     validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
@@ -546,21 +443,11 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
     sendTestData(testAppId, blocks);
 
     ShuffleReadClientImpl readClient =
-        new ShuffleReadClientImpl(
-            StorageType.LOCALFILE.name(),
-            testAppId,
-            0,
-            0,
-            100,
-            1,
-            10,
-            1000,
-            "",
-            blockIdBitmap,
-            taskIdBitmap,
-            shuffleServerInfo,
-            null,
-            new DefaultIdHelper());
+        baseReadBuilder()
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
     validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
     readClient.close();
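
    The hunks above all follow the same pattern: the positional 14-argument
    ShuffleReadClientImpl constructor is replaced by a shared baseReadBuilder()
    test helper, so each case only spells out what actually varies (appId,
    partitionId, partitionNumPerRange, and the block/task bitmaps). Below is a
    minimal sketch of such a helper, assuming the refactored ShuffleClientFactory
    exposes a newReadBuilder() entry point returning a ReadClientBuilder with
    setter names matching the fields above; the builder type name and the exact
    defaults (e.g. readBufferSize) are assumptions inferred from the constructor
    arguments this diff removes, not a verbatim copy of the helper.

        // Hypothetical test helper: pre-fills the defaults shared by these cases;
        // individual tests override appId, partitionId, and the bitmaps as needed.
        private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
          return ShuffleClientFactory.newReadBuilder()
              .storageType(StorageType.LOCALFILE.name())
              .shuffleId(0)
              .partitionId(0)
              .indexReadLimit(100)
              .partitionNumPerRange(1)
              .partitionNum(10)
              .readBufferSize(1000) // assumed default; some removed calls passed 100
              .basePath("")
              .shuffleServerInfoList(shuffleServerInfo) // List<ShuffleServerInfo> fixture from the test base
              .idHelper(new DefaultIdHelper());
        }

    With the defaults centralized this way, each test body reduces to the three
    or four builder calls visible in the hunks, e.g.
    baseReadBuilder().appId(testAppId).blockIdBitmap(blockIdBitmap).taskIdBitmap(taskIdBitmap).build().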