You are viewing a plain-text version of this content. The canonical link to it (shown as "here" in the original page) was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/12/07 04:38:30 UTC

[flink] branch release-1.16 updated: [FLINK-30189][runtime] HsSubpartitionFileReader may load data that has been consumed from memory

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new a9e65bc2377 [FLINK-30189][runtime] HsSubpartitionFileReader may load data that has been consumed from memory
a9e65bc2377 is described below

commit a9e65bc2377ee7a4b3599b58a58ff0301b79c5d8
Author: Weijie Guo <re...@163.com>
AuthorDate: Tue Dec 6 19:24:36 2022 +0800

    [FLINK-30189][runtime] HsSubpartitionFileReader may load data that has been consumed from memory
    
    This closes #21415
---
 .../hybrid/HsSubpartitionFileReaderImpl.java       | 25 ++++++-----
 .../hybrid/HsSubpartitionFileReaderImplTest.java   | 48 ++++++++++++++++------
 2 files changed, 50 insertions(+), 23 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
index e6dc7122c5e..88d730b14ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
@@ -261,19 +261,24 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
 
     private Optional<BufferIndexOrError> checkAndGetFirstBufferIndexOrError(int expectedBufferIndex)
             throws Throwable {
-        if (loadedBuffers.isEmpty()) {
-            return Optional.empty();
-        }
-
         BufferIndexOrError peek = loadedBuffers.peek();
-
-        if (peek.getThrowable().isPresent()) {
-            throw peek.getThrowable().get();
-        } else if (peek.getIndex() != expectedBufferIndex) {
-            return Optional.empty();
+        while (peek != null) {
+            if (peek.getThrowable().isPresent()) {
+                throw peek.getThrowable().get();
+            } else if (peek.getIndex() == expectedBufferIndex) {
+                break;
+            } else if (peek.getIndex() > expectedBufferIndex) {
+                return Optional.empty();
+            } else if (peek.getIndex() < expectedBufferIndex) {
+                // Because the update of consumption progress may be delayed, there is a
+                // very small probability to load the buffer that has been consumed from memory.
+                // Skip these buffers directly to avoid repeated consumption.
+                loadedBuffers.poll();
+                peek = loadedBuffers.peek();
+            }
         }
 
-        return Optional.of(peek);
+        return Optional.ofNullable(peek);
     }
 
     private void moveFileOffsetToBuffer(int bufferIndex) throws IOException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
index 03a560589d6..411a3131975 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
@@ -398,22 +398,20 @@ class HsSubpartitionFileReaderImplTest {
         // if no preload data in file reader, return Optional.empty.
         assertThat(subpartitionFileReader.consumeBuffer(0)).isNotPresent();
 
-        // buffers in file: (0-0, 0-1)
-        writeDataToFile(0, 0, 0, 2);
+        // buffers in file: (0-0, 0-1, 0-2)
+        writeDataToFile(0, 0, 0, 3);
 
-        Queue<MemorySegment> memorySegments = createsMemorySegments(2);
+        Queue<MemorySegment> memorySegments = createsMemorySegments(3);
         subpartitionFileReader.prepareForScheduling();
         // trigger reading, add buffer to queue.
         subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
 
-        // if nextBufferToConsume is not equal to peek elements index, return Optional.empty.
-        assertThat(subpartitionFileReader.consumeBuffer(10)).isNotPresent();
-
+        // if nextBufferToConsume is equal to peek elements index.
         assertThat(subpartitionFileReader.consumeBuffer(0))
                 .hasValueSatisfying(
                         (bufferAndBacklog -> {
                             assertThat(bufferAndBacklog.getNextDataType())
-                                    .isEqualTo(DataType.EVENT_BUFFER);
+                                    .isEqualTo(DataType.DATA_BUFFER);
                             assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(0);
                             // first buffer's data is 0.
                             assertThat(
@@ -424,6 +422,26 @@ class HsSubpartitionFileReaderImplTest {
                                                     .getInt())
                                     .isEqualTo(0);
                         }));
+
+        // if nextBufferToConsume is less than peek elements index, return Optional.empty.
+        assertThat(subpartitionFileReader.consumeBuffer(0)).isNotPresent();
+
+        // if nextBufferToConsume is greater than peek elements index, skip this buffer and keep
+        // looking.
+        assertThat(subpartitionFileReader.consumeBuffer(2))
+                .hasValueSatisfying(
+                        (bufferAndBacklog -> {
+                            assertThat(bufferAndBacklog.getNextDataType()).isEqualTo(DataType.NONE);
+                            assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(2);
+                            assertThat(
+                                            bufferAndBacklog
+                                                    .buffer()
+                                                    .getNioBufferReadable()
+                                                    .order(ByteOrder.nativeOrder())
+                                                    .getInt())
+                                    .isEqualTo(2);
+                        }));
+        assertThat(subpartitionFileReader.getLoadedBuffers()).isEmpty();
     }
 
     @Test
@@ -454,20 +472,24 @@ class HsSubpartitionFileReaderImplTest {
         // if no preload data in file reader, return DataType.NONE.
         assertThat(subpartitionFileReader.peekNextToConsumeDataType(0)).isEqualTo(DataType.NONE);
 
-        // buffers in file: (0-0, 0-1)
-        writeDataToFile(0, 0, 2);
+        // buffers in file: (0-0, 0-1, 0-2)
+        writeDataToFile(0, 0, 3);
 
-        Queue<MemorySegment> memorySegments = createsMemorySegments(2);
+        Queue<MemorySegment> memorySegments = createsMemorySegments(3);
         subpartitionFileReader.prepareForScheduling();
         // trigger reading, add buffer to queue.
         subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
 
-        // if nextBufferToConsume is not equal to peek elements index, return DataType.NONE.
-        assertThat(subpartitionFileReader.peekNextToConsumeDataType(10)).isEqualTo(DataType.NONE);
-
         // if nextBufferToConsume is equal to peek elements index, return the real DataType.
         assertThat(subpartitionFileReader.peekNextToConsumeDataType(0))
                 .isEqualTo(DataType.DATA_BUFFER);
+
+        // if nextBufferToConsume is greater than peek elements index, skip this buffer and keep
+        // looking.
+        assertThat(subpartitionFileReader.peekNextToConsumeDataType(2))
+                .isEqualTo(DataType.EVENT_BUFFER);
+        // if nextBufferToConsume is less than peek elements index, return DataType.NONE.
+        assertThat(subpartitionFileReader.peekNextToConsumeDataType(1)).isEqualTo(DataType.NONE);
     }
 
     /**