You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/11 09:48:38 UTC

[flink] branch master updated (3143526637e -> 0b5aa420180)

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 3143526637e [FLINK-28783] [flink-core] Fix typo in ConfigOptions's example code
     new 0f376641e88 [FLINK-28884] HsSubpartitionView should be initialized to a notifiable state.
     new 0b5aa420180 [FLINK-28884] Reset needNotify to true when get a zero backlog.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../network/partition/hybrid/HsSubpartitionView.java   |  9 +++++++--
 .../partition/hybrid/HsSubpartitionViewTest.java       | 18 ++++++++++++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)


[flink] 01/02: [FLINK-28884] HsSubpartitionView should be initialized to a notifiable state.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0f376641e88794ab499f0203be4820696694eba7
Author: Weijie Guo <re...@163.com>
AuthorDate: Fri Aug 5 10:33:23 2022 +0800

    [FLINK-28884] HsSubpartitionView should be initialized to a notifiable state.
    
    HsSubpartitionView should be initialized to a notifiable state, there may be a problem of never consuming otherwise. Imagine the following situation: Downstream task has no initial credit(i.e. exclusive buffers is configured to zero), if there is no data output in the upstream, it will feedback a zero backlog to downstream input channel. All subsequent data available notifications will be intercepted as needNotify is false.
---
 .../flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
index a56947b0e01..e40bcbf6b4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
@@ -43,7 +43,7 @@ public class HsSubpartitionView
     private int lastConsumedBufferIndex = -1;
 
     @GuardedBy("lock")
-    private boolean needNotify = false;
+    private boolean needNotify = true;
 
     @Nullable
     @GuardedBy("lock")


[flink] 02/02: [FLINK-28884] Reset needNotify to true when get a zero backlog.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0b5aa420180729d8792c5d6feecfeb14382f72ee
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Aug 10 14:49:00 2022 +0800

    [FLINK-28884] Reset needNotify to true when get a zero backlog.
    
    HsSubpartitionView should be notifiable when downstream get a zero backlog. Generally speaking, if the backlog is zero, when data become available, even if there is no credit, the backlog information will be notified also. However, in the hybrid shuffle, the notification will be ignored. This behavior is incorrect.
    
    This closes #20529
---
 .../network/partition/hybrid/HsSubpartitionView.java   |  7 ++++++-
 .../partition/hybrid/HsSubpartitionViewTest.java       | 18 ++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
index e40bcbf6b4f..704d52703a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
@@ -118,7 +118,12 @@ public class HsSubpartitionView
                     && cachedNextDataType == Buffer.DataType.EVENT_BUFFER) {
                 availability = true;
             }
-            return new AvailabilityWithBacklog(availability, getSubpartitionBacklog());
+
+            int backlog = getSubpartitionBacklog();
+            if (backlog == 0) {
+                needNotify = true;
+            }
+            return new AvailabilityWithBacklog(availability, backlog);
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
index 659984f2f0a..b81373ac11e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
@@ -271,6 +271,24 @@ class HsSubpartitionViewTest {
         assertThat(notifyAvailableFuture).isNotCompleted();
     }
 
+    @Test
+    void testGetZeroBacklogNeedNotify() {
+        CompletableFuture<Void> notifyAvailableFuture = new CompletableFuture<>();
+        HsSubpartitionView subpartitionView =
+                createSubpartitionView(() -> notifyAvailableFuture.complete(null));
+        subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+        subpartitionView.setDiskDataView(
+                TestingHsDataView.builder().setGetBacklogSupplier(() -> 0).build());
+
+        AvailabilityWithBacklog availabilityAndBacklog =
+                subpartitionView.getAvailabilityAndBacklog(0);
+        assertThat(availabilityAndBacklog.getBacklog()).isZero();
+
+        assertThat(notifyAvailableFuture).isNotCompleted();
+        subpartitionView.notifyDataAvailable();
+        assertThat(notifyAvailableFuture).isCompleted();
+    }
+
     @Test
     void testGetAvailabilityAndBacklogPositiveCredit() {
         HsSubpartitionView subpartitionView = createSubpartitionView();