You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/11/18 06:03:21 UTC

[incubator-uniffle] branch master updated: [Improvement][LocalOrder] Add tests about keeping consistent with FixedSize when no skew optimization (#336)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 1aa3661f [Improvement][LocalOrder] Add tests about keeping consistent with FixedSize when no skew optimization (#336)
1aa3661f is described below

commit 1aa3661fbcd5ff26a13f79a05f9f10b47d2c6eab
Author: Junfan Zhang <zu...@apache.org>
AuthorDate: Fri Nov 18 14:03:15 2022 +0800

    [Improvement][LocalOrder] Add tests about keeping consistent with FixedSize when no skew optimization (#336)
    
    ### What changes were proposed in this pull request?
    1. Introduce a data length check in the LocalOrder strategy.
    2. Add tests verifying that the LocalOrder strategy stays consistent with the FixedSize strategy when no skew optimization is applied.
    
    ### Why are the changes needed?
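    To improve stability: LocalOrder now stops splitting once the data length recorded in the index file exceeds the real data file length (which may happen while data is still being flushed), and its output is verified against the FixedSize strategy.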
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UTs
---
 .../common/segment/FixedSizeSegmentSplitter.java   |  5 +-
 .../common/segment/LocalOrderSegmentSplitter.java  |  9 +++-
 .../segment/LocalOrderSegmentSplitterTest.java     | 62 ++++++++++++++++++++++
 3 files changed, 73 insertions(+), 3 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
index e42f10f1..9c1f0d23 100644
--- a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
+++ b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
@@ -82,8 +82,9 @@ public class FixedSizeSegmentSplitter implements SegmentSplitter {
         // than the length in the actual data file, and it needs to be returned at this time to avoid EOFException
         if (dataFileLen != -1 && totalLength > dataFileLen) {
           long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
-          LOGGER.warn("Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than "
-                  + "the real data file length: {}(bytes). Partition id: {}. This should not happen.",
+          LOGGER.info("Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than "
+                  + "the real data file length: {}(bytes). Partition id: {}. "
+                  + "This may happen when the data is flushing, please ignore.",
               totalLength, dataFileLen, Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask));
           break;
         }
diff --git a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
index fd5f9091..97a639b3 100644
--- a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
+++ b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
@@ -32,6 +32,7 @@ import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.Constants;
 
 /**
  * {@class LocalOrderSegmentSplitter} will be initialized only when the {@class ShuffleDataDistributionType}
@@ -97,6 +98,7 @@ public class LocalOrderSegmentSplitter implements SegmentSplitter {
         long blockId = byteBuffer.getLong();
         long taskAttemptId = byteBuffer.getLong();
 
+        totalLen += length;
         indexTaskIds.add(taskAttemptId);
 
         if (lastTaskAttemptId == -1) {
@@ -105,7 +107,12 @@ public class LocalOrderSegmentSplitter implements SegmentSplitter {
 
         // If ShuffleServer is flushing the file at this time, the length in the index file record may be greater
         // than the length in the actual data file, and it needs to be returned at this time to avoid EOFException
-        if (dataFileLen != -1 && totalLen >= dataFileLen) {
+        if (dataFileLen != -1 && totalLen > dataFileLen) {
+          long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
+          LOGGER.info("Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than "
+                  + "the real data file length: {}(bytes). Partition id: {}. "
+                  + "This may happen when the data is flushing, please ignore.",
+              totalLen, dataFileLen, Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask));
           break;
         }
 
diff --git a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
index 77b3e2cf..a5fd016c 100644
--- a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
@@ -22,6 +22,8 @@ import java.util.List;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.common.BufferSegment;
@@ -34,6 +36,66 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 public class LocalOrderSegmentSplitterTest {
 
+  /**
+   * When Spark skew optimization is not applied and LOCAL_ORDER is enabled,
+   * the segments split by LocalOrderSegmentSplitter should be identical to
+   * those split by FixedSizeSegmentSplitter (the NORMAL strategy).
+   */
+  @ParameterizedTest
+  @ValueSource(ints = {-1, 32, 33, 48, 49, 57})
+  public void testConsistentWithFixSizeSplitterWhenNoSkew(int realDataLength) {
+    Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(1, 2, 3, 4, 5, 6);
+    byte[] data = generateData(
+        Pair.of(8, 6),
+        Pair.of(8, 5),
+        Pair.of(8, 4),
+        Pair.of(8, 3),
+        Pair.of(8, 1),
+        Pair.of(8, 2),
+        Pair.of(8, 3),
+        Pair.of(8, 2),
+        Pair.of(8, 3),
+        Pair.of(8, 3),
+        Pair.of(8, 4),
+        Pair.of(8, 5)
+    );
+    List<ShuffleDataSegment> dataSegments1 = new LocalOrderSegmentSplitter(taskIds, 32)
+        .split(new ShuffleIndexResult(data, realDataLength));
+
+    List<ShuffleDataSegment> dataSegments2 = new FixedSizeSegmentSplitter(32)
+        .split(new ShuffleIndexResult(data, realDataLength));
+
+    checkConsistency(dataSegments1, dataSegments2);
+  }
+
+  private void checkConsistency(List<ShuffleDataSegment> dataSegments1, List<ShuffleDataSegment> dataSegments2) {
+    assertEquals(dataSegments1.size(), dataSegments2.size());
+
+    for (int i = 0; i < dataSegments1.size(); i++) {
+      ShuffleDataSegment segment1 = dataSegments1.get(i);
+      ShuffleDataSegment segment2 = dataSegments2.get(i);
+
+      assertEquals(segment1.getLength(), segment2.getLength());
+      assertEquals(segment1.getOffset(), segment2.getOffset());
+
+      List<BufferSegment> bufferSegments1 = segment1.getBufferSegments();
+      List<BufferSegment> bufferSegments2 = segment2.getBufferSegments();
+
+      assertEquals(bufferSegments1.size(), bufferSegments2.size());
+
+      for (int j = 0; j < bufferSegments1.size(); j++) {
+        BufferSegment bs1 = bufferSegments1.get(j);
+        BufferSegment bs2 = bufferSegments2.get(j);
+        assertEquals(bs1.getLength(), bs2.getLength());
+        assertEquals(bs1.getOffset(), bs2.getOffset());
+        assertEquals(bs1.getBlockId(), bs2.getBlockId());
+        assertEquals(bs1.getCrc(), bs2.getCrc());
+        assertEquals(bs1.getUncompressLength(), bs2.getUncompressLength());
+        assertEquals(bs1.getTaskAttemptId(), bs2.getTaskAttemptId());
+      }
+    }
+  }
+
   @Test
   public void testSplitWithDiscontinuousBlocksShouldThrowException() {
     Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(1, 2, 4);