Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/11/24 06:54:06 UTC
[incubator-uniffle] branch master updated: [BUG][AQE][LocalOrder] Remove check of discontinuous map task ids (#354)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 91eef084 [BUG][AQE][LocalOrder] Remove check of discontinuous map task ids (#354)
91eef084 is described below
commit 91eef08476319ca35e22839d042acc8773a8eff3
Author: Junfan Zhang <zu...@apache.org>
AuthorDate: Thu Nov 24 14:54:00 2022 +0800
[BUG][AQE][LocalOrder] Remove check of discontinuous map task ids (#354)
### What changes were proposed in this pull request?
Remove check of discontinuous map task ids
### Why are the changes needed?
In the [startMapIndex, endMapIndex) range, the index is not the map task id; it only indicates a position within the set of succeeded map task ids.
That means if the range is [0, 5), it only indicates that the number of tasks is 5, and the actual task id list may look like this:
`[task-1, task-3, task-9, task-15, task-16]`
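To make the point concrete, here is a small standalone sketch (not part of the patch; the class and printed text are made up for illustration) using the same Roaring64NavigableMap type the tests use:

```java
import org.roaringbitmap.longlong.Roaring64NavigableMap;

public class MapIndexRangeExample {
  public static void main(String[] args) {
    // Succeeded map task ids reported to the reader: discontinuous on purpose.
    Roaring64NavigableMap succeededTaskIds =
        Roaring64NavigableMap.bitmapOf(1, 3, 9, 15, 16);

    // A range of [0, 5) only says "read the 5 succeeded tasks"; each index
    // is a position in this set, not a task-id interval.
    long[] taskIds = succeededTaskIds.toArray();
    for (int index = 0; index < taskIds.length; index++) {
      System.out.printf("map index %d -> task id %d%n", index, taskIds[index]);
    }
    // Checking continuity of the task ids themselves (e.g. id - lastId == 1)
    // would therefore reject valid data, which is why this patch removes it.
  }
}
```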
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
1. UTs
---
.../common/segment/LocalOrderSegmentSplitter.java | 42 ++++---
.../segment/LocalOrderSegmentSplitterTest.java | 121 +++++++++++++++++----
2 files changed, 121 insertions(+), 42 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
index 97a639b3..d3f912f7 100644
--- a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
+++ b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
@@ -42,7 +42,7 @@ import org.apache.uniffle.common.util.Constants;
*
* This strategy will be useful for Spark AQE skew optimization, it will split the single partition into
* multiple shuffle readers, and each one will fetch partial single partition data which is in the range of
- * [StartMapId, endMapId). And so if one reader uses this, it will skip lots of unnecessary blocks.
+ * [StartMapIndex, endMapIndex). And so if one reader uses this, it will skip lots of unnecessary blocks.
*
* Last but not least, this split strategy depends on LOCAL_ORDER of index file, which must be guaranteed by
* the shuffle server.
@@ -75,7 +75,6 @@ public class LocalOrderSegmentSplitter implements SegmentSplitter {
long fileOffset = -1;
long totalLen = 0;
- long lastTaskAttemptId = -1;
long lastExpectedBlockIndex = -1;
List<Long> indexTaskIds = new ArrayList<>();
@@ -83,10 +82,11 @@ public class LocalOrderSegmentSplitter implements SegmentSplitter {
/**
* One ShuffleDataSegment should meet following requirements:
*
- * 1. taskId in [startMapId, endMapId) taskIds bitmap
+ * 1. taskId in [startMapIndex, endMapIndex) taskIds bitmap.
+ * Attention: the index in the range is not the map task id, which means the required task ids are not
+ * continuous.
* 2. ShuffleDataSegment size should < readBufferSize
- * 3. ShuffleDataSegment's blocks should be continuous
- *
+ * 3. Single shuffleDataSegment's blocks should be continuous
*/
int index = 0;
while (byteBuffer.hasRemaining()) {
@@ -101,10 +101,6 @@ public class LocalOrderSegmentSplitter implements SegmentSplitter {
totalLen += length;
indexTaskIds.add(taskAttemptId);
- if (lastTaskAttemptId == -1) {
- lastTaskAttemptId = taskAttemptId;
- }
-
// If ShuffleServer is flushing the file at this time, the length in the index file record may be greater
// than the length in the actual data file, and it needs to be returned at this time to avoid EOFException
if (dataFileLen != -1 && totalLen > dataFileLen) {
@@ -116,10 +112,15 @@ public class LocalOrderSegmentSplitter implements SegmentSplitter {
break;
}
- if ((taskAttemptId < lastTaskAttemptId
- && bufferSegments.size() > 0
- && (expectTaskIds.contains(taskAttemptId) ? index - lastExpectedBlockIndex != 1 : true))
- || bufferOffset >= readBufferSize) {
+ boolean conditionOfDiscontinuousBlocks =
+ lastExpectedBlockIndex != -1
+ && bufferSegments.size() > 0
+ && expectTaskIds.contains(taskAttemptId)
+ && index - lastExpectedBlockIndex != 1;
+
+ boolean conditionOfLimitedBufferSize = bufferOffset >= readBufferSize;
+
+ if (conditionOfDiscontinuousBlocks || conditionOfLimitedBufferSize) {
ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
dataFileSegments.add(sds);
bufferSegments = Lists.newArrayList();
@@ -128,14 +129,6 @@ public class LocalOrderSegmentSplitter implements SegmentSplitter {
}
if (expectTaskIds.contains(taskAttemptId)) {
- if (bufferOffset != 0 && index - lastExpectedBlockIndex > 1) {
- List<Long> expectedTaskIds = getExpectedTaskIds(expectTaskIds);
- LOGGER.error("There are discontinuous blocks, all task ids in index file: {}, all expected task ids: {}, "
- + "current expected task id: {}, last unexpected task id: {}, current data segment size: {}",
- indexTaskIds, expectedTaskIds, taskAttemptId, lastTaskAttemptId, dataFileSegments.size());
- throw new RssException("There are discontinuous blocks which should not happen when using LOCAL_ORDER.");
- }
-
if (fileOffset == -1) {
fileOffset = offset;
}
@@ -143,8 +136,6 @@ public class LocalOrderSegmentSplitter implements SegmentSplitter {
bufferOffset += length;
lastExpectedBlockIndex = index;
}
-
- lastTaskAttemptId = taskAttemptId;
index++;
} catch (BufferUnderflowException ue) {
throw new RssException("Read index data under flow", ue);
@@ -155,6 +146,11 @@ public class LocalOrderSegmentSplitter implements SegmentSplitter {
ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
dataFileSegments.add(sds);
}
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Index file task-ids sequence: {}, expected task-ids: {}",
+ indexTaskIds, getExpectedTaskIds(expectTaskIds));
+ }
return dataFileSegments;
}
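For readers skimming the hunk above: after this patch, the splitter closes a segment only when there is a gap in the block-index sequence of expected blocks, or when the read buffer fills up; gaps between task ids no longer matter. A condensed sketch of that decision follows (the helper signature is invented for illustration and is not the verbatim patch code):

```java
import org.roaringbitmap.longlong.Roaring64NavigableMap;

final class SplitConditionSketch {
  // Returns true when the splitter should close the current segment and
  // start a new one before appending the block at `index`.
  static boolean shouldSplit(Roaring64NavigableMap expectTaskIds,
                             long taskAttemptId,
                             int index,
                             long lastExpectedBlockIndex,
                             int bufferSegmentCount,
                             long bufferOffset,
                             long readBufferSize) {
    // Split on a gap in block indexes of expected blocks, never on gaps
    // in the task ids themselves (the check this patch removed).
    boolean discontinuousBlocks =
        lastExpectedBlockIndex != -1
            && bufferSegmentCount > 0
            && expectTaskIds.contains(taskAttemptId)
            && index - lastExpectedBlockIndex != 1;
    // Also split once the accumulated segment reaches the read buffer size.
    boolean bufferFull = bufferOffset >= readBufferSize;
    return discontinuousBlocks || bufferFull;
  }
}
```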
diff --git a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
index a5fd016c..b93ab23b 100644
--- a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
@@ -32,10 +32,111 @@ import org.apache.uniffle.common.ShuffleIndexResult;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
public class LocalOrderSegmentSplitterTest {
+ @Test
+ public void testDiscontinuousMapTaskIds() {
+ // case1
+ Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(6, 7, 9);
+ byte[] data = generateData(
+ Pair.of(8, 4),
+ Pair.of(8, 5),
+ Pair.of(8, 6),
+ Pair.of(8, 7),
+ Pair.of(8, 8),
+ Pair.of(8, 9),
+ Pair.of(8, 6),
+ Pair.of(8, 9)
+ );
+ List<ShuffleDataSegment> dataSegments1 = new LocalOrderSegmentSplitter(taskIds, 1000)
+ .split(new ShuffleIndexResult(data, -1));
+
+ assertEquals(2, dataSegments1.size());
+ assertEquals(16, dataSegments1.get(0).getOffset());
+ assertEquals(16, dataSegments1.get(0).getLength());
+ assertEquals(40, dataSegments1.get(1).getOffset());
+ assertEquals(24, dataSegments1.get(1).getLength());
+
+ assertEquals(2, dataSegments1.get(0).getBufferSegments().size());
+ assertEquals(3, dataSegments1.get(1).getBufferSegments().size());
+
+ BufferSegment bufferSegment = dataSegments1.get(0).getBufferSegments().get(0);
+ assertEquals(0, bufferSegment.getOffset());
+ assertEquals(8, bufferSegment.getLength());
+ bufferSegment = dataSegments1.get(0).getBufferSegments().get(1);
+ assertEquals(8, bufferSegment.getOffset());
+ assertEquals(8, bufferSegment.getLength());
+
+ bufferSegment = dataSegments1.get(1).getBufferSegments().get(0);
+ assertEquals(0, bufferSegment.getOffset());
+ assertEquals(8, bufferSegment.getLength());
+ bufferSegment = dataSegments1.get(1).getBufferSegments().get(1);
+ assertEquals(8, bufferSegment.getOffset());
+ assertEquals(8, bufferSegment.getLength());
+ bufferSegment = dataSegments1.get(1).getBufferSegments().get(2);
+ assertEquals(16, bufferSegment.getOffset());
+ assertEquals(8, bufferSegment.getLength());
+
+ // case2
+ taskIds = Roaring64NavigableMap.bitmapOf(1, 2, 4);
+ data = generateData(
+ Pair.of(1, 1),
+ Pair.of(1, 2),
+ Pair.of(1, 3),
+ Pair.of(1, 4)
+ );
+ List<ShuffleDataSegment> dataSegments2 =
+ new LocalOrderSegmentSplitter(taskIds, 32).split(new ShuffleIndexResult(data, -1));
+ assertEquals(2, dataSegments2.size());
+ assertEquals(0, dataSegments2.get(0).getOffset());
+ assertEquals(2, dataSegments2.get(0).getLength());
+ assertEquals(2, dataSegments2.get(0).getBufferSegments().size());
+
+ assertEquals(3, dataSegments2.get(1).getOffset());
+ assertEquals(1, dataSegments2.get(1).getLength());
+ assertEquals(1, dataSegments2.get(1).getBufferSegments().size());
+
+ // case3
+ taskIds = Roaring64NavigableMap.bitmapOf(1, 2, 4);
+ data = generateData(
+ Pair.of(1, 1),
+ Pair.of(1, 2),
+ Pair.of(1, 1),
+ Pair.of(1, 1),
+ Pair.of(1, 3),
+ Pair.of(1, 4),
+ Pair.of(1, 1)
+ );
+ List<ShuffleDataSegment> dataSegments3 =
+ new LocalOrderSegmentSplitter(taskIds, 3).split(new ShuffleIndexResult(data, -1));
+ assertEquals(3, dataSegments3.size());
+ assertEquals(0, dataSegments3.get(0).getOffset());
+ assertEquals(3, dataSegments3.get(0).getLength());
+ assertEquals(3, dataSegments3.get(0).getBufferSegments().size());
+
+ assertEquals(3, dataSegments3.get(1).getOffset());
+ assertEquals(1, dataSegments3.get(1).getLength());
+ assertEquals(1, dataSegments3.get(1).getBufferSegments().size());
+
+ assertEquals(5, dataSegments3.get(2).getOffset());
+ assertEquals(2, dataSegments3.get(2).getLength());
+ assertEquals(2, dataSegments3.get(2).getBufferSegments().size());
+
+ // case4
+ taskIds = Roaring64NavigableMap.bitmapOf(1, 3);
+ data = generateData(
+ Pair.of(1, 1),
+ Pair.of(1, 2),
+ Pair.of(1, 3),
+ Pair.of(1, 2),
+ Pair.of(1, 4)
+ );
+ List<ShuffleDataSegment> dataSegments4 =
+ new LocalOrderSegmentSplitter(taskIds, 3).split(new ShuffleIndexResult(data, -1));
+ assertEquals(2, dataSegments4.size());
+ }
+
/**
* When no spark skew optimization and LOCAL_ORDER is enabled,
* the LOCAL_ORDER split segments should be consistent with the
@@ -96,24 +197,6 @@ public class LocalOrderSegmentSplitterTest {
}
}
- @Test
- public void testSplitWithDiscontinuousBlocksShouldThrowException() {
- Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(1, 2, 4);
- LocalOrderSegmentSplitter splitter = new LocalOrderSegmentSplitter(taskIds, 32);
- byte[] data = generateData(
- Pair.of(1, 1),
- Pair.of(1, 2),
- Pair.of(1, 3),
- Pair.of(1, 4)
- );
- try {
- splitter.split(new ShuffleIndexResult(data, -1));
- fail();
- } catch (Exception e) {
- // ignore
- }
- }
-
@Test
public void testSplitForMergeContinuousSegments() {
/**