You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yu...@apache.org on 2021/11/02 09:56:19 UTC

[flink] branch release-1.14 updated: [FLINK-24671][runtime] Return 0 buffers in use until subpartition view initialization in order to avoid NPE

This is an automated email from the ASF dual-hosted git repository.

yuanmei pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new c629c5e  [FLINK-24671][runtime] Return 0 buffers in use until subpartition view initialization in order to avoid NPE
c629c5e is described below

commit c629c5e5339f60ce761edd8d01ff700474b7c89e
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Thu Oct 28 12:15:48 2021 +0200

    [FLINK-24671][runtime] Return 0 buffers in use until subpartition view initialization in order to avoid NPE
    
    (cherry picked from commit f91bd772de866a48d65dfcb31d4ef0d1ef2c001e)
---
 .../partition/consumer/LocalInputChannel.java      |  3 ++-
 .../partition/consumer/LocalInputChannelTest.java  | 25 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index ba1c075..120ffe9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -346,7 +346,8 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 
     @Override
     int getBuffersInUseCount() {
-        return subpartitionView.getNumberOfQueuedBuffers();
+        ResultSubpartitionView view = this.subpartitionView;
+        return view == null ? 0 : view.getNumberOfQueuedBuffers();
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index f3f33b8..991ef74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -643,6 +643,31 @@ public class LocalInputChannelTest {
         assertEquals(20, subpartition1.add(createFilledFinishedBufferConsumer(16)));
     }
 
+    @Test
+    public void testReceivingBuffersInUseBeforeSubpartitionViewInitialization() throws Exception {
+        // given: Local input channel without initialized subpartition view.
+        ResultSubpartitionView subpartitionView =
+                createResultSubpartitionView(
+                        createFilledFinishedBufferConsumer(4096),
+                        createFilledFinishedBufferConsumer(4096),
+                        createFilledFinishedBufferConsumer(4096));
+        TestingResultPartitionManager partitionManager =
+                new TestingResultPartitionManager(subpartitionView);
+        final SingleInputGate inputGate = createSingleInputGate(1);
+        final LocalInputChannel localChannel = createLocalInputChannel(inputGate, partitionManager);
+
+        inputGate.setInputChannels(localChannel);
+
+        // then: Buffers in use should be equal to 0 until subpartition view initialization.
+        assertEquals(0, localChannel.getBuffersInUseCount());
+
+        // when: The subpartition view is initialized.
+        localChannel.requestSubpartition(0);
+
+        // then: Buffers in use should show correct value.
+        assertEquals(3, localChannel.getBuffersInUseCount());
+    }
+
     // ---------------------------------------------------------------------------------------------
 
     private static ResultSubpartitionView createResultSubpartitionView(boolean addBuffer)