You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/05 01:55:05 UTC

[flink] branch master updated: [FLINK-27908] HsSubpartitionView should calculate backlog no less than true value.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2dd51f644da [FLINK-27908] HsSubpartitionView should calculate backlog no less than true value.
2dd51f644da is described below

commit 2dd51f644da14a88914cc7aeeb021f824a28ee48
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Aug 3 13:43:18 2022 +0800

    [FLINK-27908] HsSubpartitionView should calculate backlog no less than true value.
    
    This closes #20445
---
 .../partition/hybrid/HsSubpartitionView.java       | 30 +++++++++++++----
 .../partition/hybrid/HsSubpartitionViewTest.java   | 39 ++++++++++++++++++++--
 2 files changed, 61 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
index 5aa7e8bac0b..b7af284287b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
@@ -83,7 +83,7 @@ public class HsSubpartitionView
                     bufferToConsume = memoryDataView.consumeBuffer(lastConsumedBufferIndex + 1);
                 }
                 updateConsumingStatus(bufferToConsume);
-                return bufferToConsume.orElse(null);
+                return bufferToConsume.map(this::handleBacklog).orElse(null);
             } catch (Throwable cause) {
                 releaseInternal(cause);
                 return null;
@@ -118,7 +118,7 @@ public class HsSubpartitionView
                     && cachedNextDataType == Buffer.DataType.EVENT_BUFFER) {
                 availability = true;
             }
-            return new AvailabilityWithBacklog(availability, diskDataView.getBacklog());
+            return new AvailabilityWithBacklog(availability, getSubpartitionBacklog());
         }
     }
 
@@ -182,16 +182,16 @@ public class HsSubpartitionView
         // the ack, as there are no checkpoints
     }
 
-    @SuppressWarnings("FieldAccessNotGuarded")
     @Override
     public int unsynchronizedGetNumberOfQueuedBuffers() {
-        return diskDataView.getBacklog();
+        return getSubpartitionBacklog();
     }
 
-    @SuppressWarnings("FieldAccessNotGuarded")
     @Override
     public int getNumberOfQueuedBuffers() {
-        return diskDataView.getBacklog();
+        synchronized (lock) {
+            return getSubpartitionBacklog();
+        }
     }
 
     @Override
@@ -203,6 +203,24 @@ public class HsSubpartitionView
     //       Internal Methods
     // -------------------------------
 
+    @SuppressWarnings("FieldAccessNotGuarded")
+    private int getSubpartitionBacklog() {
+        if (memoryDataView == null || diskDataView == null) {
+            return 0;
+        }
+        return Math.max(memoryDataView.getBacklog(), diskDataView.getBacklog());
+    }
+
+    private BufferAndBacklog handleBacklog(BufferAndBacklog bufferToConsume) {
+        return bufferToConsume.buffersInBacklog() == 0
+                ? new BufferAndBacklog(
+                        bufferToConsume.buffer(),
+                        getSubpartitionBacklog(),
+                        bufferToConsume.getNextDataType(),
+                        bufferToConsume.getSequenceNumber())
+                : bufferToConsume;
+    }
+
     @GuardedBy("lock")
     private Optional<BufferAndBacklog> tryReadFromDisk() throws Throwable {
         final int nextBufferIndexToConsume = lastConsumedBufferIndex + 1;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
index 77a797a80f1..b3331b12dbc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
@@ -40,7 +40,7 @@ class HsSubpartitionViewTest {
     void testGetNextBufferFromDisk() {
         HsSubpartitionView subpartitionView = createSubpartitionView();
 
-        BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.DATA_BUFFER, 0);
+        BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(1, DataType.DATA_BUFFER, 0);
         CompletableFuture<Void> consumeBufferFromMemoryFuture = new CompletableFuture<>();
         TestingHsDataView diskDataView =
                 TestingHsDataView.builder()
@@ -97,7 +97,7 @@ class HsSubpartitionViewTest {
     void testGetNextBufferFromMemory() {
         HsSubpartitionView subpartitionView = createSubpartitionView();
 
-        BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.DATA_BUFFER, 0);
+        BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(1, DataType.DATA_BUFFER, 0);
         TestingHsDataView memoryDataView =
                 TestingHsDataView.builder()
                         .setConsumeBufferFunction(
@@ -135,6 +135,41 @@ class HsSubpartitionViewTest {
         assertThat(subpartitionView.isReleased()).isTrue();
     }
 
+    @Test
+    void testGetNextBufferZeroBacklog() {
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+
+        final int diskBacklog = 0;
+        final int memoryBacklog = 10;
+        BufferAndBacklog targetBufferAndBacklog =
+                createBufferAndBacklog(diskBacklog, DataType.DATA_BUFFER, 0);
+
+        TestingHsDataView diskDataView =
+                TestingHsDataView.builder()
+                        .setConsumeBufferFunction(
+                                (bufferToConsume) -> Optional.of(targetBufferAndBacklog))
+                        .build();
+        TestingHsDataView memoryDataView =
+                TestingHsDataView.builder().setGetBacklogSupplier(() -> memoryBacklog).build();
+        subpartitionView.setDiskDataView(diskDataView);
+        subpartitionView.setMemoryDataView(memoryDataView);
+
+        assertThat(subpartitionView.getNextBuffer())
+                .satisfies(
+                        (bufferAndBacklog -> {
+                            // backlog is reset to maximum backlog of memory and disk.
+                            assertThat(bufferAndBacklog.buffersInBacklog())
+                                    .isEqualTo(memoryBacklog);
+                            // other fields are not changed.
+                            assertThat(bufferAndBacklog.buffer())
+                                    .isEqualTo(targetBufferAndBacklog.buffer());
+                            assertThat(bufferAndBacklog.getNextDataType())
+                                    .isEqualTo(targetBufferAndBacklog.getNextDataType());
+                            assertThat(bufferAndBacklog.getSequenceNumber())
+                                    .isEqualTo(targetBufferAndBacklog.getSequenceNumber());
+                        }));
+    }
+
     @Test
     void testNotifyDataAvailableNeedNotify() {
         CompletableFuture<Void> notifyAvailableFuture = new CompletableFuture<>();