You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/10/27 06:28:52 UTC

[incubator-uniffle] branch master updated: [ISSUE-239][BUG] RssUtils#transIndexDataToSegments should consider the length of the data file (#275)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 2429c67f [ISSUE-239][BUG] RssUtils#transIndexDataToSegments should consider the length of the data file (#275)
2429c67f is described below

commit 2429c67fb3ef48b17e3a11fb1f1665a2fee37c0b
Author: Xianming Lei <31...@users.noreply.github.com>
AuthorDate: Thu Oct 27 14:28:47 2022 +0800

    [ISSUE-239][BUG] RssUtils#transIndexDataToSegments should consider the length of the data file (#275)
    
    ### What changes were proposed in this pull request?
    For issue #239: fix inconsistent blocks when reading shuffle data.
    
    ### Why are the changes needed?
    This problem causes reading shuffle data to fail.
    
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    
    ### How was this patch tested?
    Unit tests (UT) have been added.
    
    Co-authored-by: leixianming <le...@didiglobal.com>
---
 .../apache/uniffle/common/ShuffleIndexResult.java  |  10 +-
 .../org/apache/uniffle/common/util/RssUtils.java   |  14 +-
 .../uniffle/common/ShuffleIndexResultTest.java     |   2 +-
 .../apache/uniffle/common/util/RssUtilsTest.java   |   4 +-
 .../client/impl/grpc/ShuffleServerGrpcClient.java  |   2 +-
 .../response/RssGetShuffleIndexResponse.java       |   4 +-
 proto/src/main/proto/Rss.proto                     |   1 +
 .../uniffle/server/ShuffleServerGrpcService.java   |   1 +
 .../storage/handler/impl/HdfsFileReader.java       |   7 +-
 .../handler/impl/HdfsShuffleReadHandler.java       |  13 +-
 .../handler/impl/LocalFileServerReadHandler.java   |  10 +-
 .../handler/impl/HdfsShuffleReadHandlerTest.java   | 139 ++++++++++++++++++
 .../storage/handler/impl/LocalFileHandlerTest.java | 112 +++------------
 .../handler/impl/LocalFileHandlerTestBase.java     | 157 +++++++++++++++++++++
 .../impl/LocalFileServerReadHandlerTest.java       | 109 ++++++++++++++
 15 files changed, 475 insertions(+), 110 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
index 36dc8213..7e5d5b58 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
@@ -19,19 +19,25 @@ package org.apache.uniffle.common;
 
 public class ShuffleIndexResult {
   private final byte[] indexData;
+  private long dataFileLen;
 
   public ShuffleIndexResult() {
-    this(new byte[0]);
+    this(new byte[0], -1);
   }
 
-  public ShuffleIndexResult(byte[] bytes) {
+  public ShuffleIndexResult(byte[] bytes, long dataFileLen) {
     this.indexData = bytes;
+    this.dataFileLen = dataFileLen;
   }
 
   public byte[] getIndexData() {
     return indexData;
   }
 
+  public long getDataFileLen() {
+    return dataFileLen;
+  }
+
   public boolean isEmpty() {
     return indexData == null || indexData.length == 0;
   }
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 47160204..848d6d01 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -191,15 +191,18 @@ public class RssUtils {
     }
 
     byte[] indexData = shuffleIndexResult.getIndexData();
-    return transIndexDataToSegments(indexData, readBufferSize);
+    long dataFileLen = shuffleIndexResult.getDataFileLen();
+    return transIndexDataToSegments(indexData, readBufferSize, dataFileLen);
   }
 
-  private static List<ShuffleDataSegment> transIndexDataToSegments(byte[] indexData, int readBufferSize) {
+  private static List<ShuffleDataSegment> transIndexDataToSegments(byte[] indexData,
+      int readBufferSize, long dataFileLen) {
     ByteBuffer byteBuffer = ByteBuffer.wrap(indexData);
     List<BufferSegment> bufferSegments = Lists.newArrayList();
     List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList();
     int bufferOffset = 0;
     long fileOffset = -1;
+    long totalLength = 0;
 
     while (byteBuffer.hasRemaining()) {
       try {
@@ -218,6 +221,13 @@ public class RssUtils {
 
         bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
         bufferOffset += length;
+        totalLength += length;
+
+        // If the ShuffleServer is flushing the file at this time, the length recorded in the index file may be
+        // greater than the length of the actual data file, so stop here and return to avoid an EOFException.
+        if (dataFileLen != -1 && totalLength >= dataFileLen) {
+          break;
+        }
 
         if (bufferOffset >= readBufferSize) {
           ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
diff --git a/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java b/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java
index 74c7c0d3..349e5a88 100644
--- a/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java
@@ -26,7 +26,7 @@ public class ShuffleIndexResultTest {
   @Test
   public void testEmpty() {
     assertTrue(new ShuffleIndexResult().isEmpty());
-    assertTrue(new ShuffleIndexResult(null).isEmpty());
+    assertTrue(new ShuffleIndexResult(null, -1).isEmpty());
   }
 
 }
diff --git a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index c67bf482..4c1f5a8c 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -137,7 +137,7 @@ public class RssUtilsTest {
     }
 
     byte[] data = byteBuffer.array();
-    shuffleDataSegments = RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data), readBufferSize);
+    shuffleDataSegments = RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data, -1), readBufferSize);
     assertEquals(expectedTotalSegmentNum, shuffleDataSegments.size());
 
     assertEquals(0, shuffleDataSegments.get(0).getOffset());
@@ -158,7 +158,7 @@ public class RssUtilsTest {
     data = incompleteByteBuffer.array();
     // It should throw exception
     try {
-      RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data), readBufferSize);
+      RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data, -1), readBufferSize);
       fail();
     } catch (Exception e) {
       // ignore
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 7f4fa076..f817b2ca 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -576,7 +576,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
     switch (statusCode) {
       case SUCCESS:
         response = new RssGetShuffleIndexResponse(
-            ResponseStatusCode.SUCCESS, rpcResponse.getIndexData().toByteArray());
+            ResponseStatusCode.SUCCESS, rpcResponse.getIndexData().toByteArray(), rpcResponse.getDataFileLen());
 
         break;
       default:
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java
index 74ddb826..8884271c 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java
@@ -22,9 +22,9 @@ import org.apache.uniffle.common.ShuffleIndexResult;
 public class RssGetShuffleIndexResponse extends ClientResponse  {
   private final ShuffleIndexResult shuffleIndexResult;
 
-  public RssGetShuffleIndexResponse(ResponseStatusCode statusCode, byte[] data) {
+  public RssGetShuffleIndexResponse(ResponseStatusCode statusCode, byte[] data, long dataFileLen) {
     super(statusCode);
-    this.shuffleIndexResult = new ShuffleIndexResult(data);
+    this.shuffleIndexResult = new ShuffleIndexResult(data, dataFileLen);
   }
 
   public ShuffleIndexResult getShuffleIndexResult() {
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index c2111bb0..4555a194 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -111,6 +111,7 @@ message GetLocalShuffleIndexResponse {
   bytes indexData = 1;
   StatusCode status = 2;
   string retMsg = 3;
+  int64 dataFileLen = 4;
 }
 
 message ReportShuffleResultRequest {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index b7a8ab20..3980cc65 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -556,6 +556,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
             + " bytes with {}", readTime, data.length, requestInfo);
 
         builder.setIndexData(UnsafeByteOperations.unsafeWrap(data));
+        builder.setDataFileLen(shuffleIndexResult.getDataFileLen());
         reply = builder.build();
       } catch (FileNotFoundException indexFileNotFoundException) {
         LOG.warn("Index file for {} is not found, maybe the data has been flushed to cold storage.",
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
index a6414114..38d6439e 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
@@ -37,6 +37,7 @@ public class HdfsFileReader implements FileReader, Closeable {
   private Path path;
   private Configuration hadoopConf;
   private FSDataInputStream fsDataInputStream;
+  private FileSystem fileSystem;
 
   public HdfsFileReader(Path path, Configuration hadoopConf) throws Exception {
     this.path = path;
@@ -45,7 +46,7 @@ public class HdfsFileReader implements FileReader, Closeable {
   }
 
   private void createStream() throws Exception {
-    FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(path, hadoopConf);
+    fileSystem = HadoopFilesystemProvider.getFilesystem(path, hadoopConf);
 
     if (!fileSystem.isFile(path)) {
       String msg = path + " don't exist or is not a file.";
@@ -92,4 +93,8 @@ public class HdfsFileReader implements FileReader, Closeable {
   public Path getPath() {
     return path;
   }
+
+  public long getFileLen() throws IOException {
+    return fileSystem.getFileStatus(path).getLen();
+  }
 }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
index f91e27f4..af94723f 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
@@ -71,15 +71,15 @@ public class HdfsShuffleReadHandler extends DataSkippableReadHandler {
         System.arraycopy(indexData, 0, indexNewData, 0, expectedLen);
         indexData = indexNewData;
       }
+      long dateFileLen = getDataFileLen();
       LOG.info("Read index files {}.index for {} ms", filePrefix, System.currentTimeMillis() - start);
-      return new ShuffleIndexResult(indexData);
+      return new ShuffleIndexResult(indexData, dateFileLen);
     } catch (Exception e) {
       LOG.info("Fail to read index files {}.index", filePrefix, e);
     }
     return new ShuffleIndexResult();
   }
 
-  @Override
   protected ShuffleDataResult readShuffleData(ShuffleDataSegment shuffleDataSegment) {
     // Here we make an assumption that the rest of the file is corrupted, if an unexpected data is read.
     int expectedLength = shuffleDataSegment.getLength();
@@ -115,6 +115,15 @@ public class HdfsShuffleReadHandler extends DataSkippableReadHandler {
     return data;
   }
 
+  private long getDataFileLen() {
+    try {
+      return dataReader.getFileLen();
+    } catch (IOException ioException) {
+      LOG.error("getDataFileLen failed for " +  ShuffleStorageUtils.generateDataFileName(filePrefix), ioException);
+      return -1;
+    }
+  }
+
   public synchronized void close() {
     try {
       dataReader.close();
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
index d1d0bc7d..b786cfc7 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
@@ -140,14 +140,16 @@ public class LocalFileServerReadHandler implements ServerReadHandler {
     int indexNum = 0;
     int len = 0;
     try (LocalFileReader reader = createFileReader(indexFileName)) {
-      long fileSize = new File(indexFileName).length();
-      indexNum = (int)  (fileSize / FileBasedShuffleSegment.SEGMENT_SIZE);
+      long indexFileSize = new File(indexFileName).length();
+      indexNum = (int)  (indexFileSize / FileBasedShuffleSegment.SEGMENT_SIZE);
       len = indexNum * FileBasedShuffleSegment.SEGMENT_SIZE;
-      if (fileSize != len) {
+      if (indexFileSize != len) {
         LOG.warn("Maybe the index file: {} is being written due to the shuffle-buffer flushing.", indexFileName);
       }
       byte[] indexData = reader.read(0, len);
-      return new ShuffleIndexResult(indexData);
+      // get dataFileSize for read segment generation in DataSkippableReadHandler#readShuffleData
+      long dataFileSize = new File(dataFileName).length();
+      return new ShuffleIndexResult(indexData, dataFileSize);
     } catch (Exception e) {
       LOG.error("Fail to read index file {} indexNum {} len {}",
           indexFileName, indexNum, len);
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java
index 8c5dd7a7..cd2df454 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java
@@ -17,10 +17,15 @@
 
 package org.apache.uniffle.storage.handler.impl;
 
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
@@ -30,8 +35,12 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.HdfsShuffleHandlerTestBase;
 import org.apache.uniffle.storage.HdfsTestBase;
+import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -99,4 +108,134 @@ public class HdfsShuffleReadHandlerTest extends HdfsTestBase {
   public void test() {
     createAndRunCases(HDFS_URI, conf, StringUtils.EMPTY);
   }
+
+  @Test
+  public void testDataInconsistent() {
+
+    try {
+      String basePath = HDFS_URI + "HdfsShuffleFileReadHandlerTest#testDataInconsistent";
+      TestHdfsShuffleWriteHandler writeHandler =
+          new TestHdfsShuffleWriteHandler(
+              "appId",
+              0,
+              1,
+              1,
+              basePath,
+              "test",
+              conf,
+              StringUtils.EMPTY);
+
+      Map<Long, byte[]> expectedData = Maps.newHashMap();
+      int totalBlockNum = 0;
+      int expectTotalBlockNum = 6;
+      int blockSize = 7;
+      int taskAttemptId = 0;
+
+      // write (expectTotalBlockNum - 1) complete blocks
+      HdfsShuffleHandlerTestBase.writeTestData(writeHandler, expectTotalBlockNum - 1,
+          blockSize, taskAttemptId, expectedData);
+
+      // write 1 incomplete block, for which only the index file is written
+      List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
+      byte[] buf = new byte[blockSize];
+      new Random().nextBytes(buf);
+      long blockId = (expectTotalBlockNum
+                          << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
+                              + taskAttemptId;
+      blocks.add(new ShufflePartitionedBlock(blockSize, blockSize, ChecksumUtils.getCrc32(buf), blockId,
+          taskAttemptId, buf));
+      writeHandler.writeIndex(blocks);
+
+      int readBufferSize = 13;
+      int total = HdfsShuffleHandlerTestBase.calcExpectedSegmentNum(expectTotalBlockNum, blockSize, readBufferSize);
+      Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+      Roaring64NavigableMap processBlockIds =  Roaring64NavigableMap.bitmapOf();
+      expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
+      String fileNamePrefix = ShuffleStorageUtils.getFullShuffleDataFolder(basePath,
+          ShuffleStorageUtils.getShuffleDataPathWithRange("appId",
+              0, 1, 1, 10)) + "/test_0";
+      HdfsShuffleReadHandler handler =
+          new HdfsShuffleReadHandler("appId", 0, 1, fileNamePrefix,
+              readBufferSize, expectBlockIds, processBlockIds, conf);
+
+      Set<Long> actualBlockIds = Sets.newHashSet();
+      for (int i = 0; i < total; ++i) {
+        ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+        totalBlockNum += shuffleDataResult.getBufferSegments().size();
+        HdfsShuffleHandlerTestBase.checkData(shuffleDataResult, expectedData);
+        for (BufferSegment bufferSegment : shuffleDataResult.getBufferSegments()) {
+          actualBlockIds.add(bufferSegment.getBlockId());
+        }
+      }
+
+      assertNull(handler.readShuffleData());
+      assertEquals(
+          total,
+          handler.getShuffleDataSegments().size());
+      // The last block cannot be read, only the index is generated
+      assertEquals(expectTotalBlockNum - 1, totalBlockNum);
+      assertEquals(expectedData.keySet(), actualBlockIds);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  static class TestHdfsShuffleWriteHandler extends HdfsShuffleWriteHandler {
+
+    private Configuration hadoopConf;
+    private Lock writeLock = new ReentrantLock();
+    private String basePath;
+    private String fileNamePrefix;
+    private int failTimes = 0;
+
+    TestHdfsShuffleWriteHandler(
+        String appId,
+        int shuffleId,
+        int startPartition,
+        int endPartition,
+        String storageBasePath,
+        String fileNamePrefix,
+        Configuration hadoopConf,
+        String user) throws Exception {
+      super(appId, shuffleId, startPartition, endPartition, storageBasePath, fileNamePrefix, hadoopConf, user);
+      this.hadoopConf = hadoopConf;
+      this.fileNamePrefix = fileNamePrefix;
+      this.basePath = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath,
+          ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId, startPartition, endPartition));
+    }
+
+
+    // only write index file
+    public void writeIndex(
+        List<ShufflePartitionedBlock> shuffleBlocks) throws IOException, IllegalStateException {
+      HdfsFileWriter indexWriter = null;
+      writeLock.lock();
+      try {
+        try {
+          String indexFileName = ShuffleStorageUtils.generateIndexFileName(fileNamePrefix + "_" + failTimes);
+          indexWriter = createWriter(indexFileName);
+          for (ShufflePartitionedBlock block : shuffleBlocks) {
+            long blockId = block.getBlockId();
+            long crc = block.getCrc();
+            long startOffset = indexWriter.nextOffset();
+
+            FileBasedShuffleSegment segment = new FileBasedShuffleSegment(
+                blockId, startOffset, block.getLength(), block.getUncompressLength(), crc, block.getTaskAttemptId());
+            indexWriter.writeIndex(segment);
+          }
+        } catch (Exception e) {
+          failTimes++;
+          throw new RuntimeException(e);
+        } finally {
+          if (indexWriter != null) {
+            indexWriter.close();
+          }
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    }
+  }
 }
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTest.java
index 83df32db..a0c0043f 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTest.java
@@ -21,37 +21,24 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import org.junit.jupiter.api.Test;
 
-import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
-import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.config.RssBaseConf;
-import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.RssUtils;
-import org.apache.uniffle.storage.handler.api.ServerReadHandler;
-import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class LocalFileHandlerTest {
 
-  private static AtomicLong ATOMIC_LONG = new AtomicLong(0L);
-
   @Test
   public void writeTest() throws Exception {
     File tmpDir = Files.createTempDir();
@@ -76,15 +63,15 @@ public class LocalFileHandlerTest {
     final Set<Long> expectedBlockIds1 = Sets.newHashSet();
     final Set<Long> expectedBlockIds2 = Sets.newHashSet();
 
-    writeTestData(writeHandler1, 1, 32, expectedData, expectedBlockIds1);
-    writeTestData(writeHandler1, 2, 32, expectedData, expectedBlockIds1);
-    writeTestData(writeHandler1, 3, 32, expectedData, expectedBlockIds1);
-    writeTestData(writeHandler1, 4, 32, expectedData, expectedBlockIds1);
+    LocalFileHandlerTestBase.writeTestData(writeHandler1, 1, 32, expectedData, expectedBlockIds1);
+    LocalFileHandlerTestBase.writeTestData(writeHandler1, 2, 32, expectedData, expectedBlockIds1);
+    LocalFileHandlerTestBase.writeTestData(writeHandler1, 3, 32, expectedData, expectedBlockIds1);
+    LocalFileHandlerTestBase.writeTestData(writeHandler1, 4, 32, expectedData, expectedBlockIds1);
 
-    writeTestData(writeHandler2, 3, 32, expectedData, expectedBlockIds2);
-    writeTestData(writeHandler2, 3, 32, expectedData, expectedBlockIds2);
-    writeTestData(writeHandler2, 2, 32, expectedData, expectedBlockIds2);
-    writeTestData(writeHandler2, 1, 32, expectedData, expectedBlockIds2);
+    LocalFileHandlerTestBase.writeTestData(writeHandler2, 3, 32, expectedData, expectedBlockIds2);
+    LocalFileHandlerTestBase.writeTestData(writeHandler2, 3, 32, expectedData, expectedBlockIds2);
+    LocalFileHandlerTestBase.writeTestData(writeHandler2, 2, 32, expectedData, expectedBlockIds2);
+    LocalFileHandlerTestBase.writeTestData(writeHandler2, 1, 32, expectedData, expectedBlockIds2);
 
     RssBaseConf conf = new RssBaseConf();
     conf.setString("rss.storage.basePath", dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath());
@@ -93,21 +80,22 @@ public class LocalFileHandlerTest {
     LocalFileServerReadHandler readHandler2 = new LocalFileServerReadHandler(
         "appId", 0, 2, 1, 10, dataDir1.getAbsolutePath());
 
-    validateResult(readHandler1, expectedBlockIds1, expectedData);
-    validateResult(readHandler2, expectedBlockIds2, expectedData);
+    LocalFileHandlerTestBase.validateResult(readHandler1, expectedBlockIds1, expectedData);
+    LocalFileHandlerTestBase.validateResult(readHandler2, expectedBlockIds2, expectedData);
 
     // after first read, write more data
-    writeTestData(writeHandler1, 1, 32, expectedData, expectedBlockIds1);
+    LocalFileHandlerTestBase.writeTestData(writeHandler1, 1, 32, expectedData, expectedBlockIds1);
     // new data should be read
-    validateResult(readHandler1, expectedBlockIds1, expectedData);
+    LocalFileHandlerTestBase.validateResult(readHandler1, expectedBlockIds1, expectedData);
 
-    File targetDataFile = new File(possiblePath1, "pre.data");
-    ShuffleIndexResult shuffleIndexResult = readIndex(readHandler1);
+    ShuffleIndexResult shuffleIndexResult = LocalFileHandlerTestBase.readIndex(readHandler1);
     assertFalse(shuffleIndexResult.isEmpty());
-    List<ShuffleDataResult> shuffleDataResults = readData(readHandler1, shuffleIndexResult);
+    assertEquals(352, shuffleIndexResult.getDataFileLen());
+    List<ShuffleDataResult> shuffleDataResults = LocalFileHandlerTestBase.readData(readHandler1, shuffleIndexResult);
     assertFalse(shuffleDataResults.isEmpty());
+    File targetDataFile = new File(possiblePath1, "pre.data");
     targetDataFile.delete();
-    shuffleDataResults = readData(readHandler1, shuffleIndexResult);
+    shuffleDataResults = LocalFileHandlerTestBase.readData(readHandler1, shuffleIndexResult);
     for (ShuffleDataResult shuffleData : shuffleDataResults) {
       assertEquals(0, shuffleData.getData().length);
       assertTrue(shuffleData.isEmpty());
@@ -129,71 +117,9 @@ public class LocalFileHandlerTest {
     assertEquals(writer.nextOffset(), totalSize);
   }
 
+  @Test
+  public void testReadIndex() {
 
-  private void writeTestData(
-      ShuffleWriteHandler writeHandler,
-      int num, int length,
-      Map<Long, byte[]> expectedData,
-      Set<Long> expectedBlockIds) throws Exception {
-    List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
-    for (int i = 0; i < num; i++) {
-      byte[] buf = new byte[length];
-      new Random().nextBytes(buf);
-      long blockId = ATOMIC_LONG.incrementAndGet();
-      blocks.add(new ShufflePartitionedBlock(length, length, ChecksumUtils.getCrc32(buf), blockId, 100,
-          buf));
-      expectedData.put(blockId, buf);
-      expectedBlockIds.add(blockId);
-    }
-    writeHandler.write(blocks);
-  }
-
-  protected void validateResult(ServerReadHandler readHandler, Set<Long> expectedBlockIds,
-      Map<Long, byte[]> expectedData) {
-    List<ShuffleDataResult> shuffleDataResults = readAll(readHandler);
-    Set<Long> actualBlockIds = Sets.newHashSet();
-    for (ShuffleDataResult sdr : shuffleDataResults) {
-      byte[] buffer = sdr.getData();
-      List<BufferSegment> bufferSegments = sdr.getBufferSegments();
-
-      for (BufferSegment bs : bufferSegments) {
-        byte[] data = new byte[bs.getLength()];
-        System.arraycopy(buffer, bs.getOffset(), data, 0, bs.getLength());
-        assertEquals(bs.getCrc(), ChecksumUtils.getCrc32(data));
-        assertArrayEquals(expectedData.get(bs.getBlockId()), data);
-        actualBlockIds.add(bs.getBlockId());
-      }
-    }
-    assertEquals(expectedBlockIds, actualBlockIds);
-  }
-
-  private List<ShuffleDataResult> readAll(ServerReadHandler readHandler) {
-    ShuffleIndexResult shuffleIndexResult = readIndex(readHandler);
-    return readData(readHandler, shuffleIndexResult);
-  }
-
-  private ShuffleIndexResult readIndex(ServerReadHandler readHandler) {
-    ShuffleIndexResult shuffleIndexResult = readHandler.getShuffleIndex();
-    return shuffleIndexResult;
-  }
-
-  private List<ShuffleDataResult> readData(ServerReadHandler readHandler, ShuffleIndexResult shuffleIndexResult) {
-    List<ShuffleDataResult> shuffleDataResults = Lists.newLinkedList();
-    if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
-      return shuffleDataResults;
-    }
-
-    List<ShuffleDataSegment> shuffleDataSegments =
-        RssUtils.transIndexDataToSegments(shuffleIndexResult, 32);
-
-    for (ShuffleDataSegment shuffleDataSegment : shuffleDataSegments) {
-      byte[] shuffleData =
-          readHandler.getShuffleData(shuffleDataSegment.getOffset(), shuffleDataSegment.getLength()).getData();
-      ShuffleDataResult sdr = new ShuffleDataResult(shuffleData, shuffleDataSegment.getBufferSegments());
-      shuffleDataResults.add(sdr);
-    }
-
-    return shuffleDataResults;
   }
 
 }
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
new file mode 100644
index 00000000..ae7f0541
--- /dev/null
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleDataSegment;
+import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
+import org.apache.uniffle.storage.handler.api.ServerReadHandler;
+import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Static helpers shared by the local-file handler tests: generating random blocks, reading them
+ * back through a {@link ServerReadHandler}, verifying the result and building index/segment
+ * fixtures.
+ */
+public class LocalFileHandlerTestBase {
+  // Block-id source. It is static, so ids keep increasing across every caller in the same JVM and
+  // only the very first writeTestData call hands out ids starting at 1.
+  private static final AtomicLong ATOMIC_LONG = new AtomicLong(0L);
+
+  /**
+   * Creates {@code num} blocks of {@code length} random bytes, records them in the expectation
+   * collections and writes all of them with a single {@code writeHandler.write} call.
+   *
+   * @param writeHandler handler that receives the generated blocks
+   * @param num number of blocks to generate
+   * @param length size in bytes of every block
+   * @param expectedData receives blockId -> block bytes for every generated block
+   * @param expectedBlockIds receives the id of every generated block
+   * @throws Exception whatever {@code writeHandler.write} throws
+   */
+  public static void writeTestData(
+      ShuffleWriteHandler writeHandler,
+      int num, int length,
+      Map<Long, byte[]> expectedData,
+      Set<Long> expectedBlockIds) throws Exception {
+    List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
+    // One generator for the whole batch instead of a fresh one per block.
+    Random random = new Random();
+    for (int i = 0; i < num; i++) {
+      byte[] buf = new byte[length];
+      random.nextBytes(buf);
+      long blockId = ATOMIC_LONG.incrementAndGet();
+      // NOTE(review): the two leading "length" arguments are presumably length and uncompressed
+      // length, and 100 the task attempt id - confirm against ShufflePartitionedBlock.
+      blocks.add(new ShufflePartitionedBlock(length, length, ChecksumUtils.getCrc32(buf), blockId, 100,
+          buf));
+      expectedData.put(blockId, buf);
+      expectedBlockIds.add(blockId);
+    }
+    writeHandler.write(blocks);
+  }
+
+  /**
+   * Reads everything available from {@code readHandler}, checks every block with
+   * {@link #checkData} and asserts that the set of block ids read equals {@code expectedBlockIds}.
+   *
+   * @param readHandler handler to read the index and data from
+   * @param expectedBlockIds ids that must be read, no more and no fewer
+   * @param expectedData blockId -> expected block bytes
+   */
+  public static void validateResult(ServerReadHandler readHandler, Set<Long> expectedBlockIds,
+      Map<Long, byte[]> expectedData) {
+    Set<Long> actualBlockIds = Sets.newHashSet();
+    for (ShuffleDataResult sdr : readAll(readHandler)) {
+      // CRC and content assertions live in checkData so both entry points verify identically.
+      checkData(sdr, expectedData);
+      for (BufferSegment bs : sdr.getBufferSegments()) {
+        actualBlockIds.add(bs.getBlockId());
+      }
+    }
+    assertEquals(expectedBlockIds, actualBlockIds);
+  }
+
+  /**
+   * Reads the index from {@code readHandler} and then every data segment it describes.
+   *
+   * @param readHandler handler to read from
+   * @return one result per data segment, empty when the index is null or empty
+   */
+  public static List<ShuffleDataResult> readAll(ServerReadHandler readHandler) {
+    return readData(readHandler, readIndex(readHandler));
+  }
+
+  /**
+   * Returns the result of {@code readHandler.getShuffleIndex()}.
+   *
+   * @param readHandler handler to read from
+   * @return the index result exactly as returned by the handler
+   */
+  public static ShuffleIndexResult readIndex(ServerReadHandler readHandler) {
+    return readHandler.getShuffleIndex();
+  }
+
+  /**
+   * Splits {@code shuffleIndexResult} into data segments and reads each of them from
+   * {@code readHandler}.
+   *
+   * @param readHandler handler to read the data from
+   * @param shuffleIndexResult index describing the blocks; may be null
+   * @return one result per data segment, empty when the index is null or empty
+   */
+  public static List<ShuffleDataResult> readData(ServerReadHandler readHandler, ShuffleIndexResult shuffleIndexResult) {
+    List<ShuffleDataResult> shuffleDataResults = Lists.newArrayList();
+    if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
+      return shuffleDataResults;
+    }
+
+    // NOTE(review): 32 is presumably the read-buffer size used to group blocks into segments -
+    // confirm against RssUtils.transIndexDataToSegments.
+    List<ShuffleDataSegment> shuffleDataSegments =
+        RssUtils.transIndexDataToSegments(shuffleIndexResult, 32);
+
+    for (ShuffleDataSegment shuffleDataSegment : shuffleDataSegments) {
+      byte[] shuffleData =
+          readHandler.getShuffleData(shuffleDataSegment.getOffset(), shuffleDataSegment.getLength()).getData();
+      shuffleDataResults.add(new ShuffleDataResult(shuffleData, shuffleDataSegment.getBufferSegments()));
+    }
+
+    return shuffleDataResults;
+  }
+
+  /**
+   * Asserts, for every buffer segment of {@code shuffleDataResult}, that the bytes at the
+   * segment's offset/length have the segment's CRC and equal the expected bytes of its block id.
+   *
+   * @param shuffleDataResult data buffer plus the segments locating each block inside it
+   * @param expectedData blockId -> expected block bytes
+   */
+  public static void checkData(ShuffleDataResult shuffleDataResult, Map<Long, byte[]> expectedData) {
+    byte[] buffer = shuffleDataResult.getData();
+    List<BufferSegment> bufferSegments = shuffleDataResult.getBufferSegments();
+
+    for (BufferSegment bs : bufferSegments) {
+      byte[] data = new byte[bs.getLength()];
+      System.arraycopy(buffer, bs.getOffset(), data, 0, bs.getLength());
+      assertEquals(bs.getCrc(), ChecksumUtils.getCrc32(data));
+      assertArrayEquals(expectedData.get(bs.getBlockId()), data);
+    }
+  }
+
+  /**
+   * Appends one index entry for {@code segment} at the buffer's current position: offset (long),
+   * length (int), uncompressed length (int), crc (long), block id (long) and task attempt id
+   * (long), i.e. 40 bytes per entry.
+   *
+   * @param byteBuffer destination; needs at least 40 bytes remaining
+   * @param segment segment whose fields are written
+   */
+  public static void writeIndex(ByteBuffer byteBuffer, FileBasedShuffleSegment segment) {
+    byteBuffer.putLong(segment.getOffset());
+    byteBuffer.putInt(segment.getLength());
+    byteBuffer.putInt(segment.getUncompressLength());
+    byteBuffer.putLong(segment.getCrc());
+    byteBuffer.putLong(segment.getBlockId());
+    byteBuffer.putLong(segment.getTaskAttemptId());
+  }
+
+  /**
+   * Concatenates the blocks with ids {@code 1..blockNum}, in id order, into segments. A segment is
+   * closed as soon as its accumulated size reaches {@code bytesPerSegment}; leftover blocks form a
+   * final, smaller segment.
+   *
+   * <p>Each segment is sized from the blocks it actually contains, so a block larger than
+   * {@code bytesPerSegment} is handled instead of overflowing a fixed-size scratch buffer.
+   *
+   * @param blockIdToData blockId -> block bytes; must contain every id in {@code 1..blockNum}
+   * @param bytesPerSegment minimum size at which a segment is closed
+   * @param blockNum number of leading blocks to include
+   * @return the segment byte arrays in order
+   * @throws IllegalStateException if an id in {@code 1..blockNum} has no data
+   */
+  public static List<byte[]> calcSegmentBytes(Map<Long, byte[]> blockIdToData, int bytesPerSegment, int blockNum) {
+    List<byte[]> res = Lists.newArrayList();
+    List<byte[]> pending = Lists.newArrayList();
+    int curSize = 0;
+    for (long i = 1; i <= blockNum; i++) {
+      byte[] data = blockIdToData.get(i);
+      if (data == null) {
+        throw new IllegalStateException("No data for block id " + i);
+      }
+      pending.add(data);
+      curSize += data.length;
+      if (curSize >= bytesPerSegment) {
+        res.add(concat(pending, curSize));
+        pending.clear();
+        curSize = 0;
+      }
+    }
+    if (curSize > 0) {
+      res.add(concat(pending, curSize));
+    }
+    return res;
+  }
+
+  /** Joins {@code parts} into one array; {@code totalSize} must be the sum of their lengths. */
+  private static byte[] concat(List<byte[]> parts, int totalSize) {
+    ByteBuffer merged = ByteBuffer.allocate(totalSize);
+    for (byte[] part : parts) {
+      merged.put(part);
+    }
+    return merged.array();
+  }
+}
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
new file mode 100644
index 00000000..d25f44f4
--- /dev/null
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import org.apache.uniffle.client.api.ShuffleServerClient;
+import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
+import org.apache.uniffle.client.response.ResponseStatusCode;
+import org.apache.uniffle.client.response.RssGetShuffleDataResponse;
+import org.apache.uniffle.client.response.RssGetShuffleIndexResponse;
+import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Checks that {@link LocalFileClientReadHandler} only returns blocks that fit inside the reported
+ * data-file length, even when the index describes more blocks than that.
+ */
+public class LocalFileServerReadHandlerTest {
+  @Test
+  public void testDataInconsistent() throws Exception {
+    final int indexedBlockCount = 4;
+    final int bytesPerBlock = 7;
+    final Map<Long, byte[]> blockData = Maps.newHashMap();
+    // Each index entry written by LocalFileHandlerTestBase.writeIndex occupies 40 bytes.
+    final ByteBuffer indexBuffer = ByteBuffer.allocate(indexedBlockCount * 40);
+
+    // The index describes 4 blocks; further down the data file is reported as holding only the
+    // first 3 of them. The write handler below just records one index entry per block.
+    LocalFileHandlerTestBase.writeTestData(shuffleBlocks -> {
+      int nextOffset = 0;
+      for (ShufflePartitionedBlock written : shuffleBlocks) {
+        LocalFileHandlerTestBase.writeIndex(indexBuffer, new FileBasedShuffleSegment(
+            written.getBlockId(), nextOffset, written.getLength(), written.getUncompressLength(),
+            written.getCrc(), written.getTaskAttemptId()));
+        nextOffset += written.getLength();
+      }
+    }, indexedBlockCount, bytesPerBlock,
+        blockData, new HashSet<>());
+
+    Roaring64NavigableMap wantedBlockIds = Roaring64NavigableMap.bitmapOf();
+    for (Long blockId : blockData.keySet()) {
+      wantedBlockIds.addLong(blockId);
+    }
+
+    ShuffleServerClient client = Mockito.mock(ShuffleServerClient.class);
+
+    // Index response: all 4 entries, but a data length that covers only 3 blocks.
+    final int dataBlockCount = indexedBlockCount - 1;
+    final int dataFileLen = bytesPerBlock * dataBlockCount;
+    Mockito.doReturn(
+        new RssGetShuffleIndexResponse(ResponseStatusCode.SUCCESS, indexBuffer.array(), dataFileLen))
+        .when(client).getShuffleIndex(Mockito.any());
+
+    final int readBufferSize = 13;
+    final int bytesPerSegment = ((readBufferSize / bytesPerBlock) + 1) * bytesPerBlock;
+    List<byte[]> segmentBytes = LocalFileHandlerTestBase.calcSegmentBytes(blockData,
+        bytesPerSegment, dataBlockCount);
+
+    // The first segment holds 2 blocks and starts at offset 0.
+    ArgumentMatcher<RssGetShuffleDataRequest> isFirstSegment =
+        (request) -> request.getOffset() == 0 && request.getLength() == bytesPerSegment;
+    // The second segment holds the single remaining block.
+    ArgumentMatcher<RssGetShuffleDataRequest> isSecondSegment =
+        (request) -> request.getOffset() == bytesPerSegment && request.getLength() == bytesPerBlock;
+
+    Mockito.doReturn(new RssGetShuffleDataResponse(ResponseStatusCode.SUCCESS, segmentBytes.get(0)))
+        .when(client).getShuffleData(Mockito.argThat(isFirstSegment));
+    Mockito.doReturn(new RssGetShuffleDataResponse(ResponseStatusCode.SUCCESS, segmentBytes.get(1)))
+        .when(client).getShuffleData(Mockito.argThat(isSecondSegment));
+
+    String appId = "app1";
+    int shuffleId = 1;
+    int partitionId = 1;
+    Roaring64NavigableMap processedBlockIds = Roaring64NavigableMap.bitmapOf();
+    LocalFileClientReadHandler handler = new LocalFileClientReadHandler(appId, partitionId, shuffleId, -1, 1, 1,
+        readBufferSize, wantedBlockIds, processedBlockIds, client);
+
+    // Read once per expected segment and count the blocks that come back.
+    int segmentReads = (dataFileLen / bytesPerSegment) + 1;
+    int blocksRead = 0;
+    for (int read = 0; read < segmentReads; read++) {
+      ShuffleDataResult segmentResult = handler.readShuffleData();
+      LocalFileHandlerTestBase.checkData(segmentResult, blockData);
+      blocksRead += segmentResult.getBufferSegments().size();
+    }
+    assertEquals(dataBlockCount, blocksRead);
+  }
+
+}