You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/11/15 03:32:10 UTC

[incubator-uniffle] branch master updated: [Bug] Fix potenial bug when the index reading offset is greater than data length (#320)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new eae2621b [Bug] Fix potenial bug when the index reading offset is greater than data length (#320)
eae2621b is described below

commit eae2621b33a51013348a3060ed42e9eb1388c59b
Author: Junfan Zhang <zu...@apache.org>
AuthorDate: Tue Nov 15 11:32:04 2022 +0800

    [Bug] Fix potenial bug when the index reading offset is greater than data length (#320)
    
    ### What changes were proposed in this pull request?
    Fix potenial bug when the index reading offset equals to data length
    
    ### Why are the changes needed?
    If the reading length == data length in `FixedSizeSegmentSplitter`, it should involve this block. And if its length > data length. it should abort it.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UTs
---
 .../common/segment/FixedSizeSegmentSplitter.java   | 28 +++++++++++++++-------
 .../segment/FixedSizeSegmentSplitterTest.java      | 18 ++++++++++++++
 2 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
index 79e59b62..e42f10f1 100644
--- a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
+++ b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
@@ -22,13 +22,17 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.Constants;
 
 public class FixedSizeSegmentSplitter implements SegmentSplitter {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FixedSizeSegmentSplitter.class);
 
   private int readBufferSize;
 
@@ -58,12 +62,13 @@ public class FixedSizeSegmentSplitter implements SegmentSplitter {
 
     while (byteBuffer.hasRemaining()) {
       try {
-        long offset = byteBuffer.getLong();
-        int length = byteBuffer.getInt();
-        int uncompressLength = byteBuffer.getInt();
-        long crc = byteBuffer.getLong();
-        long blockId = byteBuffer.getLong();
-        long taskAttemptId = byteBuffer.getLong();
+        final long offset = byteBuffer.getLong();
+        final int length = byteBuffer.getInt();
+        final int uncompressLength = byteBuffer.getInt();
+        final long crc = byteBuffer.getLong();
+        final long blockId = byteBuffer.getLong();
+        final long taskAttemptId = byteBuffer.getLong();
+
         // The index file is written, read and parsed sequentially, so these parsed index segments
         // index a continuous shuffle data in the corresponding data file and the first segment's
         // offset field is the offset of these shuffle data in the data file.
@@ -71,16 +76,21 @@ public class FixedSizeSegmentSplitter implements SegmentSplitter {
           fileOffset = offset;
         }
 
-        bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
-        bufferOffset += length;
         totalLength += length;
 
         // If ShuffleServer is flushing the file at this time, the length in the index file record may be greater
         // than the length in the actual data file, and it needs to be returned at this time to avoid EOFException
-        if (dataFileLen != -1 && totalLength >= dataFileLen) {
+        if (dataFileLen != -1 && totalLength > dataFileLen) {
+          long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
+          LOGGER.warn("Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than "
+                  + "the real data file length: {}(bytes). Partition id: {}. This should not happen.",
+              totalLength, dataFileLen, Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask));
           break;
         }
 
+        bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
+        bufferOffset += length;
+
         if (bufferOffset >= readBufferSize) {
           ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
           dataFileSegments.add(sds);
diff --git a/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java b/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java
index 9282b801..5655288c 100644
--- a/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java
@@ -22,6 +22,8 @@ import java.util.List;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
@@ -33,6 +35,22 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 public class FixedSizeSegmentSplitterTest {
 
+  @ParameterizedTest
+  @ValueSource(ints = {48, 49, 57})
+  public void testAvoidEOFException(int dataLength) {
+    SegmentSplitter splitter = new FixedSizeSegmentSplitter(1000);
+    byte[] data = generateData(
+        Pair.of(32, 0),
+        Pair.of(16, 0),
+        Pair.of(10, 0)
+    );
+
+    List<ShuffleDataSegment> shuffleDataSegments = splitter.split(new ShuffleIndexResult(data, dataLength));
+    assertEquals(1, shuffleDataSegments.size());
+    assertEquals(0, shuffleDataSegments.get(0).getOffset());
+    assertEquals(48, shuffleDataSegments.get(0).getLength());
+  }
+
   @Test
   public void testSplit() {
     SegmentSplitter splitter = new FixedSizeSegmentSplitter(100);