You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/11 09:48:40 UTC

[flink] 02/02: [FLINK-28884] Reset needNotify to true when get a zero backlog.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0b5aa420180729d8792c5d6feecfeb14382f72ee
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Aug 10 14:49:00 2022 +0800

    [FLINK-28884] Reset needNotify to true when get a zero backlog.
    
    HsSubpartitionView should be notifiable when downstream get a zero backlog. Generally speaking, if the backlog is zero, when data become available, even if there is no credit, the backlog information will be notified also. However, in the hybrid shuffle, the notification will be ignored. This behavior is incorrect.
    
    This closes #20529
---
 .../network/partition/hybrid/HsSubpartitionView.java   |  7 ++++++-
 .../partition/hybrid/HsSubpartitionViewTest.java       | 18 ++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
index e40bcbf6b4f..704d52703a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
@@ -118,7 +118,12 @@ public class HsSubpartitionView
                     && cachedNextDataType == Buffer.DataType.EVENT_BUFFER) {
                 availability = true;
             }
-            return new AvailabilityWithBacklog(availability, getSubpartitionBacklog());
+
+            int backlog = getSubpartitionBacklog();
+            if (backlog == 0) {
+                needNotify = true;
+            }
+            return new AvailabilityWithBacklog(availability, backlog);
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
index 659984f2f0a..b81373ac11e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
@@ -271,6 +271,24 @@ class HsSubpartitionViewTest {
         assertThat(notifyAvailableFuture).isNotCompleted();
     }
 
+    @Test
+    void testGetZeroBacklogNeedNotify() {
+        CompletableFuture<Void> notifyAvailableFuture = new CompletableFuture<>();
+        HsSubpartitionView subpartitionView =
+                createSubpartitionView(() -> notifyAvailableFuture.complete(null));
+        subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+        subpartitionView.setDiskDataView(
+                TestingHsDataView.builder().setGetBacklogSupplier(() -> 0).build());
+
+        AvailabilityWithBacklog availabilityAndBacklog =
+                subpartitionView.getAvailabilityAndBacklog(0);
+        assertThat(availabilityAndBacklog.getBacklog()).isZero();
+
+        assertThat(notifyAvailableFuture).isNotCompleted();
+        subpartitionView.notifyDataAvailable();
+        assertThat(notifyAvailableFuture).isCompleted();
+    }
+
     @Test
     void testGetAvailabilityAndBacklogPositiveCredit() {
         HsSubpartitionView subpartitionView = createSubpartitionView();