You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/01 17:01:28 UTC

[flink] 04/07: [FLINK-21104][network] Do not enqueue released channels into the input gate.

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f29d03cb3a47037a19bd01d626a83bb3f656a7a
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Jan 26 15:55:40 2021 +0100

    [FLINK-21104][network] Do not enqueue released channels into the input gate.
---
 .../io/network/partition/consumer/SingleInputGate.java  |  6 ++++++
 .../network/partition/consumer/SingleInputGateTest.java | 17 +++++++++++++++++
 .../io/network/partition/consumer/TestInputChannel.java |  7 ++++++-
 3 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index c49ff1c..12070a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -853,6 +853,12 @@ public class SingleInputGate extends IndexedInputGate {
                     return;
                 }
 
+                if (channel.isReleased()) {
+                    // when channel is closed, EndOfPartitionEvent is send and a final notification
+                    // if EndOfPartitionEvent causes a release, we must ignore the notification
+                    return;
+                }
+
                 if (!queueChannelUnsafe(channel, priority)) {
                     return;
                 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 924211c..1e136ca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
 import org.apache.flink.runtime.io.network.TestingConnectionManager;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
@@ -283,6 +284,22 @@ public class SingleInputGateTest extends InputGateTestBase {
     }
 
     @Test
+    public void testNotifyAfterEndOfPartition() throws Exception {
+        final SingleInputGate inputGate = createInputGate(2);
+        TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
+        inputGate.setInputChannels(inputChannel, new TestInputChannel(inputGate, 1));
+
+        inputChannel.readEndOfPartitionEvent();
+        inputChannel.notifyChannelNonEmpty();
+        assertEquals(EndOfPartitionEvent.INSTANCE, inputGate.pollNext().get().getEvent());
+
+        // gate is still active because of secondary channel
+        // test if released channel is enqueued
+        inputChannel.notifyChannelNonEmpty();
+        assertFalse(inputGate.pollNext().isPresent());
+    }
+
+    @Test
     public void testIsAvailable() throws Exception {
         final SingleInputGate inputGate = createInputGate(1);
         TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index a17ac7b..5eafb27 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.fail;
 
 /** A mocked input channel. */
@@ -149,6 +150,8 @@ public class TestInputChannel extends InputChannel {
 
     @Override
     Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException {
+        checkState(!isReleased);
+
         BufferAndAvailabilityProvider provider = buffers.poll();
 
         if (provider != null) {
@@ -178,7 +181,9 @@ public class TestInputChannel extends InputChannel {
     }
 
     @Override
-    void releaseAllResources() throws IOException {}
+    void releaseAllResources() throws IOException {
+        isReleased = true;
+    }
 
     @Override
     public void resumeConsumption() {