Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/11/05 06:55:10 UTC

[incubator-uniffle] branch master updated: [ISSUE-137][Improvement][AQE] Sort MapId before the data are flushed (#293)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 74949f53 [ISSUE-137][Improvement][AQE] Sort MapId before the data are flushed (#293)
74949f53 is described below

commit 74949f53f611cb4a65ffb1e0b083893980c79419
Author: Junfan Zhang <zu...@apache.org>
AuthorDate: Sat Nov 5 14:55:04 2022 +0800

    [ISSUE-137][Improvement][AQE] Sort MapId before the data are flushed (#293)
    
    ### What changes were proposed in this pull request?
    
    Introduce a new mechanism to determine how data is written to a file: either direct append, or append after sorting.
    
    ### Why are the changes needed?
    In our internal Uniffle deployment, 200+ shuffle servers are in service. Each shuffle server uses 4 SATA SSDs as localfile storage, and its network bandwidth is capped at 5G/s. The storageType of the shuffle servers is MEMORY_LOCALFILE.
    
    After monitoring the read_data_rate metric, I found it would always reach the maximum network bandwidth, even though the number of running apps was low at the time and only a single disk was at 100% usage.
    
    After digging into the shuffle server's logs, I found that almost all requests carried the same appId and the same partition, fetching shuffle data from the same partition data file. This indicates that the high disk utilization was caused by read hotspots.
    
    It turned out that this app's shuffle read had been optimized by the AQE skewed-data split, which puts high network and disk I/O pressure on the Uniffle shuffle server.
    
    After identifying this, I briefly analyzed the performance of historical tasks running on different shuffle services.
    
    In the current implementation, a partition's buffer is flushed to disk once its size reaches the 64M threshold, and the Spark/MR Uniffle client fetches data in batches of 14M (the default value). That means that for one flushed buffer of one partition, the client needs roughly 64M / 14M, i.e. 5, network interactions with the shuffle server if the data is relatively scattered by mapId.
    
    To solve this problem, we can sort the 64M buffer's data by mapId before it is flushed. Ideally, the Uniffle client then needs to read each buffer only once.
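
    Below is a minimal sketch of the idea, for illustration only: the flush hook and the Block type are stand-ins for Uniffle's internal structures, not the actual ShuffleBuffer code.

    ```java
    import java.util.Comparator;
    import java.util.List;

    final class LocalOrderFlushSketch {
      // Stand-in for Uniffle's block metadata; only the fields needed here.
      static final class Block {
        final long blockId;
        final long taskAttemptId; // the "mapId" used by AQE skew-split readers
        Block(long blockId, long taskAttemptId) {
          this.blockId = blockId;
          this.taskAttemptId = taskAttemptId;
        }
      }

      // Sort the buffered blocks by taskAttemptId right before the 64M buffer
      // is flushed, so blocks from the same map task become contiguous on disk.
      static void sortBeforeFlush(List<Block> bufferedBlocks) {
        bufferedBlocks.sort(Comparator.comparingLong(b -> b.taskAttemptId));
      }
    }
    ```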
    
    ### Does this PR introduce _any_ user-facing change?
    
    1. Introduce the shuffle-data distribution type, NORMAL or LOCAL_ORDER, which leaves room for other implementations such as GLOBAL_ORDER.
    2. Make the segment split strategy a general interface covering the different data distribution types above (see the usage sketch below).
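
    To make the second point concrete, here is a usage sketch of the new interface as introduced by this patch (the task ids and buffer size are illustrative values). On the client side, LOCAL_ORDER is enabled via the new config key rss.client.shuffle.data.distribution.type (Spark3.x only).

    ```java
    import java.util.List;

    import org.roaringbitmap.longlong.Roaring64NavigableMap;

    import org.apache.uniffle.common.ShuffleDataDistributionType;
    import org.apache.uniffle.common.ShuffleDataSegment;
    import org.apache.uniffle.common.ShuffleIndexResult;
    import org.apache.uniffle.common.segment.SegmentSplitter;
    import org.apache.uniffle.common.segment.SegmentSplitterFactory;

    public class SplitterUsageSketch {
      public static List<ShuffleDataSegment> splitIndex(ShuffleIndexResult indexResult) {
        // The mapIds in [startMapId, endMapId) this reader is responsible for
        // (the values here are illustrative).
        Roaring64NavigableMap expectTaskIds = Roaring64NavigableMap.bitmapOf(1, 2, 3);
        SegmentSplitter splitter = SegmentSplitterFactory.getInstance().get(
            ShuffleDataDistributionType.LOCAL_ORDER,
            expectTaskIds,
            14 * 1024 * 1024); // read buffer size, matching the 14M default batch
        return splitter.split(indexResult);
      }
    }
    ```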
    
    
    ### How was this patch tested?
    
    1. UTs
    2. Spark tests on an offline Hadoop cluster
    
    ### Benchmark
    Table1: 100g, dtypes: Array[(String, String)] = Array((v1,StringType), (k1,IntegerType)). Every row has the same value in column k1 (value = 10).
    
    Table2: 10 records, dtypes: Array[(String, String)] = Array((k2,IntegerType), (v2,StringType)). Only one of its records has k2 = 10.
    
    Environment: 100 executors (1 core, 2g memory each)
    SQL: spark.sql("select * from Table1,Table2 where k1 = k2").write.mode("overwrite").parquet("xxxxxx")
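
    The skew is constructed deliberately: every row of Table1 carries k1 = 10, so the whole 100g joins on a single key and lands in one hot partition. A hypothetical reconstruction of the setup in Spark's Java API (the row count and string contents are invented; only the join SQL above comes from the benchmark):

    ```java
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class SkewBenchmarkSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("aqe-skew-bench").getOrCreate();

        // Table1: a large table whose join key k1 is always 10, so all rows
        // join on a single (skewed) key. The row count is illustrative.
        Dataset<Row> table1 = spark.range(1_000_000_000L)
            .selectExpr("uuid() as v1", "cast(10 as int) as k1");
        table1.createOrReplaceTempView("Table1");

        // Table2: 10 records, exactly one of which matches with k2 = 10.
        Dataset<Row> table2 = spark.range(10)
            .selectExpr("cast(id + 10 as int) as k2", "uuid() as v2");
        table2.createOrReplaceTempView("Table2");

        spark.sql("select * from Table1,Table2 where k1 = k2")
            .write().mode("overwrite").parquet("xxxxxx");
      }
    }
    ```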
    
    - Uniffle without patch: cost 12min
    - Uniffle with patch: cost 4min
    
    ### Reference
    1. Design doc: https://docs.google.com/document/d/1G0cOFVJbYLf2oX1fiadh7zi2M6DlEcjTQTh4kSkb0LA/edit?usp=sharing
---
 .../hadoop/mapreduce/v2/app/RssMRAppMaster.java    |  13 +-
 .../hadoop/mapred/SortWriteBufferManagerTest.java  |   4 +-
 .../hadoop/mapreduce/task/reduce/FetcherTest.java  |   4 +-
 .../apache/spark/shuffle/RssShuffleManager.java    |   9 +-
 .../apache/spark/shuffle/RssShuffleManager.java    |  13 +-
 .../spark/shuffle/reader/RssShuffleReader.java     |   9 +-
 .../spark/shuffle/reader/RssShuffleReaderTest.java |  17 +-
 .../uniffle/client/api/ShuffleWriteClient.java     |   4 +-
 .../client/factory/ShuffleClientFactory.java       |   2 +-
 .../uniffle/client/impl/ShuffleReadClientImpl.java |  26 ++
 .../client/impl/ShuffleWriteClientImpl.java        |   6 +-
 .../request/CreateShuffleReadClientRequest.java    |  27 ++
 ...tConf.java => ShuffleDataDistributionType.java} |  26 +-
 .../uniffle/common/config/RssClientConf.java       |   8 +
 .../common/segment/FixedSizeSegmentSplitter.java   | 103 +++++++
 .../common/segment/LocalOrderSegmentSplitter.java  | 130 +++++++++
 .../SegmentSplitter.java}                          |  21 +-
 .../common/segment/SegmentSplitterFactory.java     |  50 ++++
 .../org/apache/uniffle/common/util/RssUtils.java   |  71 -----
 .../segment/FixedSizeSegmentSplitterTest.java      |  83 ++++++
 .../segment/LocalOrderSegmentSplitterTest.java     | 128 +++++++++
 .../apache/uniffle/common/util/RssUtilsTest.java   |  67 -----
 docs/client_guide.md                               |  12 +
 .../java/org/apache/uniffle/test/QuorumTest.java   |  11 +-
 .../apache/uniffle/test/ShuffleReadWriteBase.java  |  37 ++-
 .../apache/uniffle/test/ShuffleServerGrpcTest.java |   6 +-
 .../test/ShuffleServerWithKerberizedHdfsTest.java  |   7 +-
 .../ShuffleServerWithLocalOfLocalOrderTest.java    | 315 +++++++++++++++++++++
 .../uniffle/test/ShuffleServerWithLocalTest.java   |   2 +-
 .../uniffle/test/ShuffleWithRssClientTest.java     |  71 ++++-
 .../org/apache/uniffle/test/AQESkewedJoinTest.java |   2 +-
 .../test/AQESkewedJoinWithLocalOrderTest.java      |  49 ++++
 .../client/impl/grpc/ShuffleServerGrpcClient.java  |   8 +-
 .../client/request/RssRegisterShuffleRequest.java  |  18 +-
 proto/src/main/proto/Rss.proto                     |   6 +
 .../apache/uniffle/server/ShuffleFlushManager.java |   5 +
 .../uniffle/server/ShuffleServerGrpcService.java   |  13 +-
 .../org/apache/uniffle/server/ShuffleTaskInfo.java |  14 +
 .../apache/uniffle/server/ShuffleTaskManager.java  |  27 ++
 .../uniffle/server/buffer/ShuffleBuffer.java       |  19 +-
 .../server/buffer/ShuffleBufferManager.java        |  10 +-
 .../uniffle/server/ShuffleTaskManagerTest.java     |  31 +-
 .../server/buffer/ShuffleBufferManagerTest.java    |   5 +
 .../storage/factory/ShuffleHandlerFactory.java     |  11 +-
 .../handler/impl/DataSkippableReadHandler.java     |  18 +-
 .../handler/impl/HdfsClientReadHandler.java        |  31 +-
 .../handler/impl/HdfsShuffleReadHandler.java       |  22 +-
 .../handler/impl/LocalFileClientReadHandler.java   |  10 +-
 .../impl/LocalFileQuorumClientReadHandler.java     |  30 +-
 .../request/CreateShuffleReadHandlerRequest.java   |  19 ++
 .../handler/impl/LocalFileHandlerTestBase.java     |   5 +-
 .../impl/LocalFileServerReadHandlerTest.java       |   4 +-
 52 files changed, 1371 insertions(+), 268 deletions(-)

diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index a3cb2700..751b1e08 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -76,6 +76,7 @@ import org.apache.uniffle.client.util.ClientUtils;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
@@ -220,10 +221,14 @@ public class RssMRAppMaster extends MRAppMaster {
           }
           LOG.info("Start to register shuffle");
           long start = System.currentTimeMillis();
-          serverToPartitionRanges.entrySet().forEach(entry -> {
-            client.registerShuffle(
-                entry.getKey(), appId, 0, entry.getValue(), remoteStorage);
-          });
+          serverToPartitionRanges.entrySet().forEach(entry -> client.registerShuffle(
+              entry.getKey(),
+              appId,
+              0,
+              entry.getValue(),
+              remoteStorage,
+              ShuffleDataDistributionType.NORMAL
+          ));
           LOG.info("Finish register shuffle with " + (System.currentTimeMillis() - start) + " ms");
           return shuffleAssignments;
         }, retryInterval, retryTimes);
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index 305a9dcb..d75d7647 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -37,6 +37,7 @@ import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
@@ -290,7 +291,8 @@ public class SortWriteBufferManagerTest {
         String appId,
         int shuffleId,
         List<PartitionRange> partitionRanges,
-        RemoteStorageInfo remoteStorage) {
+        RemoteStorageInfo remoteStorage,
+        ShuffleDataDistributionType distributionType) {
     }
 
     @Override
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index b5404e59..1b920aed 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -67,6 +67,7 @@ import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.compression.Lz4Codec;
@@ -386,7 +387,8 @@ public class FetcherTest {
         String appId,
         int shuffleId,
         List<PartitionRange> partitionRanges,
-        RemoteStorageInfo storageType) {
+        RemoteStorageInfo storageType,
+        ShuffleDataDistributionType distributionType) {
 
     }
 
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 26022a54..cf79c70c 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -59,6 +59,7 @@ import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.RetryUtils;
@@ -279,7 +280,13 @@ public class RssShuffleManager implements ShuffleManager {
         .stream()
         .forEach(entry -> {
           shuffleWriteClient.registerShuffle(
-              entry.getKey(), appId, shuffleId, entry.getValue(), remoteStorage);
+              entry.getKey(),
+              appId,
+              shuffleId,
+              entry.getValue(),
+              remoteStorage,
+              ShuffleDataDistributionType.NORMAL
+          );
         });
     LOG.info("Finish register shuffleId[" + shuffleId + "] with " + (System.currentTimeMillis() - start) + " ms");
   }
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index ea29a4cd..27368d20 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -64,7 +64,9 @@ import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.common.util.RssUtils;
@@ -92,6 +94,7 @@ public class RssShuffleManager implements ShuffleManager {
   private ScheduledExecutorService heartBeatScheduledExecutorService;
   private boolean heartbeatStarted = false;
   private boolean dynamicConfEnabled = false;
+  private final ShuffleDataDistributionType dataDistributionType;
   private final EventLoop eventLoop;
   private final EventLoop defaultEventLoop = new EventLoop<AddBlockEvent>("ShuffleDataQueue") {
 
@@ -155,6 +158,7 @@ public class RssShuffleManager implements ShuffleManager {
     final int retryMax = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX);
     this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
     this.dynamicConfEnabled = sparkConf.get(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED);
+    this.dataDistributionType = RssSparkConfig.toRssConf(sparkConf).get(RssClientConf.DATA_DISTRIBUTION_TYPE);
     long retryIntervalMax = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
     int heartBeatThreadNum = sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
     this.dataTransferPoolSize = sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
@@ -205,6 +209,7 @@ public class RssShuffleManager implements ShuffleManager {
       Map<String, Set<Long>> taskToFailedBlockIds) {
     this.sparkConf = conf;
     this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
+    this.dataDistributionType = RssSparkConfig.toRssConf(sparkConf).get(RssClientConf.DATA_DISTRIBUTION_TYPE);
     this.heartbeatInterval = sparkConf.get(RssSparkConfig.RSS_HEARTBEAT_INTERVAL);
     this.heartbeatTimeout = sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_TIMEOUT.key(), heartbeatInterval / 2);
     this.dataReplica = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA);
@@ -460,7 +465,9 @@ public class RssShuffleManager implements ShuffleManager {
         RssUtils.generatePartitionToBitmap(blockIdBitmap, startPartition, endPartition),
         taskIdBitmap,
         readMetrics,
-        RssSparkConfig.toRssConf(sparkConf));
+        RssSparkConfig.toRssConf(sparkConf),
+        dataDistributionType
+    );
   }
 
   private Roaring64NavigableMap getExpectedTasksByExecutorId(
@@ -621,7 +628,9 @@ public class RssShuffleManager implements ShuffleManager {
               appId,
               shuffleId,
               entry.getValue(),
-              remoteStorage);
+              remoteStorage,
+              dataDistributionType
+          );
         });
     LOG.info("Finish register shuffleId[" + shuffleId + "] with " + (System.currentTimeMillis() - start) + " ms");
   }
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 2806ce82..f194972c 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -49,6 +49,7 @@ import scala.runtime.BoxedUnit;
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssConf;
 
@@ -76,6 +77,7 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
   private int mapEndIndex;
   private ShuffleReadMetrics readMetrics;
   private RssConf rssConf;
+  private ShuffleDataDistributionType dataDistributionType;
 
   public RssShuffleReader(
       int startPartition,
@@ -93,7 +95,8 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
       Map<Integer, Roaring64NavigableMap> partitionToExpectBlocks,
       Roaring64NavigableMap taskIdBitmap,
       ShuffleReadMetrics readMetrics,
-      RssConf rssConf) {
+      RssConf rssConf,
+      ShuffleDataDistributionType dataDistributionType) {
     this.appId = rssShuffleHandle.getAppId();
     this.startPartition = startPartition;
     this.endPartition = endPartition;
@@ -115,6 +118,7 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
     this.readMetrics = readMetrics;
     this.partitionToShuffleServers = rssShuffleHandle.getPartitionToServers();
     this.rssConf = rssConf;
+    this.dataDistributionType = dataDistributionType;
   }
 
   @Override
@@ -201,7 +205,8 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
         List<ShuffleServerInfo> shuffleServerInfoList = partitionToShuffleServers.get(partition);
         CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
             appId, shuffleId, partition, storageType, basePath, indexReadLimit, readBufferSize,
-            1, partitionNum, partitionToExpectBlocks.get(partition), taskIdBitmap, shuffleServerInfoList, hadoopConf);
+            1, partitionNum, partitionToExpectBlocks.get(partition), taskIdBitmap, shuffleServerInfoList,
+            hadoopConf, dataDistributionType);
         ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
         RssShuffleDataIterator iterator = new RssShuffleDataIterator<K, C>(
             shuffleDependency.serializer(), shuffleReadClient,
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
index 5f8eceeb..213061a4 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import scala.Option;
 
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
 import org.apache.uniffle.storage.util.StorageType;
@@ -94,7 +95,10 @@ public class RssShuffleReaderTest extends AbstractRssReaderTest {
         1,
         partitionToExpectBlocks,
         taskIdBitmap,
-        new ShuffleReadMetrics(), new RssConf()));
+        new ShuffleReadMetrics(),
+        new RssConf(),
+        ShuffleDataDistributionType.NORMAL
+    ));
     validateResult(rssShuffleReaderSpy.read(), expectedData, 10);
 
     writeTestData(writeHandler1, 2, 4, expectedData,
@@ -115,8 +119,10 @@ public class RssShuffleReaderTest extends AbstractRssReaderTest {
         2,
         partitionToExpectBlocks,
         taskIdBitmap,
-        new ShuffleReadMetrics(), new RssConf())
-    );
+        new ShuffleReadMetrics(),
+        new RssConf(),
+        ShuffleDataDistributionType.NORMAL
+    ));
     validateResult(rssShuffleReaderSpy1.read(), expectedData, 18);
 
     RssShuffleReader rssShuffleReaderSpy2 = spy(new RssShuffleReader<String, String>(
@@ -134,7 +140,10 @@ public class RssShuffleReaderTest extends AbstractRssReaderTest {
         2,
         partitionToExpectBlocks,
         Roaring64NavigableMap.bitmapOf(),
-        new ShuffleReadMetrics(), new RssConf()));
+        new ShuffleReadMetrics(),
+        new RssConf(),
+        ShuffleDataDistributionType.NORMAL
+    ));
     validateResult(rssShuffleReaderSpy2.read(), Maps.newHashMap(), 0);
   }
 
diff --git a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index 39df0029..4ab4b51c 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -28,6 +28,7 @@ import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 
 public interface ShuffleWriteClient {
@@ -41,7 +42,8 @@ public interface ShuffleWriteClient {
       String appId,
       int shuffleId,
       List<PartitionRange> partitionRanges,
-      RemoteStorageInfo remoteStorage);
+      RemoteStorageInfo remoteStorage,
+      ShuffleDataDistributionType dataDistributionType);
 
   boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps);
 
diff --git a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index 652f9923..2b8b6264 100644
--- a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++ b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -70,6 +70,6 @@ public class ShuffleClientFactory {
         request.getPartitionId(), request.getIndexReadLimit(), request.getPartitionNumPerRange(),
         request.getPartitionNum(), request.getReadBufferSize(), request.getBasePath(),
         request.getBlockIdBitmap(), request.getTaskIdBitmap(), request.getShuffleServerInfoList(),
-        request.getHadoopConf(), request.getIdHelper());
+        request.getHadoopConf(), request.getIdHelper(), request.getShuffleDataDistributionType());
   }
 }
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 5059f879..28e81f29 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -34,6 +34,7 @@ import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
 import org.apache.uniffle.client.util.IdHelper;
 import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
@@ -60,6 +61,29 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
   private AtomicLong crcCheckTime = new AtomicLong(0);
   private ClientReadHandler clientReadHandler;
   private final IdHelper idHelper;
+  private ShuffleDataDistributionType dataDistributionType = ShuffleDataDistributionType.NORMAL;
+
+  public ShuffleReadClientImpl(
+      String storageType,
+      String appId,
+      int shuffleId,
+      int partitionId,
+      int indexReadLimit,
+      int partitionNumPerRange,
+      int partitionNum,
+      int readBufferSize,
+      String storageBasePath,
+      Roaring64NavigableMap blockIdBitmap,
+      Roaring64NavigableMap taskIdBitmap,
+      List<ShuffleServerInfo> shuffleServerInfoList,
+      Configuration hadoopConf,
+      IdHelper idHelper,
+      ShuffleDataDistributionType dataDistributionType) {
+    this(storageType, appId, shuffleId, partitionId, indexReadLimit,
+        partitionNumPerRange, partitionNum, readBufferSize, storageBasePath,
+        blockIdBitmap, taskIdBitmap, shuffleServerInfoList, hadoopConf, idHelper);
+    this.dataDistributionType = dataDistributionType;
+  }
 
   public ShuffleReadClientImpl(
       String storageType,
@@ -96,6 +120,8 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
     request.setHadoopConf(hadoopConf);
     request.setExpectBlockIds(blockIdBitmap);
     request.setProcessBlockIds(processedBlockIds);
+    request.setDistributionType(dataDistributionType);
+    request.setExpectTaskIds(taskIdBitmap);
 
     List<Long> removeBlockIds = Lists.newArrayList();
     blockIdBitmap.forEach(bid -> {
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index e8372d8e..be83ca0f 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -73,6 +73,7 @@ import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.ThreadUtils;
@@ -332,7 +333,8 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
       String appId,
       int shuffleId,
       List<PartitionRange> partitionRanges,
-      RemoteStorageInfo remoteStorage) {
+      RemoteStorageInfo remoteStorage,
+      ShuffleDataDistributionType dataDistributionType) {
     String user = null;
     try {
       user = UserGroupInformation.getCurrentUser().getShortUserName();
@@ -342,7 +344,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
     LOG.info("User: {}", user);
 
     RssRegisterShuffleRequest request =
-        new RssRegisterShuffleRequest(appId, shuffleId, partitionRanges, remoteStorage, user);
+        new RssRegisterShuffleRequest(appId, shuffleId, partitionRanges, remoteStorage, user, dataDistributionType);
     RssRegisterShuffleResponse response = getShuffleServerClient(shuffleServerInfo).registerShuffle(request);
 
     String msg = "Error happened when registerShuffle with appId[" + appId + "], shuffleId[" + shuffleId
diff --git a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java b/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
index d9f8a754..2cfd021d 100644
--- a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
+++ b/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
@@ -24,6 +24,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.client.util.IdHelper;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 
 public class CreateShuffleReadClientRequest {
@@ -42,6 +43,28 @@ public class CreateShuffleReadClientRequest {
   private List<ShuffleServerInfo> shuffleServerInfoList;
   private Configuration hadoopConf;
   private IdHelper idHelper;
+  private ShuffleDataDistributionType shuffleDataDistributionType = ShuffleDataDistributionType.NORMAL;
+
+  public CreateShuffleReadClientRequest(
+      String appId,
+      int shuffleId,
+      int partitionId,
+      String storageType,
+      String basePath,
+      int indexReadLimit,
+      int readBufferSize,
+      int partitionNumPerRange,
+      int partitionNum,
+      Roaring64NavigableMap blockIdBitmap,
+      Roaring64NavigableMap taskIdBitmap,
+      List<ShuffleServerInfo> shuffleServerInfoList,
+      Configuration hadoopConf,
+      ShuffleDataDistributionType dataDistributionType) {
+    this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, readBufferSize,
+        partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, shuffleServerInfoList,
+        hadoopConf, new DefaultIdHelper());
+    this.shuffleDataDistributionType = dataDistributionType;
+  }
 
   public CreateShuffleReadClientRequest(
       String appId,
@@ -148,4 +171,8 @@ public class CreateShuffleReadClientRequest {
   public IdHelper getIdHelper() {
     return idHelper;
   }
+
+  public ShuffleDataDistributionType getShuffleDataDistributionType() {
+    return shuffleDataDistributionType;
+  }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/ShuffleDataDistributionType.java
similarity index 50%
copy from common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
copy to common/src/main/java/org/apache/uniffle/common/ShuffleDataDistributionType.java
index 99d82e03..70b1dbaa 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleDataDistributionType.java
@@ -15,24 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.common.config;
+package org.apache.uniffle.common;
 
-import org.apache.uniffle.common.compression.Codec;
-
-import static org.apache.uniffle.common.compression.Codec.Type.LZ4;
-
-public class RssClientConf {
-
-  public static final ConfigOption<Codec.Type> COMPRESSION_TYPE = ConfigOptions
-      .key("rss.client.io.compression.codec")
-      .enumType(Codec.Type.class)
-      .defaultValue(LZ4)
-      .withDescription("The compression codec is used to compress the shuffle data. "
-          + "Default codec is `LZ4`, `ZSTD` also can be used.");
-
-  public static final ConfigOption<Integer> ZSTD_COMPRESSION_LEVEL = ConfigOptions
-      .key("rss.client.io.compression.zstd.level")
-      .intType()
-      .defaultValue(3)
-      .withDescription("The zstd compression level, the default level is 3");
+/**
+ * The type of shuffle data distribution of a single partition.
+ */
+public enum ShuffleDataDistributionType {
+  NORMAL,
+  LOCAL_ORDER
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 99d82e03..cb6ec2e8 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.common.config;
 
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.compression.Codec;
 
 import static org.apache.uniffle.common.compression.Codec.Type.LZ4;
@@ -35,4 +36,11 @@ public class RssClientConf {
       .intType()
       .defaultValue(3)
       .withDescription("The zstd compression level, the default level is 3");
+
+  public static final ConfigOption<ShuffleDataDistributionType> DATA_DISTRIBUTION_TYPE = ConfigOptions
+      .key("rss.client.shuffle.data.distribution.type")
+      .enumType(ShuffleDataDistributionType.class)
+      .defaultValue(ShuffleDataDistributionType.NORMAL)
+      .withDescription("The type of partition shuffle data distribution, including normal and local_order. "
+          + "The default value is normal. This config is only valid in Spark3.x");
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
new file mode 100644
index 00000000..79e59b62
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.segment;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ShuffleDataSegment;
+import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.common.exception.RssException;
+
+public class FixedSizeSegmentSplitter implements SegmentSplitter {
+
+  private int readBufferSize;
+
+  public FixedSizeSegmentSplitter(int readBufferSize) {
+    this.readBufferSize = readBufferSize;
+  }
+
+  @Override
+  public List<ShuffleDataSegment> split(ShuffleIndexResult shuffleIndexResult) {
+    if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
+      return Lists.newArrayList();
+    }
+
+    byte[] indexData = shuffleIndexResult.getIndexData();
+    long dataFileLen = shuffleIndexResult.getDataFileLen();
+    return transIndexDataToSegments(indexData, readBufferSize, dataFileLen);
+  }
+
+  private static List<ShuffleDataSegment> transIndexDataToSegments(byte[] indexData,
+      int readBufferSize, long dataFileLen) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(indexData);
+    List<BufferSegment> bufferSegments = Lists.newArrayList();
+    List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList();
+    int bufferOffset = 0;
+    long fileOffset = -1;
+    long totalLength = 0;
+
+    while (byteBuffer.hasRemaining()) {
+      try {
+        long offset = byteBuffer.getLong();
+        int length = byteBuffer.getInt();
+        int uncompressLength = byteBuffer.getInt();
+        long crc = byteBuffer.getLong();
+        long blockId = byteBuffer.getLong();
+        long taskAttemptId = byteBuffer.getLong();
+        // The index file is written, read and parsed sequentially, so these parsed index segments
+        // index continuous shuffle data in the corresponding data file, and the first segment's
+        // offset field is the offset of that shuffle data within the data file.
+        if (fileOffset == -1) {
+          fileOffset = offset;
+        }
+
+        bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
+        bufferOffset += length;
+        totalLength += length;
+
+        // If the ShuffleServer is flushing the file at this time, the length recorded in the index file
+        // may be greater than the actual data file length, so we stop parsing here to avoid an EOFException
+        if (dataFileLen != -1 && totalLength >= dataFileLen) {
+          break;
+        }
+
+        if (bufferOffset >= readBufferSize) {
+          ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
+          dataFileSegments.add(sds);
+          bufferSegments = Lists.newArrayList();
+          bufferOffset = 0;
+          fileOffset = -1;
+        }
+      } catch (BufferUnderflowException ue) {
+        throw new RssException("Read index data under flow", ue);
+      }
+    }
+
+    if (bufferOffset > 0) {
+      ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
+      dataFileSegments.add(sds);
+    }
+
+    return dataFileSegments;
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
new file mode 100644
index 00000000..77e02e06
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.segment;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ShuffleDataSegment;
+import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.common.exception.RssException;
+
+/**
+ * {@link LocalOrderSegmentSplitter} is used only when the {@link ShuffleDataDistributionType}
+ * is LOCAL_ORDER, which means the index file is split into several segments according to its
+ * locally ordered property. Some blocks may be skipped, but the remaining blocks in a
+ * segment are continuous.
+ *
+ * This strategy is useful for the Spark AQE skew optimization, which splits a single partition
+ * across multiple shuffle readers, each fetching the part of the partition written by the map
+ * tasks in the range [startMapId, endMapId). A reader using this splitter can therefore skip
+ * many unnecessary blocks.
+ *
+ * Last but not least, this split strategy depends on the LOCAL_ORDER layout of the index file,
+ * which must be guaranteed by the shuffle server.
+ */
+public class LocalOrderSegmentSplitter implements SegmentSplitter {
+
+  private Roaring64NavigableMap expectTaskIds;
+  private int readBufferSize;
+
+  public LocalOrderSegmentSplitter(Roaring64NavigableMap expectTaskIds, int readBufferSize) {
+    this.expectTaskIds = expectTaskIds;
+    this.readBufferSize = readBufferSize;
+  }
+
+  @Override
+  public List<ShuffleDataSegment> split(ShuffleIndexResult shuffleIndexResult) {
+    if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
+      return Lists.newArrayList();
+    }
+
+    byte[] indexData = shuffleIndexResult.getIndexData();
+    long dataFileLen = shuffleIndexResult.getDataFileLen();
+
+    ByteBuffer byteBuffer = ByteBuffer.wrap(indexData);
+    List<BufferSegment> bufferSegments = Lists.newArrayList();
+
+    List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList();
+    int bufferOffset = 0;
+    long fileOffset = -1;
+    long totalLen = 0;
+
+    long lastTaskAttemptId = -1;
+
+    /**
+     * One ShuffleDataSegment should meet the following requirements:
+     *
+     * 1. Its taskAttemptIds fall in the [startMapId, endMapId) taskIds bitmap
+     * 2. Its size should be < readBufferSize
+     * 3. Its blocks should be continuous
+     *
+     */
+    while (byteBuffer.hasRemaining()) {
+      try {
+        long offset = byteBuffer.getLong();
+        int length = byteBuffer.getInt();
+        int uncompressLength = byteBuffer.getInt();
+        long crc = byteBuffer.getLong();
+        long blockId = byteBuffer.getLong();
+        long taskAttemptId = byteBuffer.getLong();
+
+        if (lastTaskAttemptId == -1) {
+          lastTaskAttemptId = taskAttemptId;
+        }
+
+        // If the ShuffleServer is flushing the file at this time, the length recorded in the index file
+        // may be greater than the actual data file length, so we stop parsing here to avoid an EOFException
+        if (dataFileLen != -1 && totalLen >= dataFileLen) {
+          break;
+        }
+
+        if ((taskAttemptId < lastTaskAttemptId && bufferSegments.size() > 0) || bufferOffset >= readBufferSize) {
+          ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
+          dataFileSegments.add(sds);
+          bufferSegments = Lists.newArrayList();
+          bufferOffset = 0;
+          fileOffset = -1;
+        }
+
+        if (expectTaskIds.contains(taskAttemptId)) {
+          if (fileOffset == -1) {
+            fileOffset = offset;
+          }
+          bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
+          bufferOffset += length;
+        }
+
+        lastTaskAttemptId = taskAttemptId;
+      } catch (BufferUnderflowException ue) {
+        throw new RssException("Read index data under flow", ue);
+      }
+    }
+
+    if (bufferOffset > 0) {
+      ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
+      dataFileSegments.add(sds);
+    }
+
+    return dataFileSegments;
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/segment/SegmentSplitter.java
similarity index 50%
copy from common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
copy to common/src/main/java/org/apache/uniffle/common/segment/SegmentSplitter.java
index 99d82e03..6bf24327 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/segment/SegmentSplitter.java
@@ -15,24 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.common.config;
+package org.apache.uniffle.common.segment;
 
-import org.apache.uniffle.common.compression.Codec;
+import java.util.List;
 
-import static org.apache.uniffle.common.compression.Codec.Type.LZ4;
+import org.apache.uniffle.common.ShuffleDataSegment;
+import org.apache.uniffle.common.ShuffleIndexResult;
 
-public class RssClientConf {
+public interface SegmentSplitter {
 
-  public static final ConfigOption<Codec.Type> COMPRESSION_TYPE = ConfigOptions
-      .key("rss.client.io.compression.codec")
-      .enumType(Codec.Type.class)
-      .defaultValue(LZ4)
-      .withDescription("The compression codec is used to compress the shuffle data. "
-          + "Default codec is `LZ4`, `ZSTD` also can be used.");
+  List<ShuffleDataSegment> split(ShuffleIndexResult shuffleIndexResult);
 
-  public static final ConfigOption<Integer> ZSTD_COMPRESSION_LEVEL = ConfigOptions
-      .key("rss.client.io.compression.zstd.level")
-      .intType()
-      .defaultValue(3)
-      .withDescription("The zstd compression level, the default level is 3");
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/segment/SegmentSplitterFactory.java b/common/src/main/java/org/apache/uniffle/common/segment/SegmentSplitterFactory.java
new file mode 100644
index 00000000..0fc0ff08
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/segment/SegmentSplitterFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.segment;
+
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import org.apache.uniffle.common.ShuffleDataDistributionType;
+
+public class SegmentSplitterFactory {
+
+  private SegmentSplitterFactory() {
+    // ignore
+  }
+
+  private static class LazyHolder {
+    static final SegmentSplitterFactory INSTANCE = new SegmentSplitterFactory();
+  }
+
+  public SegmentSplitter get(
+      ShuffleDataDistributionType distributionType,
+      Roaring64NavigableMap expectTaskIds,
+      int readBufferSize) {
+    switch (distributionType) {
+      case LOCAL_ORDER:
+        return new LocalOrderSegmentSplitter(expectTaskIds, readBufferSize);
+      case NORMAL:
+      default:
+        return new FixedSizeSegmentSplitter(readBufferSize);
+    }
+  }
+
+  public static SegmentSplitterFactory getInstance() {
+    return LazyHolder.INSTANCE;
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 848d6d01..16e21da3 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -30,8 +30,6 @@ import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.InterfaceAddress;
 import java.net.NetworkInterface;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -49,11 +47,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.uniffle.common.BufferSegment;
-import org.apache.uniffle.common.ShuffleDataSegment;
-import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.exception.RssException;
 
 public class RssUtils {
 
@@ -184,71 +178,6 @@ public class RssUtils {
     return clone;
   }
 
-  public static List<ShuffleDataSegment> transIndexDataToSegments(
-      ShuffleIndexResult shuffleIndexResult, int readBufferSize) {
-    if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
-      return Lists.newArrayList();
-    }
-
-    byte[] indexData = shuffleIndexResult.getIndexData();
-    long dataFileLen = shuffleIndexResult.getDataFileLen();
-    return transIndexDataToSegments(indexData, readBufferSize, dataFileLen);
-  }
-
-  private static List<ShuffleDataSegment> transIndexDataToSegments(byte[] indexData,
-      int readBufferSize, long dataFileLen) {
-    ByteBuffer byteBuffer = ByteBuffer.wrap(indexData);
-    List<BufferSegment> bufferSegments = Lists.newArrayList();
-    List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList();
-    int bufferOffset = 0;
-    long fileOffset = -1;
-    long totalLength = 0;
-
-    while (byteBuffer.hasRemaining()) {
-      try {
-        long offset = byteBuffer.getLong();
-        int length = byteBuffer.getInt();
-        int uncompressLength = byteBuffer.getInt();
-        long crc = byteBuffer.getLong();
-        long blockId = byteBuffer.getLong();
-        long taskAttemptId = byteBuffer.getLong();
-        // The index file is written, read and parsed sequentially, so these parsed index segments
-        // index a continuous shuffle data in the corresponding data file and the first segment's
-        // offset field is the offset of these shuffle data in the data file.
-        if (fileOffset == -1) {
-          fileOffset = offset;
-        }
-
-        bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
-        bufferOffset += length;
-        totalLength += length;
-
-        // If ShuffleServer is flushing the file at this time, the length in the index file record may be greater
-        // than the length in the actual data file, and it needs to be returned at this time to avoid EOFException
-        if (dataFileLen != -1 && totalLength >= dataFileLen) {
-          break;
-        }
-
-        if (bufferOffset >= readBufferSize) {
-          ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
-          dataFileSegments.add(sds);
-          bufferSegments = Lists.newArrayList();
-          bufferOffset = 0;
-          fileOffset = -1;
-        }
-      } catch (BufferUnderflowException ue) {
-        throw new RssException("Read index data under flow", ue);
-      }
-    }
-
-    if (bufferOffset > 0) {
-      ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
-      dataFileSegments.add(sds);
-    }
-
-    return dataFileSegments;
-  }
-
   public static String generateShuffleKey(String appId, int shuffleId) {
     return String.join(Constants.KEY_SPLIT_CHAR, appId, String.valueOf(shuffleId));
   }
diff --git a/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java b/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java
new file mode 100644
index 00000000..9282b801
--- /dev/null
+++ b/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.segment;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.ShuffleDataSegment;
+import org.apache.uniffle.common.ShuffleIndexResult;
+
+import static org.apache.uniffle.common.segment.LocalOrderSegmentSplitterTest.generateData;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class FixedSizeSegmentSplitterTest {
+
+  @Test
+  public void testSplit() {
+    SegmentSplitter splitter = new FixedSizeSegmentSplitter(100);
+    ShuffleIndexResult shuffleIndexResult = new ShuffleIndexResult();
+    List<ShuffleDataSegment> shuffleDataSegments = splitter.split(shuffleIndexResult);
+    assertTrue(shuffleDataSegments.isEmpty());
+
+    int readBufferSize = 32;
+    splitter = new FixedSizeSegmentSplitter(32);
+
+    // those 5 segment's data length are [32, 16, 10, 32, 6] so the index should be
+    // split into 3 ShuffleDataSegment, which are [32, 16 + 10 + 32, 6]
+    byte[] data = generateData(
+        Pair.of(32, 0),
+        Pair.of(16, 0),
+        Pair.of(10, 0),
+        Pair.of(32, 6),
+        Pair.of(6, 0)
+    );
+    shuffleDataSegments = splitter.split(new ShuffleIndexResult(data, -1));
+    assertEquals(3, shuffleDataSegments.size());
+
+    assertEquals(0, shuffleDataSegments.get(0).getOffset());
+    assertEquals(32, shuffleDataSegments.get(0).getLength());
+    assertEquals(1, shuffleDataSegments.get(0).getBufferSegments().size());
+
+    assertEquals(32, shuffleDataSegments.get(1).getOffset());
+    assertEquals(58, shuffleDataSegments.get(1).getLength());
+    assertEquals(3,shuffleDataSegments.get(1).getBufferSegments().size());
+
+    assertEquals(90, shuffleDataSegments.get(2).getOffset());
+    assertEquals(6, shuffleDataSegments.get(2).getLength());
+    assertEquals(1, shuffleDataSegments.get(2).getBufferSegments().size());
+
+    ByteBuffer incompleteByteBuffer = ByteBuffer.allocate(12);
+    incompleteByteBuffer.putLong(1L);
+    incompleteByteBuffer.putInt(2);
+    data = incompleteByteBuffer.array();
+    // It should throw exception
+    try {
+      splitter.split(new ShuffleIndexResult(data, -1));
+      fail();
+    } catch (Exception e) {
+      // ignore
+      assertTrue(e.getMessage().contains("Read index data under flow"));
+    }
+  }
+}
diff --git a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
new file mode 100644
index 00000000..27a29814
--- /dev/null
+++ b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.segment;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.Test;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import org.apache.uniffle.common.ShuffleDataSegment;
+import org.apache.uniffle.common.ShuffleIndexResult;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LocalOrderSegmentSplitterTest {
+
+  @Test
+  public void testSplit() {
+    Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(1);
+    LocalOrderSegmentSplitter splitter = new LocalOrderSegmentSplitter(taskIds, 1000);
+    assertTrue(splitter.split(new ShuffleIndexResult()).isEmpty());
+
+    splitter = new LocalOrderSegmentSplitter(taskIds, 32);
+
+    /**
+     * (length, taskId)
+     * case1: (32, 1) (16, 1) (10, 2) (16, 1) (6, 1)
+     *
+     *        (10, 2) will be dropped
+     */
+    byte[] data = generateData(
+        Pair.of(32, 1),
+        Pair.of(16, 1),
+        Pair.of(10, 2),
+        Pair.of(16, 1),
+        Pair.of(6, 1)
+    );
+    List<ShuffleDataSegment> dataSegments = splitter.split(new ShuffleIndexResult(data, -1));
+    assertEquals(3, dataSegments.size());
+
+    assertEquals(0, dataSegments.get(0).getOffset());
+    assertEquals(32, dataSegments.get(0).getLength());
+
+    assertEquals(32, dataSegments.get(1).getOffset());
+    assertEquals(16, dataSegments.get(1).getLength());
+
+    assertEquals(58, dataSegments.get(2).getOffset());
+    assertEquals(22, dataSegments.get(2).getLength());
+
+    /**
+     * case2: (32, 2) (16, 1) (10, 1) (16, 2) (6, 1)
+     *
+     *        (32, 2) (16, 2) will be dropped
+     */
+    data = generateData(
+        Pair.of(32, 2),
+        Pair.of(16, 1),
+        Pair.of(10, 1),
+        Pair.of(16, 2),
+        Pair.of(6, 1)
+    );
+    dataSegments = splitter.split(new ShuffleIndexResult(data, -1));
+    assertEquals(2, dataSegments.size());
+
+    assertEquals(32, dataSegments.get(0).getOffset());
+    assertEquals(26, dataSegments.get(0).getLength());
+
+    assertEquals(74, dataSegments.get(1).getOffset());
+    assertEquals(6, dataSegments.get(1).getLength());
+
+    /**
+     * case3: (32, 5) (16, 1) (10, 3) (16, 4) (6, 1)
+     *
+     *        (32, 5) will be dropped
+     */
+    taskIds = Roaring64NavigableMap.bitmapOf(1, 2, 3, 4);
+    splitter = new LocalOrderSegmentSplitter(taskIds, 32);
+    data = generateData(
+        Pair.of(32, 5),
+        Pair.of(16, 1),
+        Pair.of(10, 3),
+        Pair.of(16, 4),
+        Pair.of(6, 1)
+    );
+    dataSegments = splitter.split(new ShuffleIndexResult(data, -1));
+    assertEquals(2, dataSegments.size());
+
+    assertEquals(32, dataSegments.get(0).getOffset());
+    assertEquals(42, dataSegments.get(0).getLength());
+
+    assertEquals(74, dataSegments.get(1).getOffset());
+    assertEquals(6, dataSegments.get(1).getLength());
+  }
+
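+  /**
+   * Serializes fake index entries in the 40-byte on-disk index layout:
+   * offset(8) + length(4) + uncompressLength(4) + crc(8) + blockId(8) + taskAttemptId(8).
+   * Each pair is (length, taskId); uncompressLength, crc and blockId are placeholders.
+   */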
+  public static byte[] generateData(Pair<Integer, Integer>... configEntries) {
+    ByteBuffer byteBuffer = ByteBuffer.allocate(configEntries.length * 40);
+    int total = 0;
+    for (Pair<Integer, Integer> entry : configEntries) {
+      byteBuffer.putLong(total);
+      byteBuffer.putInt(entry.getLeft());
+      byteBuffer.putInt(1);
+      byteBuffer.putLong(1);
+      byteBuffer.putLong(1);
+      byteBuffer.putLong(entry.getRight());
+
+      total += entry.getLeft();
+    }
+    return byteBuffer.array();
+  }
+}
diff --git a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index 4c1f5a8c..a84e3902 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -21,7 +21,6 @@ import java.lang.reflect.Field;
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -35,9 +34,6 @@ import com.google.common.collect.Sets;
 import org.junit.jupiter.api.Test;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
-import org.apache.uniffle.common.BufferSegment;
-import org.apache.uniffle.common.ShuffleDataSegment;
-import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.ShuffleServerInfo;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -103,69 +99,6 @@ public class RssUtilsTest {
     assertEquals(bitmap1, bitmap2);
   }
 
-  @Test
-  public void testShuffleIndexSegment() {
-    ShuffleIndexResult shuffleIndexResult = new ShuffleIndexResult();
-    List<ShuffleDataSegment> shuffleDataSegments =
-        RssUtils.transIndexDataToSegments(shuffleIndexResult, 1000);
-    assertTrue(shuffleDataSegments.isEmpty());
-
-    int readBufferSize = 32;
-    int totalLength = 0;
-    List<BufferSegment> bufferSegments = Lists.newArrayList();
-    int[] dataSegmentLength = new int[]{32, 16, 10, 32, 6};
-
-    for (int i = 0; i < dataSegmentLength.length; ++i) {
-      long offset = totalLength;
-      int length = dataSegmentLength[i];
-      bufferSegments.add(new BufferSegment(i, offset, length, i, i, i));
-      totalLength += length;
-    }
-
-    // those 5 segment's data length are [32, 16, 10, 32, 6] so the index should be
-    // split into 3 ShuffleDataSegment, which are [32, 16 + 10 + 32, 6]
-    int expectedTotalSegmentNum = 3;
-    ByteBuffer byteBuffer = ByteBuffer.allocate(5 * 40);
-
-    for (BufferSegment bufferSegment : bufferSegments) {
-      byteBuffer.putLong(bufferSegment.getOffset());
-      byteBuffer.putInt(bufferSegment.getLength());
-      byteBuffer.putInt(bufferSegment.getUncompressLength());
-      byteBuffer.putLong(bufferSegment.getCrc());
-      byteBuffer.putLong(bufferSegment.getBlockId());
-      byteBuffer.putLong(bufferSegment.getTaskAttemptId());
-    }
-
-    byte[] data = byteBuffer.array();
-    shuffleDataSegments = RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data, -1), readBufferSize);
-    assertEquals(expectedTotalSegmentNum, shuffleDataSegments.size());
-
-    assertEquals(0, shuffleDataSegments.get(0).getOffset());
-    assertEquals(32, shuffleDataSegments.get(0).getLength());
-    assertEquals(1, shuffleDataSegments.get(0).getBufferSegments().size());
-
-    assertEquals(32, shuffleDataSegments.get(1).getOffset());
-    assertEquals(58, shuffleDataSegments.get(1).getLength());
-    assertEquals(3,shuffleDataSegments.get(1).getBufferSegments().size());
-
-    assertEquals(90, shuffleDataSegments.get(2).getOffset());
-    assertEquals(6, shuffleDataSegments.get(2).getLength());
-    assertEquals(1, shuffleDataSegments.get(2).getBufferSegments().size());
-
-    ByteBuffer incompleteByteBuffer = ByteBuffer.allocate(12);
-    incompleteByteBuffer.putLong(1L);
-    incompleteByteBuffer.putInt(2);
-    data = incompleteByteBuffer.array();
-    // It should throw exception
-    try {
-      RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data, -1), readBufferSize);
-      fail();
-    } catch (Exception e) {
-      // ignore
-      assertTrue(e.getMessage().contains("Read index data under flow"));
-    }
-  }
-
   @Test
   public void getMetricNameForHostNameTest() {
     assertEquals("a_b_c", RssUtils.getMetricNameForHostName("a.b.c"));
diff --git a/docs/client_guide.md b/docs/client_guide.md
index c945802d..0699ce55 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -54,6 +54,17 @@ After apply the patch and rebuild spark, add following configuration in spark co
   spark.dynamicAllocation.enabled true
   ```
 
+### Support Spark AQE
+
+To improve the performance of AQE skew optimization, uniffle introduces the LOCAL_ORDER shuffle-data distribution mechanism,
+which filters out a large amount of irrelevant data to reduce network bandwidth and shuffle-server local-disk pressure.
+
+It can be enabled with the following config:
+  ```bash
+  # The default value is NORMAL, which directly appends data to file when the in-memory data is flushed to external storage
+  spark.rss.client.shuffle.data.distribution.type LOCAL_ORDER
+  ```
+
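+The same option can also be passed at submission time (illustrative example using the config key above):
+  ```bash
+  spark-submit --conf spark.rss.client.shuffle.data.distribution.type=LOCAL_ORDER ...
+  ```
+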
 ### Deploy MapReduce Client Plugin
 
 1. Add client jar to the classpath of each NodeManager, e.g., <HADOOP>/share/hadoop/mapreduce/
@@ -91,6 +102,7 @@ These configurations are shared by all types of clients.
 |<client_type>.rss.client.assignment.shuffle.nodes.max|-1|The number of required assignment shuffle servers. If it is less than 0 or equals to 0 or greater than the coordinator's config of "rss.coordinator.shuffle.nodes.max", it will use the size of "rss.coordinator.shuffle.nodes.max" default|
 |<client_type>.rss.client.io.compression.codec|lz4|The compression codec is used to compress the shuffle data. Default codec is `lz4`, `zstd` also can be used.|
 |<client_type>.rss.client.io.compression.zstd.level|3|The zstd compression level, the default level is 3|
+|<client_type>.rss.client.shuffle.data.distribution.type|NORMAL|The type of partition shuffle-data distribution, including NORMAL and LOCAL_ORDER. The default value is NORMAL. Currently this config is only valid in Spark 3.x|
 Notice:
 
 1. `<client_type>` should be `spark` or `mapreduce`
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 2ea3d5c0..f4791df9 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -42,6 +42,7 @@ import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -272,8 +273,14 @@ public class QuorumTest extends ShuffleReadWriteBase {
         shuffleServerInfo2, shuffleServerInfo3, shuffleServerInfo4);
 
     for (int i = 0; i < replica; i++) {
-      shuffleWriteClientImpl.registerShuffle(allServers.get(i),
-          testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), new RemoteStorageInfo(""));
+      shuffleWriteClientImpl.registerShuffle(
+          allServers.get(i),
+          testAppId,
+          0,
+          Lists.newArrayList(new PartitionRange(0, 0)),
+          new RemoteStorageInfo(""),
+          ShuffleDataDistributionType.NORMAL
+      );
     }
   }
 
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
index 8c0a678c..ca0afadc 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
@@ -39,9 +39,10 @@ import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.segment.FixedSizeSegmentSplitter;
+import org.apache.uniffle.common.segment.SegmentSplitter;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.common.util.Constants;
-import org.apache.uniffle.common.util.RssUtils;
 
 
 public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
@@ -131,8 +132,7 @@ public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
     RssGetShuffleIndexRequest rgsir = new RssGetShuffleIndexRequest(
         appId, shuffleId, partitionId, partitionNumPerRange, partitionNum);
     ShuffleIndexResult shuffleIndexResult = shuffleServerClient.getShuffleIndex(rgsir).getShuffleIndexResult();
-    return RssUtils.transIndexDataToSegments(shuffleIndexResult, readBufferSize);
-
+    return new FixedSizeSegmentSplitter(readBufferSize).split(shuffleIndexResult);
   }
 
   public static ShuffleDataResult readShuffleData(
@@ -167,28 +167,51 @@ public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
       int partitionNumPerRange,
       int partitionNum,
       int readBufferSize,
-      int segmentIndex) {
-    // read index file
+      int segmentIndex,
+      SegmentSplitter segmentSplitter) {
     RssGetShuffleIndexRequest rgsir = new RssGetShuffleIndexRequest(
         appId, shuffleId, partitionId, partitionNumPerRange, partitionNum);
     ShuffleIndexResult shuffleIndexResult = shuffleServerClient.getShuffleIndex(rgsir).getShuffleIndexResult();
     if (shuffleIndexResult == null) {
       return new ShuffleDataResult();
     }
-    List<ShuffleDataSegment> sds = RssUtils.transIndexDataToSegments(shuffleIndexResult, readBufferSize);
+    List<ShuffleDataSegment> sds = segmentSplitter.split(shuffleIndexResult);
 
     if (segmentIndex >= sds.size()) {
       return new ShuffleDataResult();
     }
 
-    // read shuffle data
     ShuffleDataSegment segment = sds.get(segmentIndex);
     RssGetShuffleDataRequest rgsdr = new RssGetShuffleDataRequest(
         appId, shuffleId, partitionId, partitionNumPerRange, partitionNum,
         segment.getOffset(), segment.getLength());
 
+    // read shuffle data
     return new ShuffleDataResult(
         shuffleServerClient.getShuffleData(rgsdr).getShuffleData(),
         segment.getBufferSegments());
   }
+
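+  /**
+   * Convenience overload that splits the index with the default
+   * {@link FixedSizeSegmentSplitter} built from the read buffer size.
+   */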
+  public static ShuffleDataResult readShuffleData(
+      ShuffleServerGrpcClient shuffleServerClient,
+      String appId,
+      int shuffleId,
+      int partitionId,
+      int partitionNumPerRange,
+      int partitionNum,
+      int readBufferSize,
+      int segmentIndex) {
+    // read index file
+    return readShuffleData(
+        shuffleServerClient,
+        appId,
+        shuffleId,
+        partitionId,
+        partitionNumPerRange,
+        partitionNum,
+        readBufferSize,
+        segmentIndex,
+        new FixedSizeSegmentSplitter(readBufferSize)
+    );
+  }
 }
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index be51772a..9b3d8488 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -54,6 +54,7 @@ import org.apache.uniffle.client.util.ClientUtils;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.util.Constants;
@@ -112,7 +113,10 @@ public class ShuffleServerGrpcTest extends IntegrationTestBase {
         new ShuffleServerInfo("127.0.0.1-20001", "127.0.0.1", 20001),
         "clearResourceTest1",
         0,
-        Lists.newArrayList(new PartitionRange(0, 1)), new RemoteStorageInfo(""));
+        Lists.newArrayList(new PartitionRange(0, 1)),
+        new RemoteStorageInfo(""),
+        ShuffleDataDistributionType.NORMAL
+    );
 
     shuffleWriteClient.sendAppHeartbeat("clearResourceTest1", 1000L);
     shuffleWriteClient.sendAppHeartbeat("clearResourceTest2", 1000L);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
index 88c5438f..ed2bec66 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
@@ -46,6 +46,7 @@ import org.apache.uniffle.common.KerberizedHdfsBase;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.CoordinatorServer;
 import org.apache.uniffle.server.ShuffleServer;
@@ -178,7 +179,8 @@ public class ShuffleServerWithKerberizedHdfsTest extends KerberizedHdfsBase {
         0,
         Lists.newArrayList(new PartitionRange(0, 1)),
         remoteStorageInfo,
-        user
+        user,
+        ShuffleDataDistributionType.NORMAL
     );
     shuffleServerClient.registerShuffle(rrsr);
 
@@ -187,7 +189,8 @@ public class ShuffleServerWithKerberizedHdfsTest extends KerberizedHdfsBase {
         0,
         Lists.newArrayList(new PartitionRange(2, 3)),
         remoteStorageInfo,
-        user
+        user,
+        ShuffleDataDistributionType.NORMAL
     );
     shuffleServerClient.registerShuffle(rrsr);
 
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java
new file mode 100644
index 00000000..06a37b2a
--- /dev/null
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.client.request.RssFinishShuffleRequest;
+import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
+import org.apache.uniffle.client.request.RssSendCommitRequest;
+import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.util.DefaultIdHelper;
+import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.segment.LocalOrderSegmentSplitter;
+import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.apache.uniffle.common.ShuffleDataDistributionType.LOCAL_ORDER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Tests the LOCAL_ORDER shuffle-data distribution on localfile storage
+ */
+public class ShuffleServerWithLocalOfLocalOrderTest extends ShuffleReadWriteBase {
+
+  private ShuffleServerGrpcClient shuffleServerClient;
+
+  @BeforeAll
+  public static void setupServers() throws Exception {
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    createCoordinatorServer(coordinatorConf);
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    File tmpDir = Files.createTempDir();
+    File dataDir1 = new File(tmpDir, "data1");
+    File dataDir2 = new File(tmpDir, "data2");
+    String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
+    shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());
+    shuffleServerConf.setString("rss.storage.basePath", basePath);
+    shuffleServerConf.setString("rss.server.app.expired.withoutHeartbeat", "5000");
+    createShuffleServer(shuffleServerConf);
+    startServers();
+  }
+
+  @BeforeEach
+  public void createClient() {
+    shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
+  }
+
+  @AfterEach
+  public void closeClient() {
+    shuffleServerClient.close();
+  }
+
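+  /**
+   * Builds test blocks for 4 partitions. Partition 0 gets blocks from three map
+   * tasks (mapIdx 0/1/2), while partitions 1-3 get blocks from one map task each.
+   * The outer key of the returned map is the partitionId, the inner key the mapIdx.
+   */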
+  public static Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> createTestDataWithMultiMapIdx(
+      Roaring64NavigableMap[] bitmaps,
+      Map<Long, byte[]> expectedData) {
+    for (int i = 0; i < 4; i++) {
+      bitmaps[i] = Roaring64NavigableMap.bitmapOf();
+    }
+
+    // key: mapIdx
+    Map<Integer, List<ShuffleBlockInfo>> p0 = new HashMap<>();
+    List<ShuffleBlockInfo> blocks1 = createShuffleBlockList(
+        0, 0, 0, 3, 25, bitmaps[0], expectedData, mockSSI);
+    List<ShuffleBlockInfo> blocks2 = createShuffleBlockList(
+        0, 0, 1, 3, 25, bitmaps[0], expectedData, mockSSI);
+    List<ShuffleBlockInfo> blocks3 = createShuffleBlockList(
+        0, 0, 2, 3, 25, bitmaps[0], expectedData, mockSSI);
+    p0.put(0, blocks1);
+    p0.put(1, blocks2);
+    p0.put(2, blocks3);
+
+    final List<ShuffleBlockInfo> blocks4 = createShuffleBlockList(
+        0, 1, 1, 5, 25, bitmaps[1], expectedData, mockSSI);
+    final Map<Integer, List<ShuffleBlockInfo>> p1 = new HashMap<>();
+    p1.put(1, blocks4);
+
+    final List<ShuffleBlockInfo> blocks5 = createShuffleBlockList(
+        0, 2, 2, 4, 25, bitmaps[2], expectedData, mockSSI);
+    final Map<Integer, List<ShuffleBlockInfo>> p2 = new HashMap<>();
+    p2.put(2, blocks5);
+
+    final List<ShuffleBlockInfo> blocks6 = createShuffleBlockList(
+        0, 3, 3, 1, 25, bitmaps[3], expectedData, mockSSI);
+    final Map<Integer, List<ShuffleBlockInfo>> p3 = new HashMap<>();
+    p3.put(3, blocks6);
+
+    Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> partitionToBlocks = Maps.newHashMap();
+    partitionToBlocks.put(0, p0);
+    partitionToBlocks.put(1, p1);
+    partitionToBlocks.put(2, p2);
+    partitionToBlocks.put(3, p3);
+    return partitionToBlocks;
+  }
+
+  @Test
+  public void testWriteAndReadWithSpecifiedMapRange() throws Exception {
+    String testAppId = "testWriteAndReadWithSpecifiedMapRange";
+
+    for (int i = 0; i < 4; i++) {
+      RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest(testAppId, 0,
+          Lists.newArrayList(new PartitionRange(i, i)), new RemoteStorageInfo(""), "", LOCAL_ORDER);
+      shuffleServerClient.registerShuffle(rrsr);
+    }
+
+    /**
+     * Write the data to shuffle-servers
+     */
+    Map<Long, byte[]> expectedData = Maps.newHashMap();
+    Roaring64NavigableMap[] bitMaps = new Roaring64NavigableMap[4];
+
+    // Create the shuffle block with the mapIdx
+    Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> partitionToBlocksWithMapIdx =
+        createTestDataWithMultiMapIdx(bitMaps, expectedData);
+
+    Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = partitionToBlocksWithMapIdx.entrySet()
+        .stream()
+        .map(x ->
+            Pair.of(x.getKey(), x.getValue().values().stream().flatMap(a -> a.stream()).collect(Collectors.toList()))
+        )
+        .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
+
+    Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
+    shuffleToBlocks.put(0, partitionToBlocks);
+
+    RssSendShuffleDataRequest rssdr = new RssSendShuffleDataRequest(
+        testAppId, 3, 1000, shuffleToBlocks);
+    shuffleServerClient.sendShuffleData(rssdr);
+
+    // Flush the data to file
+    RssSendCommitRequest rscr = new RssSendCommitRequest(testAppId, 0);
+    shuffleServerClient.sendCommit(rscr);
+    RssFinishShuffleRequest rfsr = new RssFinishShuffleRequest(testAppId, 0);
+    shuffleServerClient.finishShuffle(rfsr);
+
+    /**
+     * Read the single partition's data by the specified [startMapIdx, endMapIdx)
+     */
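+    // The LocalOrderSegmentSplitter keeps only the blocks whose taskAttemptId
+    // falls into the given bitmap, which emulates reading a mapIdx range.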
+    // case1: get the mapIdx range [0, 1) of partition0
+    final Set<Long> expectedBlockIds1 = partitionToBlocksWithMapIdx.get(0).get(0).stream()
+        .map(x -> x.getBlockId())
+        .collect(Collectors.toSet());
+    final Map<Long, byte[]> expectedData1 = expectedData.entrySet().stream()
+        .filter(x -> expectedBlockIds1.contains(x.getKey()))
+        .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue()));
+
+    Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(0);
+    ShuffleDataResult sdr = readShuffleData(
+        shuffleServerClient,
+        testAppId,
+        0,
+        0,
+        1,
+        10,
+        1000,
+        0,
+        new LocalOrderSegmentSplitter(taskIds, 1000)
+    );
+    validate(
+        sdr,
+        expectedBlockIds1,
+        expectedData1,
+        new HashSet<>(Arrays.asList(0L))
+    );
+
+    // case2: get the mapIdx range [0, 2) of partition0
+    final Set<Long> expectedBlockIds2 = partitionToBlocksWithMapIdx.get(0).get(1).stream()
+        .map(x -> x.getBlockId())
+        .collect(Collectors.toSet());
+    expectedBlockIds2.addAll(expectedBlockIds1);
+    final Map<Long, byte[]> expectedData2 = expectedData.entrySet().stream()
+        .filter(x -> expectedBlockIds2.contains(x.getKey()))
+        .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue()));
+    taskIds = Roaring64NavigableMap.bitmapOf(0, 1);
+    sdr = readShuffleData(
+        shuffleServerClient,
+        testAppId,
+        0,
+        0,
+        1,
+        10,
+        1000,
+        0,
+        new LocalOrderSegmentSplitter(taskIds, 1000)
+    );
+    validate(
+        sdr,
+        expectedBlockIds2,
+        expectedData2,
+        new HashSet<>(Arrays.asList(0L, 1L))
+    );
+
+    // case3: get the mapIdx range [1, 3) of partition0
+    final Set<Long> expectedBlockIds3 = partitionToBlocksWithMapIdx.get(0).get(1).stream()
+        .map(x -> x.getBlockId())
+        .collect(Collectors.toSet());
+    expectedBlockIds3.addAll(
+        partitionToBlocksWithMapIdx.get(0).get(2).stream()
+            .map(x -> x.getBlockId())
+            .collect(Collectors.toSet())
+    );
+    final Map<Long, byte[]> expectedData3 = expectedData.entrySet().stream()
+        .filter(x -> expectedBlockIds3.contains(x.getKey()))
+        .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue()));
+    taskIds = Roaring64NavigableMap.bitmapOf(1, 2);
+    sdr = readShuffleData(
+        shuffleServerClient,
+        testAppId,
+        0,
+        0,
+        1,
+        10,
+        1000,
+        0,
+        new LocalOrderSegmentSplitter(taskIds, 1000)
+    );
+    validate(
+        sdr,
+        expectedBlockIds3,
+        expectedData3,
+        new HashSet<>(Arrays.asList(1L, 2L))
+    );
+
+    // case4: get the mapIdx range [0, Integer.MAX_VALUE) of partition0; it should always return all data
+    final Set<Long> expectedBlockIds4 = partitionToBlocks.get(0).stream()
+        .map(x -> x.getBlockId())
+        .collect(Collectors.toSet());
+    final Map<Long, byte[]> expectedData4 = expectedData.entrySet().stream()
+        .filter(x -> expectedBlockIds4.contains(x.getKey()))
+        .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue()));
+    taskIds = Roaring64NavigableMap.bitmapOf();
+    for (long blockId : expectedBlockIds4) {
+      taskIds.add(new DefaultIdHelper().getTaskAttemptId(blockId));
+    }
+    sdr = readShuffleData(
+        shuffleServerClient,
+        testAppId,
+        0,
+        0,
+        1,
+        10,
+        10000,
+        0,
+        new LocalOrderSegmentSplitter(taskIds, 100000)
+    );
+    validate(
+        sdr,
+        expectedBlockIds4,
+        expectedData4,
+        new HashSet<>(Arrays.asList(0L, 1L, 2L))
+    );
+  }
+
+  private void validate(ShuffleDataResult sdr, Set<Long> expectedBlockIds,
+      Map<Long, byte[]> expectedData, Set<Long> expectedTaskAttemptIds) {
+    byte[] buffer = sdr.getData();
+    List<BufferSegment> bufferSegments = sdr.getBufferSegments();
+    int matched = 0;
+    for (BufferSegment bs : bufferSegments) {
+      if (expectedBlockIds.contains(bs.getBlockId())) {
+        byte[] data = new byte[bs.getLength()];
+        System.arraycopy(buffer, bs.getOffset(), data, 0, bs.getLength());
+        assertEquals(bs.getCrc(), ChecksumUtils.getCrc32(data));
+        assertTrue(Arrays.equals(data, expectedData.get(bs.getBlockId())));
+        assertTrue(expectedTaskAttemptIds.contains(bs.getTaskAttemptId()));
+        matched++;
+      } else {
+        fail();
+      }
+    }
+    assertEquals(expectedBlockIds.size(), matched);
+  }
+}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
index bf07502a..3f142272 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
@@ -58,7 +58,7 @@ public class ShuffleServerWithLocalTest extends ShuffleReadWriteBase {
   private ShuffleServerGrpcClient shuffleServerClient;
 
   @BeforeAll
-  public static void setupServers() throws Exception {
+  private static void setupServers() throws Exception {
     CoordinatorConf coordinatorConf = getCoordinatorConf();
     createCoordinatorServer(coordinatorConf);
     ShuffleServerConf shuffleServerConf = getShuffleServerConf();
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 8df19926..0c985378 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -42,6 +42,7 @@ import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.RetryUtils;
@@ -104,8 +105,14 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
   @Test
   public void rpcFailTest() throws Exception {
     String testAppId = "rpcFailTest";
-    shuffleWriteClientImpl.registerShuffle(shuffleServerInfo1,
-        testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), new RemoteStorageInfo(""));
+    shuffleWriteClientImpl.registerShuffle(
+        shuffleServerInfo1,
+        testAppId,
+        0,
+        Lists.newArrayList(new PartitionRange(0, 0)),
+        new RemoteStorageInfo(""),
+        ShuffleDataDistributionType.NORMAL
+    );
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
 
@@ -149,11 +156,23 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
   public void reportMultipleServerTest() throws Exception {
     String testAppId = "reportMultipleServerTest";
 
-    shuffleWriteClientImpl.registerShuffle(shuffleServerInfo1,
-        testAppId, 1, Lists.newArrayList(new PartitionRange(1, 1)), new RemoteStorageInfo(""));
-
-    shuffleWriteClientImpl.registerShuffle(shuffleServerInfo2,
-        testAppId, 1, Lists.newArrayList(new PartitionRange(2, 2)), new RemoteStorageInfo(""));
+    shuffleWriteClientImpl.registerShuffle(
+        shuffleServerInfo1,
+        testAppId,
+        1,
+        Lists.newArrayList(new PartitionRange(1, 1)),
+        new RemoteStorageInfo(""),
+        ShuffleDataDistributionType.NORMAL
+    );
+
+    shuffleWriteClientImpl.registerShuffle(
+        shuffleServerInfo2,
+        testAppId,
+        1,
+        Lists.newArrayList(new PartitionRange(2, 2)),
+        new RemoteStorageInfo(""),
+        ShuffleDataDistributionType.NORMAL
+    );
 
     Map<Integer, List<ShuffleServerInfo>> partitionToServers = Maps.newHashMap();
     partitionToServers.putIfAbsent(1, Lists.newArrayList(shuffleServerInfo1));
@@ -212,10 +231,22 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
   @Test
   public void writeReadTest() throws Exception {
     String testAppId = "writeReadTest";
-    shuffleWriteClientImpl.registerShuffle(shuffleServerInfo1,
-        testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), new RemoteStorageInfo(""));
-    shuffleWriteClientImpl.registerShuffle(shuffleServerInfo2,
-        testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), new RemoteStorageInfo(""));
+    shuffleWriteClientImpl.registerShuffle(
+        shuffleServerInfo1,
+        testAppId,
+        0,
+        Lists.newArrayList(new PartitionRange(0, 0)),
+        new RemoteStorageInfo(""),
+        ShuffleDataDistributionType.NORMAL
+    );
+    shuffleWriteClientImpl.registerShuffle(
+        shuffleServerInfo2,
+        testAppId,
+        0,
+        Lists.newArrayList(new PartitionRange(0, 0)),
+        new RemoteStorageInfo(""),
+        ShuffleDataDistributionType.NORMAL
+    );
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
@@ -263,8 +294,14 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
   @Test
   public void emptyTaskTest() {
     String testAppId = "emptyTaskTest";
-    shuffleWriteClientImpl.registerShuffle(shuffleServerInfo1,
-        testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), new RemoteStorageInfo(""));
+    shuffleWriteClientImpl.registerShuffle(
+        shuffleServerInfo1,
+        testAppId,
+        0,
+        Lists.newArrayList(new PartitionRange(0, 0)),
+        new RemoteStorageInfo(""),
+        ShuffleDataDistributionType.NORMAL
+    );
     boolean commitResult = shuffleWriteClientImpl
         .sendCommit(Sets.newHashSet(shuffleServerInfo1), testAppId, 0, 2);
     assertTrue(commitResult);
@@ -306,7 +343,13 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
           });
         }
         shuffleWriteClientImpl.registerShuffle(
-            entry.getKey(), appId, 0, entry.getValue(), remoteStorage);
+            entry.getKey(),
+            appId,
+            0,
+            entry.getValue(),
+            remoteStorage,
+            ShuffleDataDistributionType.NORMAL
+        );
       });
       return shuffleAssignments;
     }, heartbeatTimeout, maxTryTime);
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java
index 50e0c27e..576f1d71 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java
@@ -44,7 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class AQESkewedJoinTest extends SparkIntegrationTestBase {
 
   @BeforeAll
-  public static void setupServers() throws Exception {
+  private static void setupServers() throws Exception {
     CoordinatorConf coordinatorConf = getCoordinatorConf();
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithLocalOrderTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithLocalOrderTest.java
new file mode 100644
index 00000000..331f5898
--- /dev/null
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithLocalOrderTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.junit.jupiter.api.BeforeAll;
+
+import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class AQESkewedJoinWithLocalOrderTest extends AQESkewedJoinTest {
+
+  @BeforeAll
+  public static void setupServers() throws Exception {
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    createCoordinatorServer(coordinatorConf);
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    // Use the LOCALFILE storage type to ensure the data will be flushed with the local_order mechanism
+    shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());
+    createShuffleServer(shuffleServerConf);
+    startServers();
+  }
+
+  @Override
+  public void updateSparkConfCustomer(SparkConf sparkConf) {
+    sparkConf.set(RssSparkConfig.RSS_STORAGE_TYPE.key(), "LOCALFILE");
+    sparkConf.set("spark." + RssClientConf.DATA_DISTRIBUTION_TYPE.key(),
+        ShuffleDataDistributionType.LOCAL_ORDER.name());
+  }
+}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index f817b2ca..3d54e50a 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -57,6 +57,7 @@ import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.exception.NotRetryException;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.RetryUtils;
@@ -131,12 +132,14 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
       int shuffleId,
       List<PartitionRange> partitionRanges,
       RemoteStorageInfo remoteStorageInfo,
-      String user) {
+      String user,
+      ShuffleDataDistributionType dataDistributionType) {
     ShuffleRegisterRequest.Builder reqBuilder = ShuffleRegisterRequest.newBuilder();
     reqBuilder
         .setAppId(appId)
         .setShuffleId(shuffleId)
         .setUser(user)
+        .setShuffleDataDistribution(RssProtos.DataDistribution.valueOf(dataDistributionType.name()))
         .addAllPartitionRanges(toShufflePartitionRanges(partitionRanges));
     RemoteStorage.Builder rsBuilder = RemoteStorage.newBuilder();
     rsBuilder.setPath(remoteStorageInfo.getPath());
@@ -245,7 +248,8 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
         request.getShuffleId(),
         request.getPartitionRanges(),
         request.getRemoteStorageInfo(),
-        request.getUser()
+        request.getUser(),
+        request.getDataDistributionType()
     );
 
     RssRegisterShuffleResponse response;
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java
index 249d7136..4365a4b1 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils;
 
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 
 public class RssRegisterShuffleRequest {
 
@@ -31,18 +32,21 @@ public class RssRegisterShuffleRequest {
   private List<PartitionRange> partitionRanges;
   private RemoteStorageInfo remoteStorageInfo;
   private String user;
+  private ShuffleDataDistributionType dataDistributionType;
 
   public RssRegisterShuffleRequest(
       String appId,
       int shuffleId,
       List<PartitionRange> partitionRanges,
       RemoteStorageInfo remoteStorageInfo,
-      String user) {
+      String user,
+      ShuffleDataDistributionType dataDistributionType) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionRanges = partitionRanges;
     this.remoteStorageInfo = remoteStorageInfo;
     this.user = user;
+    this.dataDistributionType = dataDistributionType;
   }
 
   public RssRegisterShuffleRequest(
@@ -50,7 +54,13 @@ public class RssRegisterShuffleRequest {
       int shuffleId,
       List<PartitionRange> partitionRanges,
       String remoteStoragePath) {
-    this(appId, shuffleId, partitionRanges, new RemoteStorageInfo(remoteStoragePath), StringUtils.EMPTY);
+    this(appId,
+        shuffleId,
+        partitionRanges,
+        new RemoteStorageInfo(remoteStoragePath),
+        StringUtils.EMPTY,
+        ShuffleDataDistributionType.NORMAL
+    );
   }
 
   public String getAppId() {
@@ -72,4 +82,8 @@ public class RssRegisterShuffleRequest {
   public String getUser() {
     return user;
   }
+
+  public ShuffleDataDistributionType getDataDistributionType() {
+    return dataDistributionType;
+  }
 }
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 4555a194..4a4077cf 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -167,6 +167,12 @@ message ShuffleRegisterRequest {
   repeated ShufflePartitionRange partitionRanges = 3;
   RemoteStorage remoteStorage = 4;
   string user = 5;
+  DataDistribution shuffleDataDistribution = 6;
+}
+
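+// How the blocks of one partition are laid out in the flushed data file.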
+enum DataDistribution {
+  NORMAL = 0;
+  LOCAL_ORDER = 1;
 }
 
 message ShuffleUnregisterRequest {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 3c2a190a..45e329d1 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -36,6 +36,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.util.ThreadUtils;
@@ -359,4 +360,8 @@ public class ShuffleFlushManager {
       return createTimeStamp;
     }
   }
+
+  public ShuffleDataDistributionType getDataDistributionType(String appId) {
+    return shuffleServer.getShuffleTaskManager().getDataDistributionType(appId);
+  }
 }
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 3980cc65..6af12b76 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Lists;
@@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
@@ -138,6 +140,14 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
     String remoteStoragePath = req.getRemoteStorage().getPath();
     String user = req.getUser();
 
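+    // Older clients may leave this field unset; fall back to NORMAL in that case.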
+    ShuffleDataDistributionType shuffleDataDistributionType =
+        ShuffleDataDistributionType.valueOf(
+            Optional
+                .ofNullable(req.getShuffleDataDistribution())
+                .orElse(RssProtos.DataDistribution.NORMAL)
+                .name()
+        );
+
     Map<String, String> remoteStorageConf = req
         .getRemoteStorage()
         .getRemoteStorageConfList()
@@ -156,7 +166,8 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
             shuffleId,
             partitionRanges,
             new RemoteStorageInfo(remoteStoragePath, remoteStorageConf),
-            user
+            user,
+            shuffleDataDistributionType
         );
 
     reply = ShuffleRegisterResponse
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index 963a57b8..aeb00887 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import com.google.common.collect.Maps;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
+import org.apache.uniffle.common.ShuffleDataDistributionType;
+
 /**
  * ShuffleTaskInfo contains the information of submitting the shuffle,
  * the information of the cache block, user and timestamp corresponding to the app
@@ -42,12 +44,15 @@ public class ShuffleTaskInfo {
   private Map<Integer, Roaring64NavigableMap> cachedBlockIds;
   private AtomicReference<String> user;
 
+  private AtomicReference<ShuffleDataDistributionType> dataDistType;
+
   public ShuffleTaskInfo() {
     this.currentTimes = System.currentTimeMillis();
     this.commitCounts = Maps.newConcurrentMap();
     this.commitLocks = Maps.newConcurrentMap();
     this.cachedBlockIds = Maps.newConcurrentMap();
     this.user = new AtomicReference<>();
+    this.dataDistType = new AtomicReference<>();
   }
 
   public Long getCurrentTimes() {
@@ -77,4 +82,13 @@ public class ShuffleTaskInfo {
   public void setUser(String user) {
     this.user.set(user);
   }
+
+  public void setDataDistType(
+      ShuffleDataDistributionType dataDistType) {
+    this.dataDistType.set(dataDistType);
+  }
+
+  public ShuffleDataDistributionType getDataDistType() {
+    return dataDistType.get();
+  }
 }
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 2ec3d6c9..504b791f 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
@@ -134,14 +135,36 @@ public class ShuffleTaskManager {
     thread.start();
   }
 
+  /**
+   * Only for test
+   */
+  @VisibleForTesting
   public StatusCode registerShuffle(
       String appId,
       int shuffleId,
       List<PartitionRange> partitionRanges,
       RemoteStorageInfo remoteStorageInfo,
       String user) {
+    return registerShuffle(
+        appId,
+        shuffleId,
+        partitionRanges,
+        remoteStorageInfo,
+        user,
+        ShuffleDataDistributionType.NORMAL
+    );
+  }
+
+  public StatusCode registerShuffle(
+      String appId,
+      int shuffleId,
+      List<PartitionRange> partitionRanges,
+      RemoteStorageInfo remoteStorageInfo,
+      String user,
+      ShuffleDataDistributionType dataDistType) {
     refreshAppId(appId);
     shuffleTaskInfos.get(appId).setUser(user);
+    shuffleTaskInfos.get(appId).setDataDistType(dataDistType);
     partitionsToBlockIds.putIfAbsent(appId, Maps.newConcurrentMap());
     for (PartitionRange partitionRange : partitionRanges) {
       shuffleBufferManager.registerBuffer(appId, shuffleId, partitionRange.getStart(), partitionRange.getEnd());
@@ -502,4 +525,8 @@ public class ShuffleTaskManager {
   void removeShuffleDataSync(String appId, int shuffleId) {
     removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId));
   }
+
+  public ShuffleDataDistributionType getDataDistributionType(String appId) {
+    return shuffleTaskInfos.get(appId).getDataDistType();
+  }
 }
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index b251113f..724fc6bc 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.ShufflePartitionedData;
@@ -74,12 +75,16 @@ public class ShuffleBuffer {
       int shuffleId,
       int startPartition,
       int endPartition,
-      Supplier<Boolean> isValid) {
+      Supplier<Boolean> isValid,
+      ShuffleDataDistributionType dataDistributionType) {
     if (blocks.isEmpty()) {
       return null;
     }
     // buffer will be cleared, and new list must be created for async flush
     List<ShufflePartitionedBlock> spBlocks = new LinkedList<>(blocks);
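+    // For LOCAL_ORDER distribution, sort the blocks by taskAttemptId before
+    // flushing, so that one task's blocks are contiguous in the data file and
+    // an AQE reader can fetch them with fewer requests.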
+    if (dataDistributionType == ShuffleDataDistributionType.LOCAL_ORDER) {
+      spBlocks.sort((o1, o2) -> Long.compare(o1.getTaskAttemptId(), o2.getTaskAttemptId()));
+    }
     long eventId = ShuffleFlushManager.ATOMIC_EVENT_ID.getAndIncrement();
     final ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(
         eventId,
@@ -97,6 +102,18 @@ public class ShuffleBuffer {
     return event;
   }
 
+  /**
+   * Only for test
+   */
+  public synchronized ShuffleDataFlushEvent toFlushEvent(
+      String appId,
+      int shuffleId,
+      int startPartition,
+      int endPartition,
+      Supplier<Boolean> isValid) {
+    return toFlushEvent(appId, shuffleId, startPartition, endPartition, isValid, ShuffleDataDistributionType.NORMAL);
+  }
+
   public List<ShufflePartitionedBlock> getBlocks() {
     return blocks;
   }
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 99ae9c6b..f87ae896 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -195,8 +195,14 @@ public class ShuffleBufferManager {
   protected void flushBuffer(ShuffleBuffer buffer, String appId,
       int shuffleId, int startPartition, int endPartition) {
     ShuffleDataFlushEvent event =
-        buffer.toFlushEvent(appId, shuffleId, startPartition, endPartition,
-            () -> bufferPool.containsKey(appId));
+        buffer.toFlushEvent(
+            appId,
+            shuffleId,
+            startPartition,
+            endPartition,
+            () -> bufferPool.containsKey(appId),
+            shuffleFlushManager.getDataDistributionType(appId)
+        );
     if (event != null) {
       updateShuffleSize(appId, shuffleId, -event.getSize());
       inFlushSize.addAndGet(event.getSize());
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 5a869b43..f557531f 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -82,8 +82,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
     conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
     ShuffleServer shuffleServer = new ShuffleServer(conf);
-    ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf,
-        shuffleServer.getShuffleFlushManager(), shuffleServer.getShuffleBufferManager(), null);
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
 
     String appId = "registerTest1";
     int shuffleId = 1;
@@ -138,11 +137,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     conf.set(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED, 3000L);
     conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
     ShuffleServer shuffleServer = new ShuffleServer(conf);
-    ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
-    ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
-    StorageManager storageManager = shuffleServer.getStorageManager();
-    ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(
-        conf, shuffleFlushManager, shuffleBufferManager, storageManager);
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
     shuffleTaskManager.registerShuffle(
         appId,
         shuffleId,
@@ -184,6 +179,8 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     assertEquals(1, bufferIds.size());
     assertEquals(StatusCode.SUCCESS, sc);
     shuffleTaskManager.commitShuffle(appId, shuffleId);
+
+    ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
     assertEquals(1, shuffleFlushManager.getCommittedBlockIds(appId, shuffleId).getLongCardinality());
 
     // flush for partition 1-1
@@ -279,11 +276,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
 
     ShuffleServer shuffleServer = new ShuffleServer(conf);
 
-    ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
-    ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
-    StorageManager storageManager = shuffleServer.getStorageManager();
-    ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(
-        conf, shuffleFlushManager, shuffleBufferManager, storageManager);
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
 
     String appId = "removeShuffleDataTest1";
     for (int i = 0; i < 4; i++) {
@@ -312,6 +305,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
 
     assertEquals(1, shuffleTaskManager.getAppIds().size());
 
+    ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
     RangeMap<Integer, ShuffleBuffer> rangeMap = shuffleBufferManager.getBufferPool().get(appId).get(0);
     assertFalse(rangeMap.asMapOfRanges().isEmpty());
     shuffleTaskManager.commitShuffle(appId, 0);
@@ -351,12 +345,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
         path1.toAbsolutePath().toString() + "," + path2.toAbsolutePath().toString());
 
     ShuffleServer shuffleServer = new ShuffleServer(conf);
-
-    ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
-    ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
-    StorageManager storageManager = shuffleServer.getStorageManager();
-    ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(
-        conf, shuffleFlushManager, shuffleBufferManager, storageManager);
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
 
     String appId = "removeShuffleDataWithLocalfileTest";
 
@@ -411,11 +400,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
 
     ShuffleServer shuffleServer = new ShuffleServer(conf);
-    ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
-    ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
-    StorageManager storageManager = shuffleServer.getStorageManager();
-    ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, shuffleFlushManager,
-        shuffleBufferManager, storageManager);
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
     shuffleTaskManager.registerShuffle(
         "clearTest1",
         shuffleId,
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 50c91f5a..1bee2d9a 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -54,6 +54,8 @@ import static org.mockito.Mockito.when;
 public class ShuffleBufferManagerTest extends BufferTestBase {
   private ShuffleBufferManager shuffleBufferManager;
   private ShuffleFlushManager mockShuffleFlushManager;
+  private ShuffleServer mockShuffleServer;
+  private ShuffleTaskManager mockShuffleTaskManager;
   private ShuffleServerConf conf;
 
   @BeforeEach
@@ -68,6 +70,9 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
     conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 80.0);
     conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
     mockShuffleFlushManager = mock(ShuffleFlushManager.class);
+    mockShuffleServer = mock(ShuffleServer.class);
+    mockShuffleTaskManager = mock(ShuffleTaskManager.class);
+    when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager);
     shuffleBufferManager = new ShuffleBufferManager(conf, mockShuffleFlushManager);
   }
 
diff --git a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index 9dcb280c..dbe4ac3e 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -118,10 +118,12 @@ public class ShuffleHandlerFactory {
     List<ShuffleServerClient> shuffleServerClients = shuffleServerInfoList.stream().map(
         ssi -> ShuffleServerClientFactory.getInstance().getShuffleServerClient(ClientType.GRPC.name(), ssi)).collect(
         Collectors.toList());
-    return new LocalFileQuorumClientReadHandler(request.getAppId(), request.getShuffleId(), request.getPartitionId(),
+    return new LocalFileQuorumClientReadHandler(
+        request.getAppId(), request.getShuffleId(), request.getPartitionId(),
         request.getIndexReadLimit(), request.getPartitionNumPerRange(), request.getPartitionNum(),
         request.getReadBufferSize(), request.getExpectBlockIds(), request.getProcessBlockIds(),
-        shuffleServerClients);
+        shuffleServerClients, request.getDistributionType(), request.getExpectTaskIds()
+    );
   }
 
   private ClientReadHandler getHdfsClientReadHandler(CreateShuffleReadHandlerRequest request) {
@@ -136,7 +138,10 @@ public class ShuffleHandlerFactory {
         request.getExpectBlockIds(),
         request.getProcessBlockIds(),
         request.getStorageBasePath(),
-        request.getHadoopConf());
+        request.getHadoopConf(),
+        request.getDistributionType(),
+        request.getExpectTaskIds()
+    );
   }
 
   public ShuffleDeleteHandler createShuffleDeleteHandler(CreateShuffleDeleteHandlerRequest request) {
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
index ce7a515f..1cf7dfd4 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
@@ -24,10 +24,11 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
-import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.segment.SegmentSplitterFactory;
 
 public abstract class DataSkippableReadHandler extends AbstractClientReadHandler {
   private static final Logger LOG = LoggerFactory.getLogger(DataSkippableReadHandler.class);
@@ -38,19 +39,26 @@ public abstract class DataSkippableReadHandler extends AbstractClientReadHandler
   protected Roaring64NavigableMap expectBlockIds;
   protected Roaring64NavigableMap processBlockIds;
 
+  protected ShuffleDataDistributionType distributionType;
+  protected Roaring64NavigableMap expectTaskIds;
+
   public DataSkippableReadHandler(
       String appId,
       int shuffleId,
       int partitionId,
       int readBufferSize,
       Roaring64NavigableMap expectBlockIds,
-      Roaring64NavigableMap processBlockIds) {
+      Roaring64NavigableMap processBlockIds,
+      ShuffleDataDistributionType distributionType,
+      Roaring64NavigableMap expectTaskIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     this.expectBlockIds = expectBlockIds;
     this.processBlockIds = processBlockIds;
+    this.distributionType = distributionType;
+    this.expectTaskIds = expectTaskIds;
   }
 
   protected abstract ShuffleIndexResult readShuffleIndex();
@@ -64,7 +72,11 @@ public abstract class DataSkippableReadHandler extends AbstractClientReadHandler
         return null;
       }
 
-      shuffleDataSegments = RssUtils.transIndexDataToSegments(shuffleIndexResult, readBufferSize);
+      shuffleDataSegments =
+          SegmentSplitterFactory
+              .getInstance()
+              .get(distributionType, expectTaskIds, readBufferSize)
+              .split(shuffleIndexResult);
     }
 
     // We should skip unexpected and processed segments when handler is read
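
For context on the new call above: segment construction is delegated to a splitter chosen by the distribution type. Below is a minimal sketch of that dispatch. Only FixedSizeSegmentSplitter and the get(distributionType, expectTaskIds, readBufferSize) call shape are confirmed by this diff; the SegmentSplitter interface is inferred from the split(shuffleIndexResult) call, and LocalOrderSegmentSplitter is an assumed name for the LOCAL_ORDER strategy.

    import org.apache.uniffle.common.ShuffleDataDistributionType;
    import org.apache.uniffle.common.segment.FixedSizeSegmentSplitter;
    import org.roaringbitmap.longlong.Roaring64NavigableMap;

    // Sketch only, not the actual factory implementation.
    public class SegmentSplitterDispatchSketch {
      public static SegmentSplitter get(
          ShuffleDataDistributionType distributionType,
          Roaring64NavigableMap expectTaskIds,
          int readBufferSize) {
        if (distributionType == ShuffleDataDistributionType.LOCAL_ORDER) {
          // MapId-sorted flushes let the reader cut segments containing only
          // the task ids it expects, instead of rescanning a whole 64M buffer.
          return new LocalOrderSegmentSplitter(expectTaskIds, readBufferSize);
        }
        // NORMAL keeps the previous behavior: segments of ~readBufferSize bytes.
        return new FixedSizeSegmentSplitter(readBufferSize);
      }
    }
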
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
index 872672cc..a9208960 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
 import org.apache.uniffle.common.util.Constants;
@@ -53,6 +54,9 @@ public class HdfsClientReadHandler extends AbstractClientReadHandler {
   private long readLength = 0L;
   private long readUncompressLength = 0L;
 
+  private ShuffleDataDistributionType distributionType;
+  private Roaring64NavigableMap expectTaskIds;
+
   public HdfsClientReadHandler(
       String appId,
       int shuffleId,
@@ -64,7 +68,9 @@ public class HdfsClientReadHandler extends AbstractClientReadHandler {
       Roaring64NavigableMap expectBlockIds,
       Roaring64NavigableMap processBlockIds,
       String storageBasePath,
-      Configuration hadoopConf) {
+      Configuration hadoopConf,
+      ShuffleDataDistributionType distributionType,
+      Roaring64NavigableMap expectTaskIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
@@ -76,6 +82,26 @@ public class HdfsClientReadHandler extends AbstractClientReadHandler {
     this.storageBasePath = storageBasePath;
     this.hadoopConf = hadoopConf;
     this.readHandlerIndex = 0;
+    this.distributionType = distributionType;
+    this.expectTaskIds = expectTaskIds;
+  }
+
+  // Only for test
+  public HdfsClientReadHandler(
+      String appId,
+      int shuffleId,
+      int partitionId,
+      int indexReadLimit,
+      int partitionNumPerRange,
+      int partitionNum,
+      int readBufferSize,
+      Roaring64NavigableMap expectBlockIds,
+      Roaring64NavigableMap processBlockIds,
+      String storageBasePath,
+      Configuration hadoopConf) {
+    this(appId, shuffleId, partitionId, indexReadLimit, partitionNumPerRange, partitionNum, readBufferSize,
+        expectBlockIds, processBlockIds, storageBasePath, hadoopConf, ShuffleDataDistributionType.NORMAL,
+        Roaring64NavigableMap.bitmapOf());
   }
 
   protected void init(String fullShufflePath) {
@@ -107,7 +133,8 @@ public class HdfsClientReadHandler extends AbstractClientReadHandler {
         try {
           HdfsShuffleReadHandler handler = new HdfsShuffleReadHandler(
               appId, shuffleId, partitionId, filePrefix,
-              readBufferSize, expectBlockIds, processBlockIds, hadoopConf);
+              readBufferSize, expectBlockIds, processBlockIds, hadoopConf,
+              distributionType, expectTaskIds);
           readHandlers.add(handler);
         } catch (Exception e) {
           LOG.warn("Can't create ShuffleReaderHandler for " + filePrefix, e);
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
index af94723f..810ff1bd 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
@@ -26,6 +26,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
@@ -51,13 +52,30 @@ public class HdfsShuffleReadHandler extends DataSkippableReadHandler {
       int readBufferSize,
       Roaring64NavigableMap expectBlockIds,
       Roaring64NavigableMap processBlockIds,
-      Configuration conf) throws Exception {
-    super(appId, shuffleId, partitionId, readBufferSize, expectBlockIds, processBlockIds);
+      Configuration conf,
+      ShuffleDataDistributionType distributionType,
+      Roaring64NavigableMap expectTaskIds) throws Exception {
+    super(appId, shuffleId, partitionId, readBufferSize, expectBlockIds, processBlockIds,
+        distributionType, expectTaskIds);
     this.filePrefix = filePrefix;
     this.indexReader = createHdfsReader(ShuffleStorageUtils.generateIndexFileName(filePrefix), conf);
     this.dataReader = createHdfsReader(ShuffleStorageUtils.generateDataFileName(filePrefix), conf);
   }
 
+  // Only for test
+  public HdfsShuffleReadHandler(
+      String appId,
+      int shuffleId,
+      int partitionId,
+      String filePrefix,
+      int readBufferSize,
+      Roaring64NavigableMap expectBlockIds,
+      Roaring64NavigableMap processBlockIds,
+      Configuration conf) throws Exception {
+    this(appId, shuffleId, partitionId, filePrefix, readBufferSize, expectBlockIds,
+        processBlockIds, conf, ShuffleDataDistributionType.NORMAL, Roaring64NavigableMap.bitmapOf());
+  }
+
   @Override
   protected ShuffleIndexResult readShuffleIndex() {
     long start = System.currentTimeMillis();
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
index a5c42c29..a630cf1b 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
@@ -25,6 +25,7 @@ import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
 import org.apache.uniffle.client.request.RssGetShuffleIndexRequest;
 import org.apache.uniffle.client.response.RssGetShuffleDataResponse;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
@@ -47,8 +48,13 @@ public class LocalFileClientReadHandler extends DataSkippableReadHandler {
       int readBufferSize,
       Roaring64NavigableMap expectBlockIds,
       Roaring64NavigableMap processBlockIds,
-      ShuffleServerClient shuffleServerClient) {
-    super(appId, shuffleId, partitionId, readBufferSize, expectBlockIds, processBlockIds);
+      ShuffleServerClient shuffleServerClient,
+      ShuffleDataDistributionType distributionType,
+      Roaring64NavigableMap expectTaskIds) {
+    super(
+        appId, shuffleId, partitionId, readBufferSize, expectBlockIds,
+        processBlockIds, distributionType, expectTaskIds
+    );
     this.shuffleServerClient = shuffleServerClient;
     this.partitionNumPerRange = partitionNumPerRange;
     this.partitionNum = partitionNum;
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
index 8cf1fe2a..d523cd63 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.exception.RssException;
 
@@ -49,7 +50,9 @@ public class LocalFileQuorumClientReadHandler extends AbstractClientReadHandler
       int readBufferSize,
       Roaring64NavigableMap expectBlockIds,
       Roaring64NavigableMap processBlockIds,
-      List<ShuffleServerClient> shuffleServerClients) {
+      List<ShuffleServerClient> shuffleServerClients,
+      ShuffleDataDistributionType distributionType,
+      Roaring64NavigableMap expectTaskIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
@@ -65,11 +68,34 @@ public class LocalFileQuorumClientReadHandler extends AbstractClientReadHandler
           readBufferSize,
           expectBlockIds,
           processBlockIds,
-          client
+          client,
+          distributionType,
+          expectTaskIds
       ));
     }
   }
 
+  /**
+   * Only for test
+   */
+  public LocalFileQuorumClientReadHandler(
+      String appId,
+      int shuffleId,
+      int partitionId,
+      int indexReadLimit,
+      int partitionNumPerRange,
+      int partitionNum,
+      int readBufferSize,
+      Roaring64NavigableMap expectBlockIds,
+      Roaring64NavigableMap processBlockIds,
+      List<ShuffleServerClient> shuffleServerClients) {
+    this(
+        appId, shuffleId, partitionId, indexReadLimit, partitionNumPerRange,
+        partitionNum, readBufferSize, expectBlockIds, processBlockIds,
+        shuffleServerClients, ShuffleDataDistributionType.NORMAL, Roaring64NavigableMap.bitmapOf()
+    );
+  }
+
   @Override
   public ShuffleDataResult readShuffleData() {
     boolean readSuccessful = false;
diff --git a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
index 046089c9..75a1f146 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssBaseConf;
 
@@ -41,6 +42,8 @@ public class CreateShuffleReadHandlerRequest {
   private List<ShuffleServerInfo> shuffleServerInfoList;
   private Roaring64NavigableMap expectBlockIds;
   private Roaring64NavigableMap processBlockIds;
+  private ShuffleDataDistributionType distributionType;
+  private Roaring64NavigableMap expectTaskIds;
 
   public CreateShuffleReadHandlerRequest() {
   }
@@ -156,4 +159,20 @@ public class CreateShuffleReadHandlerRequest {
   public Roaring64NavigableMap getProcessBlockIds() {
     return processBlockIds;
   }
+
+  public ShuffleDataDistributionType getDistributionType() {
+    return distributionType;
+  }
+
+  public void setDistributionType(ShuffleDataDistributionType distributionType) {
+    this.distributionType = distributionType;
+  }
+
+  public Roaring64NavigableMap getExpectTaskIds() {
+    return expectTaskIds;
+  }
+
+  public void setExpectTaskIds(Roaring64NavigableMap expectTaskIds) {
+    this.expectTaskIds = expectTaskIds;
+  }
 }
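
A short usage sketch for the two new request fields; the setter names come from this diff and the LOCAL_ORDER value from the change description, but the field values here are placeholders. A client reading LOCAL_ORDER data would populate them before asking ShuffleHandlerFactory for a read handler:

    import org.apache.uniffle.common.ShuffleDataDistributionType;
    import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
    import org.roaringbitmap.longlong.Roaring64NavigableMap;

    // Illustrative only; all pre-existing request fields are set as before.
    CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest();
    request.setDistributionType(ShuffleDataDistributionType.LOCAL_ORDER);
    // The map task ids this reader cares about, e.g. one AQE skew sub-split.
    request.setExpectTaskIds(Roaring64NavigableMap.bitmapOf(1L, 2L, 3L));
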
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
index ae7f0541..5c1b5294 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
@@ -32,8 +32,8 @@ import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.segment.FixedSizeSegmentSplitter;
 import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
 import org.apache.uniffle.storage.handler.api.ServerReadHandler;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -97,8 +97,7 @@ public class LocalFileHandlerTestBase {
       return shuffleDataResults;
     }
 
-    List<ShuffleDataSegment> shuffleDataSegments =
-        RssUtils.transIndexDataToSegments(shuffleIndexResult, 32);
+    List<ShuffleDataSegment> shuffleDataSegments = new FixedSizeSegmentSplitter(32).split(shuffleIndexResult);
 
     for (ShuffleDataSegment shuffleDataSegment : shuffleDataSegments) {
       byte[] shuffleData =
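
The swap above is intended to be behavior-preserving for the NORMAL distribution: as the factory's default branch suggests, the splitter hosts the old fixed-size logic behind the new SegmentSplitter abstraction.

    // Before this patch (removed above):
    //   RssUtils.transIndexDataToSegments(shuffleIndexResult, 32);
    // After: the same fixed-size segmentation, now one SegmentSplitter implementation.
    List<ShuffleDataSegment> segments = new FixedSizeSegmentSplitter(32).split(shuffleIndexResult);
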
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
index d25f44f4..e2dcfd5b 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
@@ -33,6 +33,7 @@ import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
 import org.apache.uniffle.client.response.ResponseStatusCode;
 import org.apache.uniffle.client.response.RssGetShuffleDataResponse;
 import org.apache.uniffle.client.response.RssGetShuffleIndexResponse;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
@@ -95,7 +96,8 @@ public class LocalFileServerReadHandlerTest {
 
     Roaring64NavigableMap processBlockIds =  Roaring64NavigableMap.bitmapOf();
     LocalFileClientReadHandler handler = new LocalFileClientReadHandler(appId, partitionId, shuffleId, -1, 1, 1,
-        readBufferSize, expectBlockIds, processBlockIds, mockShuffleServerClient);
+        readBufferSize, expectBlockIds, processBlockIds, mockShuffleServerClient,
+        ShuffleDataDistributionType.NORMAL, Roaring64NavigableMap.bitmapOf());
     int totalSegment = ((blockSize * actualWriteDataBlock) / bytesPerSegment) + 1;
     int readBlocks = 0;
     for (int i = 0; i < totalSegment; i++) {