You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/11/24 06:54:06 UTC

[incubator-uniffle] branch master updated: [BUG][AQE][LocalOrder] Remove check of discontinuous map task ids (#354)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 91eef084 [BUG][AQE][LocalOrder] Remove check of discontinuous map task ids (#354)
91eef084 is described below

commit 91eef08476319ca35e22839d042acc8773a8eff3
Author: Junfan Zhang <zu...@apache.org>
AuthorDate: Thu Nov 24 14:54:00 2022 +0800

    [BUG][AQE][LocalOrder] Remove check of discontinuous map task ids (#354)
    
    ### What changes were proposed in this pull request?
    Remove check of discontinuous map task ids
    
    ### Why are the changes needed?
    In the [startMapIndex, endMapIndex) sequence, the index is not the map task id; it only indicates the range of the set of succeeded map task ids.
    
    So that means if the range is [0, 5), it only indicates that the number of tasks is 5. The task id list may be as follows:
    
    `[task-1, task-3, task-9, task-15, task-16]`
    
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    1. UTs
---
 .../common/segment/LocalOrderSegmentSplitter.java  |  42 ++++---
 .../segment/LocalOrderSegmentSplitterTest.java     | 121 +++++++++++++++++----
 2 files changed, 121 insertions(+), 42 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
index 97a639b3..d3f912f7 100644
--- a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
+++ b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
@@ -42,7 +42,7 @@ import org.apache.uniffle.common.util.Constants;
  *
  * This strategy will be useful for Spark AQE skew optimization, it will split the single partition into
  * multiple shuffle readers, and each one will fetch partial single partition data which is in the range of
- * [StartMapId, endMapId). And so if one reader uses this, it will skip lots of unnecessary blocks.
+ * [StartMapIndex, endMapIndex). And so if one reader uses this, it will skip lots of unnecessary blocks.
  *
  * Last but not least, this split strategy depends on LOCAL_ORDER of index file, which must be guaranteed by
  * the shuffle server.
@@ -75,7 +75,6 @@ public class LocalOrderSegmentSplitter implements SegmentSplitter {
     long fileOffset = -1;
     long totalLen = 0;
 
-    long lastTaskAttemptId = -1;
     long lastExpectedBlockIndex = -1;
 
     List<Long> indexTaskIds = new ArrayList<>();
@@ -83,10 +82,11 @@ public class LocalOrderSegmentSplitter implements SegmentSplitter {
     /**
      * One ShuffleDataSegment should meet following requirements:
      *
-     * 1. taskId in [startMapId, endMapId) taskIds bitmap
+     * 1. taskId in [startMapIndex, endMapIndex) taskIds bitmap.
+     *    Attention: the index in the range is not the map task id, which means the required task ids are not
+     *               continuous.
      * 2. ShuffleDataSegment size should < readBufferSize
-     * 3. ShuffleDataSegment's blocks should be continuous
-     *
+     * 3. Single shuffleDataSegment's blocks should be continuous
      */
     int index = 0;
     while (byteBuffer.hasRemaining()) {
@@ -101,10 +101,6 @@ public class LocalOrderSegmentSplitter implements SegmentSplitter {
         totalLen += length;
         indexTaskIds.add(taskAttemptId);
 
-        if (lastTaskAttemptId == -1) {
-          lastTaskAttemptId = taskAttemptId;
-        }
-
         // If ShuffleServer is flushing the file at this time, the length in the index file record may be greater
         // than the length in the actual data file, and it needs to be returned at this time to avoid EOFException
         if (dataFileLen != -1 && totalLen > dataFileLen) {
@@ -116,10 +112,15 @@ public class LocalOrderSegmentSplitter implements SegmentSplitter {
           break;
         }
 
-        if ((taskAttemptId < lastTaskAttemptId
-            && bufferSegments.size() > 0
-            && (expectTaskIds.contains(taskAttemptId) ? index - lastExpectedBlockIndex != 1 : true))
-            || bufferOffset >= readBufferSize) {
+        boolean conditionOfDiscontinuousBlocks =
+            lastExpectedBlockIndex != -1
+                && bufferSegments.size() > 0
+                && expectTaskIds.contains(taskAttemptId)
+                && index - lastExpectedBlockIndex != 1;
+
+        boolean conditionOfLimitedBufferSize = bufferOffset >= readBufferSize;
+
+        if (conditionOfDiscontinuousBlocks || conditionOfLimitedBufferSize) {
           ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
           dataFileSegments.add(sds);
           bufferSegments = Lists.newArrayList();
@@ -128,14 +129,6 @@ public class LocalOrderSegmentSplitter implements SegmentSplitter {
         }
 
         if (expectTaskIds.contains(taskAttemptId)) {
-          if (bufferOffset != 0 && index - lastExpectedBlockIndex > 1) {
-            List<Long> expectedTaskIds = getExpectedTaskIds(expectTaskIds);
-            LOGGER.error("There are discontinuous blocks, all task ids in index file: {}, all expected task ids: {}, "
-                    + "current expected task id: {}, last unexpected task id: {}, current data segment size: {}",
-                indexTaskIds, expectedTaskIds, taskAttemptId, lastTaskAttemptId, dataFileSegments.size());
-            throw new RssException("There are discontinuous blocks which should not happen when using LOCAL_ORDER.");
-          }
-
           if (fileOffset == -1) {
             fileOffset = offset;
           }
@@ -143,8 +136,6 @@ public class LocalOrderSegmentSplitter implements SegmentSplitter {
           bufferOffset += length;
           lastExpectedBlockIndex = index;
         }
-
-        lastTaskAttemptId = taskAttemptId;
         index++;
       } catch (BufferUnderflowException ue) {
         throw new RssException("Read index data under flow", ue);
@@ -155,6 +146,11 @@ public class LocalOrderSegmentSplitter implements SegmentSplitter {
       ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
       dataFileSegments.add(sds);
     }
+
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Index file task-ids sequence: {}, expected task-ids: {}",
+          indexTaskIds, getExpectedTaskIds(expectTaskIds));
+    }
     return dataFileSegments;
   }
 
diff --git a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
index a5fd016c..b93ab23b 100644
--- a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
@@ -32,10 +32,111 @@ import org.apache.uniffle.common.ShuffleIndexResult;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
 public class LocalOrderSegmentSplitterTest {
 
+  @Test
+  public void testDiscontinuousMapTaskIds() {
+    // case1
+    Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(6, 7, 9);
+    byte[] data = generateData(
+        Pair.of(8, 4),
+        Pair.of(8, 5),
+        Pair.of(8, 6),
+        Pair.of(8, 7),
+        Pair.of(8, 8),
+        Pair.of(8, 9),
+        Pair.of(8, 6),
+        Pair.of(8, 9)
+    );
+    List<ShuffleDataSegment> dataSegments1 = new LocalOrderSegmentSplitter(taskIds, 1000)
+        .split(new ShuffleIndexResult(data, -1));
+
+    assertEquals(2, dataSegments1.size());
+    assertEquals(16, dataSegments1.get(0).getOffset());
+    assertEquals(16, dataSegments1.get(0).getLength());
+    assertEquals(40, dataSegments1.get(1).getOffset());
+    assertEquals(24, dataSegments1.get(1).getLength());
+
+    assertEquals(2, dataSegments1.get(0).getBufferSegments().size());
+    assertEquals(3, dataSegments1.get(1).getBufferSegments().size());
+
+    BufferSegment bufferSegment = dataSegments1.get(0).getBufferSegments().get(0);
+    assertEquals(0, bufferSegment.getOffset());
+    assertEquals(8, bufferSegment.getLength());
+    bufferSegment = dataSegments1.get(0).getBufferSegments().get(1);
+    assertEquals(8, bufferSegment.getOffset());
+    assertEquals(8, bufferSegment.getLength());
+
+    bufferSegment = dataSegments1.get(1).getBufferSegments().get(0);
+    assertEquals(0, bufferSegment.getOffset());
+    assertEquals(8, bufferSegment.getLength());
+    bufferSegment = dataSegments1.get(1).getBufferSegments().get(1);
+    assertEquals(8, bufferSegment.getOffset());
+    assertEquals(8, bufferSegment.getLength());
+    bufferSegment = dataSegments1.get(1).getBufferSegments().get(2);
+    assertEquals(16, bufferSegment.getOffset());
+    assertEquals(8, bufferSegment.getLength());
+
+    // case2
+    taskIds = Roaring64NavigableMap.bitmapOf(1, 2, 4);
+    data = generateData(
+        Pair.of(1, 1),
+        Pair.of(1, 2),
+        Pair.of(1, 3),
+        Pair.of(1, 4)
+    );
+    List<ShuffleDataSegment> dataSegments2 =
+        new LocalOrderSegmentSplitter(taskIds, 32).split(new ShuffleIndexResult(data, -1));
+    assertEquals(2, dataSegments2.size());
+    assertEquals(0, dataSegments2.get(0).getOffset());
+    assertEquals(2, dataSegments2.get(0).getLength());
+    assertEquals(2, dataSegments2.get(0).getBufferSegments().size());
+
+    assertEquals(3, dataSegments2.get(1).getOffset());
+    assertEquals(1, dataSegments2.get(1).getLength());
+    assertEquals(1, dataSegments2.get(1).getBufferSegments().size());
+
+    // case3
+    taskIds = Roaring64NavigableMap.bitmapOf(1, 2, 4);
+    data = generateData(
+        Pair.of(1, 1),
+        Pair.of(1, 2),
+        Pair.of(1, 1),
+        Pair.of(1, 1),
+        Pair.of(1, 3),
+        Pair.of(1, 4),
+        Pair.of(1, 1)
+    );
+    List<ShuffleDataSegment> dataSegments3 =
+        new LocalOrderSegmentSplitter(taskIds, 3).split(new ShuffleIndexResult(data, -1));
+    assertEquals(3, dataSegments3.size());
+    assertEquals(0, dataSegments3.get(0).getOffset());
+    assertEquals(3, dataSegments3.get(0).getLength());
+    assertEquals(3, dataSegments3.get(0).getBufferSegments().size());
+
+    assertEquals(3, dataSegments3.get(1).getOffset());
+    assertEquals(1, dataSegments3.get(1).getLength());
+    assertEquals(1, dataSegments3.get(1).getBufferSegments().size());
+
+    assertEquals(5, dataSegments3.get(2).getOffset());
+    assertEquals(2, dataSegments3.get(2).getLength());
+    assertEquals(2, dataSegments3.get(2).getBufferSegments().size());
+
+    // case4
+    taskIds = Roaring64NavigableMap.bitmapOf(1, 3);
+    data = generateData(
+        Pair.of(1, 1),
+        Pair.of(1, 2),
+        Pair.of(1, 3),
+        Pair.of(1, 2),
+        Pair.of(1, 4)
+    );
+    List<ShuffleDataSegment> dataSegments4 =
+        new LocalOrderSegmentSplitter(taskIds, 3).split(new ShuffleIndexResult(data, -1));
+    assertEquals(2, dataSegments4.size());
+  }
+
   /**
    * When no spark skew optimization and LOCAL_ORDER is enabled,
    * the LOCAL_ORDER split segments should be consistent with the
@@ -96,24 +197,6 @@ public class LocalOrderSegmentSplitterTest {
     }
   }
 
-  @Test
-  public void testSplitWithDiscontinuousBlocksShouldThrowException() {
-    Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(1, 2, 4);
-    LocalOrderSegmentSplitter splitter = new LocalOrderSegmentSplitter(taskIds, 32);
-    byte[] data = generateData(
-        Pair.of(1, 1),
-        Pair.of(1, 2),
-        Pair.of(1, 3),
-        Pair.of(1, 4)
-    );
-    try {
-      splitter.split(new ShuffleIndexResult(data, -1));
-      fail();
-    } catch (Exception e) {
-      // ignore
-    }
-  }
-
   @Test
   public void testSplitForMergeContinuousSegments() {
     /**