You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yu...@apache.org on 2021/11/01 11:13:30 UTC
[flink] branch master updated: [FLINK-24671][runtime] Return 0
buffers in use until subpartition view initialization in order to avoid NPE
This is an automated email from the ASF dual-hosted git repository.
yuanmei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f91bd77 [FLINK-24671][runtime] Return 0 buffers in use until subpartition view initialization in order to avoid NPE
f91bd77 is described below
commit f91bd772de866a48d65dfcb31d4ef0d1ef2c001e
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Thu Oct 28 12:15:48 2021 +0200
[FLINK-24671][runtime] Return 0 buffers in use until subpartition view initialization in order to avoid NPE
---
.../partition/consumer/LocalInputChannel.java | 3 ++-
.../partition/consumer/LocalInputChannelTest.java | 25 ++++++++++++++++++++++
2 files changed, 27 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index ba1c075..120ffe9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -346,7 +346,8 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
@Override
int getBuffersInUseCount() {
- return subpartitionView.getNumberOfQueuedBuffers();
+ ResultSubpartitionView view = this.subpartitionView;
+ return view == null ? 0 : view.getNumberOfQueuedBuffers();
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index f3f33b8..991ef74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -643,6 +643,31 @@ public class LocalInputChannelTest {
assertEquals(20, subpartition1.add(createFilledFinishedBufferConsumer(16)));
}
+ @Test
+ public void testReceivingBuffersInUseBeforeSubpartitionViewInitialization() throws Exception {
+ // given: Local input channel without initialized subpartition view.
+ ResultSubpartitionView subpartitionView =
+ createResultSubpartitionView(
+ createFilledFinishedBufferConsumer(4096),
+ createFilledFinishedBufferConsumer(4096),
+ createFilledFinishedBufferConsumer(4096));
+ TestingResultPartitionManager partitionManager =
+ new TestingResultPartitionManager(subpartitionView);
+ final SingleInputGate inputGate = createSingleInputGate(1);
+ final LocalInputChannel localChannel = createLocalInputChannel(inputGate, partitionManager);
+
+ inputGate.setInputChannels(localChannel);
+
+ // then: Buffers in use should be equal to 0 until subpartition view initialization.
+ assertEquals(0, localChannel.getBuffersInUseCount());
+
+ // when: The subpartition view is initialized.
+ localChannel.requestSubpartition(0);
+
+ // then: Buffers in use should show correct value.
+ assertEquals(3, localChannel.getBuffersInUseCount());
+ }
+
// ---------------------------------------------------------------------------------------------
private static ResultSubpartitionView createResultSubpartitionView(boolean addBuffer)