You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/01 17:01:28 UTC
[flink] 04/07: [FLINK-21104][network] Do not enqueue released
channels into the input gate.
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1f29d03cb3a47037a19bd01d626a83bb3f656a7a
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Jan 26 15:55:40 2021 +0100
[FLINK-21104][network] Do not enqueue released channels into the input gate.
---
.../io/network/partition/consumer/SingleInputGate.java | 6 ++++++
.../network/partition/consumer/SingleInputGateTest.java | 17 +++++++++++++++++
.../io/network/partition/consumer/TestInputChannel.java | 7 ++++++-
3 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index c49ff1c..12070a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -853,6 +853,12 @@ public class SingleInputGate extends IndexedInputGate {
return;
}
+ if (channel.isReleased()) {
+ // when a channel is closed, an EndOfPartitionEvent is sent, followed by a final notification;
+ // if the EndOfPartitionEvent causes a release, we must ignore that notification
+ return;
+ }
+
if (!queueChannelUnsafe(channel, priority)) {
return;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 924211c..1e136ca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
@@ -283,6 +284,22 @@ public class SingleInputGateTest extends InputGateTestBase {
}
@Test
+ public void testNotifyAfterEndOfPartition() throws Exception {
+ final SingleInputGate inputGate = createInputGate(2);
+ TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
+ inputGate.setInputChannels(inputChannel, new TestInputChannel(inputGate, 1));
+
+ inputChannel.readEndOfPartitionEvent();
+ inputChannel.notifyChannelNonEmpty();
+ assertEquals(EndOfPartitionEvent.INSTANCE, inputGate.pollNext().get().getEvent());
+
+ // the gate is still active because of the secondary channel
+ // verify that the released channel is not enqueued again by a late notification
+ inputChannel.notifyChannelNonEmpty();
+ assertFalse(inputGate.pollNext().isPresent());
+ }
+
+ @Test
public void testIsAvailable() throws Exception {
final SingleInputGate inputGate = createInputGate(1);
TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index a17ac7b..5eafb27 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.fail;
/** A mocked input channel. */
@@ -149,6 +150,8 @@ public class TestInputChannel extends InputChannel {
@Override
Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException {
+ checkState(!isReleased);
+
BufferAndAvailabilityProvider provider = buffers.poll();
if (provider != null) {
@@ -178,7 +181,9 @@ public class TestInputChannel extends InputChannel {
}
@Override
- void releaseAllResources() throws IOException {}
+ void releaseAllResources() throws IOException {
+ isReleased = true;
+ }
@Override
public void resumeConsumption() {