You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by zu...@apache.org on 2022/11/22 03:43:58 UTC

[incubator-uniffle] branch master updated: [BUG][AQE][LocalOrder] Fix the bug of missed data due to block sorting (#347)

This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b5acb5f [BUG][AQE][LocalOrder] Fix the bug of missed data due to block sorting (#347)
5b5acb5f is described below

commit 5b5acb5f3bcf8e12cc42612754d325aca4e3b991
Author: Junfan Zhang <zu...@apache.org>
AuthorDate: Tue Nov 22 11:43:53 2022 +0800

    [BUG][AQE][LocalOrder] Fix the bug of missed data due to block sorting (#347)
    
    
    ### What changes were proposed in this pull request?
    
    1. Create an extra reference copy of the in-flush blocks so that sorting the blocks for the flush event does not disturb their original order. Otherwise, data can be missed on read, because in-memory data is read sequentially in that original order.
    
    ### Why are the changes needed?
    
    Fix the bug of missed data due to block sorting
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    1. UTs
---
 .../uniffle/server/buffer/ShuffleBuffer.java       |  9 ++++-
 .../uniffle/server/buffer/BufferTestBase.java      |  7 ++--
 .../uniffle/server/buffer/ShuffleBufferTest.java   | 41 ++++++++++++++++++++++
 3 files changed, 54 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index fdec31f6..ad0b31af 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -82,7 +82,14 @@ public class ShuffleBuffer {
     }
     // buffer will be cleared, and new list must be created for async flush
     List<ShufflePartitionedBlock> spBlocks = new LinkedList<>(blocks);
+    List<ShufflePartitionedBlock> inFlushedQueueBlocks = spBlocks;
     if (dataDistributionType == ShuffleDataDistributionType.LOCAL_ORDER) {
+      /**
+       * When reordering the blocks, it will break down the original reads sequence to cause
+       * the data lost in some cases.
+       * So we should create a reference copy to avoid this.
+       */
+      inFlushedQueueBlocks = new LinkedList<>(spBlocks);
       spBlocks.sort((o1, o2) -> new Long(o1.getTaskAttemptId()).compareTo(o2.getTaskAttemptId()));
     }
     long eventId = ShuffleFlushManager.ATOMIC_EVENT_ID.getAndIncrement();
@@ -96,7 +103,7 @@ public class ShuffleBuffer {
         spBlocks,
         isValid,
         this);
-    inFlushBlockMap.put(eventId, spBlocks);
+    inFlushBlockMap.put(eventId, inFlushedQueueBlocks);
     blocks.clear();
     size = 0;
     return event;
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java b/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java
index bfdd4895..837448a5 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java
@@ -47,13 +47,16 @@ public abstract class BufferTestBase {
   }
 
   protected ShufflePartitionedData createData(int partitionId, int len) {
+    return createData(partitionId, 0, len);
+  }
+
+  protected ShufflePartitionedData createData(int partitionId, int taskAttemptId, int len) {
     byte[] buf = new byte[len];
     new Random().nextBytes(buf);
     ShufflePartitionedBlock block = new ShufflePartitionedBlock(
-        len, len, ChecksumUtils.getCrc32(buf), atomBlockId.incrementAndGet(), 0, buf);
+        len, len, ChecksumUtils.getCrc32(buf), atomBlockId.incrementAndGet(), taskAttemptId, buf);
     ShufflePartitionedData data = new ShufflePartitionedData(
         partitionId, new ShufflePartitionedBlock[]{block});
     return data;
   }
-
 }
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java
index bb59f23c..d275449a 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
 import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.ShufflePartitionedData;
@@ -81,6 +82,46 @@ public class ShuffleBufferTest extends BufferTestBase {
     assertEquals(0, shuffleBuffer.getBlocks().size());
   }
 
+  @Test
+  public void getShuffleDataWithLocalOrderTest() {
+    ShuffleBuffer shuffleBuffer = new ShuffleBuffer(200);
+    ShufflePartitionedData spd1 = createData(1, 1, 15);
+    ShufflePartitionedData spd2 = createData(1, 0, 15);
+    ShufflePartitionedData spd3 = createData(1, 2, 15);
+    shuffleBuffer.append(spd1);
+    shuffleBuffer.append(spd2);
+    shuffleBuffer.append(spd3);
+
+    // First read from the cached data
+    ShuffleDataResult sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 16);
+    byte[] expectedData = getExpectedData(spd1, spd2);
+    compareBufferSegment(shuffleBuffer.getBlocks(), sdr.getBufferSegments(), 0, 2);
+    assertArrayEquals(expectedData, sdr.getData());
+
+    // Second read after flushed
+    ShuffleDataFlushEvent event1 = shuffleBuffer.toFlushEvent(
+        "appId",
+        0,
+        0,
+        1,
+        null,
+        ShuffleDataDistributionType.LOCAL_ORDER
+    );
+    long lastBlockId = sdr.getBufferSegments().get(1).getBlockId();
+    sdr = shuffleBuffer.getShuffleData(lastBlockId, 16);
+    expectedData = getExpectedData(spd3);
+    compareBufferSegment(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()), sdr.getBufferSegments(), 2, 1);
+    assertArrayEquals(expectedData, sdr.getData());
+
+    assertEquals(0, event1.getShuffleBlocks().get(0).getTaskAttemptId());
+    assertEquals(1, event1.getShuffleBlocks().get(1).getTaskAttemptId());
+    assertEquals(2, event1.getShuffleBlocks().get(2).getTaskAttemptId());
+
+    assertEquals(1, shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()).get(0).getTaskAttemptId());
+    assertEquals(0, shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()).get(1).getTaskAttemptId());
+    assertEquals(2, shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()).get(2).getTaskAttemptId());
+  }
+
   @Test
   public void getShuffleDataTest() {
     ShuffleBuffer shuffleBuffer = new ShuffleBuffer(200);