You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/05 01:55:05 UTC
[flink] branch master updated: [FLINK-27908] HsSubpartitionView should calculate backlog no less than true value.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2dd51f644da [FLINK-27908] HsSubpartitionView should calculate backlog no less than true value.
2dd51f644da is described below
commit 2dd51f644da14a88914cc7aeeb021f824a28ee48
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Aug 3 13:43:18 2022 +0800
[FLINK-27908] HsSubpartitionView should calculate backlog no less than true value.
This closes #20445
---
.../partition/hybrid/HsSubpartitionView.java | 30 +++++++++++++----
.../partition/hybrid/HsSubpartitionViewTest.java | 39 ++++++++++++++++++++--
2 files changed, 61 insertions(+), 8 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
index 5aa7e8bac0b..b7af284287b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
@@ -83,7 +83,7 @@ public class HsSubpartitionView
bufferToConsume = memoryDataView.consumeBuffer(lastConsumedBufferIndex + 1);
}
updateConsumingStatus(bufferToConsume);
- return bufferToConsume.orElse(null);
+ return bufferToConsume.map(this::handleBacklog).orElse(null);
} catch (Throwable cause) {
releaseInternal(cause);
return null;
@@ -118,7 +118,7 @@ public class HsSubpartitionView
&& cachedNextDataType == Buffer.DataType.EVENT_BUFFER) {
availability = true;
}
- return new AvailabilityWithBacklog(availability, diskDataView.getBacklog());
+ return new AvailabilityWithBacklog(availability, getSubpartitionBacklog());
}
}
@@ -182,16 +182,16 @@ public class HsSubpartitionView
// the ack, as there are no checkpoints
}
- @SuppressWarnings("FieldAccessNotGuarded")
@Override
public int unsynchronizedGetNumberOfQueuedBuffers() {
- return diskDataView.getBacklog();
+ return getSubpartitionBacklog();
}
- @SuppressWarnings("FieldAccessNotGuarded")
@Override
public int getNumberOfQueuedBuffers() {
- return diskDataView.getBacklog();
+ synchronized (lock) {
+ return getSubpartitionBacklog();
+ }
}
@Override
@@ -203,6 +203,24 @@ public class HsSubpartitionView
// Internal Methods
// -------------------------------
+ @SuppressWarnings("FieldAccessNotGuarded")
+ private int getSubpartitionBacklog() { // backlog reported for this view, combining the memory and disk data views
+ if (memoryDataView == null || diskDataView == null) {
+ return 0; // at least one data view is not set: report an empty backlog
+ }
+ return Math.max(memoryDataView.getBacklog(), diskDataView.getBacklog()); // never below either view's own backlog
+ }
+
+ private BufferAndBacklog handleBacklog(BufferAndBacklog bufferToConsume) { // replaces a zero backlog; any other result is returned as-is
+ return bufferToConsume.buffersInBacklog() == 0
+ ? new BufferAndBacklog( // copy that keeps buffer, next data type and sequence number
+ bufferToConsume.buffer(),
+ getSubpartitionBacklog(), // substitute the view-level backlog for the reported zero
+ bufferToConsume.getNextDataType(),
+ bufferToConsume.getSequenceNumber())
+ : bufferToConsume;
+ }
+
@GuardedBy("lock")
private Optional<BufferAndBacklog> tryReadFromDisk() throws Throwable {
final int nextBufferIndexToConsume = lastConsumedBufferIndex + 1;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
index 77a797a80f1..b3331b12dbc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
@@ -40,7 +40,7 @@ class HsSubpartitionViewTest {
void testGetNextBufferFromDisk() {
HsSubpartitionView subpartitionView = createSubpartitionView();
- BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.DATA_BUFFER, 0);
+ BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(1, DataType.DATA_BUFFER, 0);
CompletableFuture<Void> consumeBufferFromMemoryFuture = new CompletableFuture<>();
TestingHsDataView diskDataView =
TestingHsDataView.builder()
@@ -97,7 +97,7 @@ class HsSubpartitionViewTest {
void testGetNextBufferFromMemory() {
HsSubpartitionView subpartitionView = createSubpartitionView();
- BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.DATA_BUFFER, 0);
+ BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(1, DataType.DATA_BUFFER, 0);
TestingHsDataView memoryDataView =
TestingHsDataView.builder()
.setConsumeBufferFunction(
@@ -135,6 +135,41 @@ class HsSubpartitionViewTest {
assertThat(subpartitionView.isReleased()).isTrue();
}
+ @Test
+ void testGetNextBufferZeroBacklog() { // a buffer consumed with backlog 0 must be returned with the view-level backlog
+ HsSubpartitionView subpartitionView = createSubpartitionView();
+
+ final int diskBacklog = 0;
+ final int memoryBacklog = 10;
+ BufferAndBacklog targetBufferAndBacklog =
+ createBufferAndBacklog(diskBacklog, DataType.DATA_BUFFER, 0);
+
+ TestingHsDataView diskDataView =
+ TestingHsDataView.builder()
+ .setConsumeBufferFunction(
+ (bufferToConsume) -> Optional.of(targetBufferAndBacklog))
+ .build();
+ TestingHsDataView memoryDataView =
+ TestingHsDataView.builder().setGetBacklogSupplier(() -> memoryBacklog).build();
+ subpartitionView.setDiskDataView(diskDataView);
+ subpartitionView.setMemoryDataView(memoryDataView);
+
+ assertThat(subpartitionView.getNextBuffer())
+ .satisfies(
+ (bufferAndBacklog -> {
+ // The zero backlog is replaced by the maximum of the memory and disk backlogs.
+ assertThat(bufferAndBacklog.buffersInBacklog())
+ .isEqualTo(memoryBacklog);
+ // All other fields are left unchanged.
+ assertThat(bufferAndBacklog.buffer())
+ .isEqualTo(targetBufferAndBacklog.buffer());
+ assertThat(bufferAndBacklog.getNextDataType())
+ .isEqualTo(targetBufferAndBacklog.getNextDataType());
+ assertThat(bufferAndBacklog.getSequenceNumber())
+ .isEqualTo(targetBufferAndBacklog.getSequenceNumber());
+ }));
+ }
+
@Test
void testNotifyDataAvailableNeedNotify() {
CompletableFuture<Void> notifyAvailableFuture = new CompletableFuture<>();