You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/12/07 04:37:47 UTC
[flink] branch master updated: [FLINK-30189][runtime] HsSubpartitionFileReader may load data that has been consumed from memory
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4c67f8fca52 [FLINK-30189][runtime] HsSubpartitionFileReader may load data that has been consumed from memory
4c67f8fca52 is described below
commit 4c67f8fca529a72389d69990307bbf78fcd3d99d
Author: Weijie Guo <re...@163.com>
AuthorDate: Tue Dec 6 19:24:36 2022 +0800
[FLINK-30189][runtime] HsSubpartitionFileReader may load data that has been consumed from memory
This closes #21415
---
.../hybrid/HsSubpartitionFileReaderImpl.java | 25 ++++++-----
.../hybrid/HsSubpartitionFileReaderImplTest.java | 48 ++++++++++++++++------
2 files changed, 50 insertions(+), 23 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
index e40e917cfa1..c190690d45c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
@@ -265,19 +265,24 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
private Optional<BufferIndexOrError> checkAndGetFirstBufferIndexOrError(int expectedBufferIndex)
throws Throwable {
- if (loadedBuffers.isEmpty()) {
- return Optional.empty();
- }
-
BufferIndexOrError peek = loadedBuffers.peek();
-
- if (peek.getThrowable().isPresent()) {
- throw peek.getThrowable().get();
- } else if (peek.getIndex() != expectedBufferIndex) {
- return Optional.empty();
+ while (peek != null) {
+ if (peek.getThrowable().isPresent()) {
+ throw peek.getThrowable().get();
+ } else if (peek.getIndex() == expectedBufferIndex) {
+ break;
+ } else if (peek.getIndex() > expectedBufferIndex) {
+ return Optional.empty();
+ } else if (peek.getIndex() < expectedBufferIndex) {
+ // Because the update of consumption progress may be delayed, there is a
+ // very small probability to load the buffer that has been consumed from memory.
+ // Skip these buffers directly to avoid repeated consumption.
+ loadedBuffers.poll();
+ peek = loadedBuffers.peek();
+ }
}
- return Optional.of(peek);
+ return Optional.ofNullable(peek);
}
private void moveFileOffsetToBuffer(int bufferIndex) throws IOException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
index cccf9f5d93e..b8f5d206a78 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
@@ -398,22 +398,20 @@ class HsSubpartitionFileReaderImplTest {
// if no preload data in file reader, return Optional.empty.
assertThat(subpartitionFileReader.consumeBuffer(0)).isNotPresent();
- // buffers in file: (0-0, 0-1)
- writeDataToFile(0, 0, 0, 2);
+ // buffers in file: (0-0, 0-1, 0-2)
+ writeDataToFile(0, 0, 0, 3);
- Queue<MemorySegment> memorySegments = createsMemorySegments(2);
+ Queue<MemorySegment> memorySegments = createsMemorySegments(3);
subpartitionFileReader.prepareForScheduling();
// trigger reading, add buffer to queue.
subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
- // if nextBufferToConsume is not equal to peek elements index, return Optional.empty.
- assertThat(subpartitionFileReader.consumeBuffer(10)).isNotPresent();
-
+ // if nextBufferToConsume is equal to peek elements index.
assertThat(subpartitionFileReader.consumeBuffer(0))
.hasValueSatisfying(
(bufferAndBacklog -> {
assertThat(bufferAndBacklog.getNextDataType())
- .isEqualTo(DataType.EVENT_BUFFER);
+ .isEqualTo(DataType.DATA_BUFFER);
assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(0);
// first buffer's data is 0.
assertThat(
@@ -424,6 +422,26 @@ class HsSubpartitionFileReaderImplTest {
.getInt())
.isEqualTo(0);
}));
+
+ // if nextBufferToConsume is less than peek elements index, return Optional.empty.
+ assertThat(subpartitionFileReader.consumeBuffer(0)).isNotPresent();
+
+ // if nextBufferToConsume is greater than peek elements index, skip this buffer and keep
+ // looking.
+ assertThat(subpartitionFileReader.consumeBuffer(2))
+ .hasValueSatisfying(
+ (bufferAndBacklog -> {
+ assertThat(bufferAndBacklog.getNextDataType()).isEqualTo(DataType.NONE);
+ assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(2);
+ assertThat(
+ bufferAndBacklog
+ .buffer()
+ .getNioBufferReadable()
+ .order(ByteOrder.nativeOrder())
+ .getInt())
+ .isEqualTo(2);
+ }));
+ assertThat(subpartitionFileReader.getLoadedBuffers()).isEmpty();
}
@Test
@@ -454,20 +472,24 @@ class HsSubpartitionFileReaderImplTest {
// if no preload data in file reader, return DataType.NONE.
assertThat(subpartitionFileReader.peekNextToConsumeDataType(0)).isEqualTo(DataType.NONE);
- // buffers in file: (0-0, 0-1)
- writeDataToFile(0, 0, 2);
+ // buffers in file: (0-0, 0-1, 0-2)
+ writeDataToFile(0, 0, 3);
- Queue<MemorySegment> memorySegments = createsMemorySegments(2);
+ Queue<MemorySegment> memorySegments = createsMemorySegments(3);
subpartitionFileReader.prepareForScheduling();
// trigger reading, add buffer to queue.
subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
- // if nextBufferToConsume is not equal to peek elements index, return DataType.NONE.
- assertThat(subpartitionFileReader.peekNextToConsumeDataType(10)).isEqualTo(DataType.NONE);
-
// if nextBufferToConsume is equal to peek elements index, return the real DataType.
assertThat(subpartitionFileReader.peekNextToConsumeDataType(0))
.isEqualTo(DataType.DATA_BUFFER);
+
+ // if nextBufferToConsume is greater than peek elements index, skip this buffer and keep
+ // looking.
+ assertThat(subpartitionFileReader.peekNextToConsumeDataType(2))
+ .isEqualTo(DataType.EVENT_BUFFER);
+ // if nextBufferToConsume is less than peek elements index, return DataType.NONE.
+ assertThat(subpartitionFileReader.peekNextToConsumeDataType(1)).isEqualTo(DataType.NONE);
}
/**