You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by zu...@apache.org on 2022/11/22 03:43:58 UTC
[incubator-uniffle] branch master updated: [BUG][AQE][LocalOrder] Fix the bug of missed data due to block sorting (#347)
This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 5b5acb5f [BUG][AQE][LocalOrder] Fix the bug of missed data due to block sorting (#347)
5b5acb5f is described below
commit 5b5acb5f3bcf8e12cc42612754d325aca4e3b991
Author: Junfan Zhang <zu...@apache.org>
AuthorDate: Tue Nov 22 11:43:53 2022 +0800
[BUG][AQE][LocalOrder] Fix the bug of missed data due to block sorting (#347)
### What changes were proposed in this pull request?
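1. Create an extra reference copy of the in-flush blocks, so that sorting them (for `LOCAL_ORDER`) does not break their original append order. Otherwise data can be missed when reading: in-memory reads resume after the last returned block ID, so they depend on the blocks staying in their original sequence.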
### Why are the changes needed?
Fix data being missed on read after the blocks are sorted for a local-order flush.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
1. Unit tests (new `ShuffleBufferTest#getShuffleDataWithLocalOrderTest`).
---
.../uniffle/server/buffer/ShuffleBuffer.java | 9 ++++-
.../uniffle/server/buffer/BufferTestBase.java | 7 ++--
.../uniffle/server/buffer/ShuffleBufferTest.java | 41 ++++++++++++++++++++++
3 files changed, 54 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index fdec31f6..ad0b31af 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -82,7 +82,14 @@ public class ShuffleBuffer {
}
// buffer will be cleared, and new list must be created for async flush
List<ShufflePartitionedBlock> spBlocks = new LinkedList<>(blocks);
+ List<ShufflePartitionedBlock> inFlushedQueueBlocks = spBlocks;
if (dataDistributionType == ShuffleDataDistributionType.LOCAL_ORDER) {
+ /**
+ * When reordering the blocks, it will break down the original reads sequence to cause
+ * the data lost in some cases.
+ * So we should create a reference copy to avoid this.
+ */
+ inFlushedQueueBlocks = new LinkedList<>(spBlocks);
spBlocks.sort((o1, o2) -> new Long(o1.getTaskAttemptId()).compareTo(o2.getTaskAttemptId()));
}
long eventId = ShuffleFlushManager.ATOMIC_EVENT_ID.getAndIncrement();
@@ -96,7 +103,7 @@ public class ShuffleBuffer {
spBlocks,
isValid,
this);
- inFlushBlockMap.put(eventId, spBlocks);
+ inFlushBlockMap.put(eventId, inFlushedQueueBlocks);
blocks.clear();
size = 0;
return event;
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java b/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java
index bfdd4895..837448a5 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java
@@ -47,13 +47,16 @@ public abstract class BufferTestBase {
}
protected ShufflePartitionedData createData(int partitionId, int len) {
+ return createData(partitionId, 0, len);
+ }
+
+ protected ShufflePartitionedData createData(int partitionId, int taskAttemptId, int len) {
byte[] buf = new byte[len];
new Random().nextBytes(buf);
ShufflePartitionedBlock block = new ShufflePartitionedBlock(
- len, len, ChecksumUtils.getCrc32(buf), atomBlockId.incrementAndGet(), 0, buf);
+ len, len, ChecksumUtils.getCrc32(buf), atomBlockId.incrementAndGet(), taskAttemptId, buf);
ShufflePartitionedData data = new ShufflePartitionedData(
partitionId, new ShufflePartitionedBlock[]{block});
return data;
}
-
}
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java
index bb59f23c..d275449a 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
@@ -81,6 +82,46 @@ public class ShuffleBufferTest extends BufferTestBase {
assertEquals(0, shuffleBuffer.getBlocks().size());
}
+ @Test
+ public void getShuffleDataWithLocalOrderTest() {
+ ShuffleBuffer shuffleBuffer = new ShuffleBuffer(200);
+ ShufflePartitionedData spd1 = createData(1, 1, 15);
+ ShufflePartitionedData spd2 = createData(1, 0, 15);
+ ShufflePartitionedData spd3 = createData(1, 2, 15);
+ shuffleBuffer.append(spd1);
+ shuffleBuffer.append(spd2);
+ shuffleBuffer.append(spd3);
+
+ // First read from the cached data
+ ShuffleDataResult sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 16);
+ byte[] expectedData = getExpectedData(spd1, spd2);
+ compareBufferSegment(shuffleBuffer.getBlocks(), sdr.getBufferSegments(), 0, 2);
+ assertArrayEquals(expectedData, sdr.getData());
+
+ // Second read after flushed
+ ShuffleDataFlushEvent event1 = shuffleBuffer.toFlushEvent(
+ "appId",
+ 0,
+ 0,
+ 1,
+ null,
+ ShuffleDataDistributionType.LOCAL_ORDER
+ );
+ long lastBlockId = sdr.getBufferSegments().get(1).getBlockId();
+ sdr = shuffleBuffer.getShuffleData(lastBlockId, 16);
+ expectedData = getExpectedData(spd3);
+ compareBufferSegment(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()), sdr.getBufferSegments(), 2, 1);
+ assertArrayEquals(expectedData, sdr.getData());
+
+ assertEquals(0, event1.getShuffleBlocks().get(0).getTaskAttemptId());
+ assertEquals(1, event1.getShuffleBlocks().get(1).getTaskAttemptId());
+ assertEquals(2, event1.getShuffleBlocks().get(2).getTaskAttemptId());
+
+ assertEquals(1, shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()).get(0).getTaskAttemptId());
+ assertEquals(0, shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()).get(1).getTaskAttemptId());
+ assertEquals(2, shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()).get(2).getTaskAttemptId());
+ }
+
@Test
public void getShuffleDataTest() {
ShuffleBuffer shuffleBuffer = new ShuffleBuffer(200);