You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/11/10 13:10:50 UTC
[flink] branch release-1.14 updated: [FLINK-24738][runtime] Ignoring buffer size announcement if the channel is released already
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new d1a3ad0 [FLINK-24738][runtime] Ignoring buffer size announcement if the channel is released already
d1a3ad0 is described below
commit d1a3ad01a96d2d565c5311e4d786917d0c71e8af
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Nov 3 16:13:42 2021 +0100
[FLINK-24738][runtime] Ignoring buffer size announcement if the channel is released already
---
.../partition/consumer/LocalInputChannel.java | 6 ++-
.../partition/consumer/RemoteInputChannel.java | 1 +
.../partition/consumer/SingleInputGate.java | 4 +-
.../network/partition/InputChannelTestUtils.java | 28 +++++++++++
.../partition/consumer/LocalInputChannelTest.java | 57 +++++++++++-----------
.../partition/consumer/RemoteInputChannelTest.java | 9 ++++
.../partition/consumer/SingleInputGateTest.java | 33 +++++++++++++
.../tasks/bufferdebloat/BufferDebloater.java | 4 +-
.../tasks/bufferdebloat/BufferDebloaterTest.java | 2 +-
9 files changed, 111 insertions(+), 33 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 120ffe9..13c6538 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -340,8 +340,10 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
void announceBufferSize(int newBufferSize) {
checkState(!isReleased, "Channel released.");
- ResultSubpartitionView subpartitionView = checkNotNull(this.subpartitionView);
- subpartitionView.notifyNewBufferSize(newBufferSize);
+ ResultSubpartitionView view = this.subpartitionView;
+ if (view != null) {
+ view.notifyNewBufferSize(newBufferSize);
+ }
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 07de42c..90a5da2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -324,6 +324,7 @@ public class RemoteInputChannel extends InputChannel {
}
private void notifyNewBufferSize(int newBufferSize) throws IOException {
+ checkState(!isReleased.get(), "Channel released.");
checkPartitionRequestQueueInitialized();
partitionRequestClient.notifyNewBufferSize(this, newBufferSize);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 7652c20..c4584da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -386,7 +386,9 @@ public class SingleInputGate extends IndexedInputGate {
@Override
public void announceBufferSize(int newBufferSize) {
for (InputChannel channel : channels) {
- channel.announceBufferSize(newBufferSize);
+ if (!channel.isReleased()) {
+ channel.announceBufferSize(newBufferSize);
+ }
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index c23f215..8abcdd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -21,9 +21,11 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
@@ -36,10 +38,12 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Consumer;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
@@ -215,6 +219,30 @@ public class InputChannelTestUtils {
/** This class is not meant to be instantiated. */
private InputChannelTestUtils() {}
+ public static ResultSubpartitionView createResultSubpartitionView(boolean addBuffer)
+ throws IOException {
+ return addBuffer
+ ? createResultSubpartitionView(createFilledFinishedBufferConsumer(4096))
+ : createResultSubpartitionView();
+ }
+
+ public static ResultSubpartitionView createResultSubpartitionView(BufferConsumer... buffers)
+ throws IOException {
+ int bufferSize = 4096;
+ PipelinedResultPartition parent =
+ (PipelinedResultPartition)
+ PartitionTestUtils.createPartition(
+ ResultPartitionType.PIPELINED,
+ NoOpFileChannelManager.INSTANCE,
+ true,
+ bufferSize);
+ ResultSubpartition subpartition = parent.getAllPartitions()[0];
+ for (BufferConsumer buffer : buffers) {
+ subpartition.add(buffer);
+ }
+ return subpartition.createReadView(() -> {});
+ }
+
/** Test stub for {@link MemorySegmentProvider}. */
public static class StubMemorySegmentProvider implements MemorySegmentProvider {
private static final MemorySegmentProvider INSTANCE = new StubMemorySegmentProvider();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 991ef74..46c005e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
@@ -71,6 +72,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel;
@@ -118,7 +120,8 @@ public class LocalInputChannelTest {
InputChannelBuilder.newBuilder()
.setPartitionManager(
new TestingResultPartitionManager(
- createResultSubpartitionView(barrierHolder, data)))
+ InputChannelTestUtils.createResultSubpartitionView(
+ barrierHolder, data)))
.setStateWriter(stateWriter)
.buildLocalChannel(new SingleInputGateBuilder().build());
channel.requestSubpartition(0);
@@ -474,7 +477,8 @@ public class LocalInputChannelTest {
*/
@Test
public void testGetNextAfterPartitionReleased() throws Exception {
- ResultSubpartitionView subpartitionView = createResultSubpartitionView(false);
+ ResultSubpartitionView subpartitionView =
+ InputChannelTestUtils.createResultSubpartitionView(false);
TestingResultPartitionManager partitionManager =
new TestingResultPartitionManager(subpartitionView);
LocalInputChannel channel =
@@ -499,7 +503,8 @@ public class LocalInputChannelTest {
/** Verifies that buffer is not compressed when getting from a {@link LocalInputChannel}. */
@Test
public void testGetBufferFromLocalChannelWhenCompressionEnabled() throws Exception {
- ResultSubpartitionView subpartitionView = createResultSubpartitionView(true);
+ ResultSubpartitionView subpartitionView =
+ InputChannelTestUtils.createResultSubpartitionView(true);
TestingResultPartitionManager partitionManager =
new TestingResultPartitionManager(subpartitionView);
LocalInputChannel channel =
@@ -523,6 +528,26 @@ public class LocalInputChannelTest {
localChannel.resumeConsumption();
}
+ @Test(expected = IllegalStateException.class)
+ public void testAnnounceBufferSize() throws Exception {
+ // given: Initialized local input channel.
+ AtomicInteger lastBufferSize = new AtomicInteger(0);
+ TestingResultPartitionManager partitionManager =
+ new TestingResultPartitionManager(
+ InputChannelTestUtils.createResultSubpartitionView(true));
+ SingleInputGate inputGate = createSingleInputGate(1);
+ LocalInputChannel localChannel = createLocalInputChannel(inputGate, partitionManager);
+ localChannel.requestSubpartition(0);
+
+ localChannel.announceBufferSize(10);
+
+ // when: Release all resources.
+ localChannel.releaseAllResources();
+
+ // then: Announcing the buffer size should lead to an exception.
+ localChannel.announceBufferSize(12);
+ }
+
@Test
public void testEnqueueAvailableChannelWhenResuming() throws IOException, InterruptedException {
PipelinedResultPartition parent =
@@ -647,7 +672,7 @@ public class LocalInputChannelTest {
public void testReceivingBuffersInUseBeforeSubpartitionViewInitialization() throws Exception {
// given: Local input channel without initialized subpartition view.
ResultSubpartitionView subpartitionView =
- createResultSubpartitionView(
+ InputChannelTestUtils.createResultSubpartitionView(
createFilledFinishedBufferConsumer(4096),
createFilledFinishedBufferConsumer(4096),
createFilledFinishedBufferConsumer(4096));
@@ -670,30 +695,6 @@ public class LocalInputChannelTest {
// ---------------------------------------------------------------------------------------------
- private static ResultSubpartitionView createResultSubpartitionView(boolean addBuffer)
- throws IOException {
- return addBuffer
- ? createResultSubpartitionView(createFilledFinishedBufferConsumer(4096))
- : createResultSubpartitionView();
- }
-
- private static ResultSubpartitionView createResultSubpartitionView(BufferConsumer... buffers)
- throws IOException {
- int bufferSize = 4096;
- PipelinedResultPartition parent =
- (PipelinedResultPartition)
- PartitionTestUtils.createPartition(
- ResultPartitionType.PIPELINED,
- NoOpFileChannelManager.INSTANCE,
- true,
- bufferSize);
- ResultSubpartition subpartition = parent.getAllPartitions()[0];
- for (BufferConsumer buffer : buffers) {
- subpartition.add(buffer);
- }
- return subpartition.createReadView(() -> {});
- }
-
/** Returns the configured number of buffers for each channel in a random order. */
private static class TestPartitionProducerBufferSource implements TestProducerSource {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index a098f24..9c0418b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -1408,6 +1408,15 @@ public class RemoteInputChannelTest {
remoteChannel.resumeConsumption();
}
+ @Test(expected = IllegalStateException.class)
+ public void testReleasedChannelAnnounceBufferSize() throws Exception {
+ SingleInputGate inputGate = createSingleInputGate(1);
+ RemoteInputChannel remoteChannel = createRemoteInputChannel(inputGate);
+
+ remoteChannel.releaseAllResources();
+ remoteChannel.announceBufferSize(10);
+ }
+
@Test
public void testOnUpstreamBlockedAndResumed() throws Exception {
BufferPool bufferPool = new TestBufferPool();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index a14d499..09e3bc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -78,6 +78,8 @@ import static java.util.Arrays.asList;
import static org.apache.flink.runtime.checkpoint.CheckpointOptions.alignedNoTimeout;
import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel;
+import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel;
+import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createResultSubpartitionView;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.apache.flink.runtime.io.network.partition.InputGateFairnessTest.setupInputGate;
import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
@@ -854,6 +856,37 @@ public class SingleInputGateTest extends InputGateTestBase {
}
@Test
+ public void testAnnounceBufferSize() throws Exception {
+ final SingleInputGate inputGate = createSingleInputGate(2);
+ final LocalInputChannel localChannel =
+ createLocalInputChannel(
+ inputGate,
+ new TestingResultPartitionManager(createResultSubpartitionView()));
+ RemoteInputChannel remoteInputChannel = createRemoteInputChannel(inputGate, 1);
+
+ inputGate.setInputChannels(localChannel, remoteInputChannel);
+ inputGate.requestPartitions();
+
+ inputGate.announceBufferSize(10);
+
+ // Release all channels and gate one by one.
+
+ localChannel.releaseAllResources();
+
+ inputGate.announceBufferSize(11);
+
+ remoteInputChannel.releaseAllResources();
+
+ inputGate.announceBufferSize(12);
+
+ inputGate.close();
+
+ inputGate.announceBufferSize(13);
+
+ // No exceptions should happen.
+ }
+
+ @Test
public void testInputGateRemovalFromNettyShuffleEnvironment() throws Exception {
NettyShuffleEnvironment network = createNettyShuffleEnvironment();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java
index 5672db9..d1b3dab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java
@@ -96,7 +96,9 @@ public class BufferDebloater {
lastBufferSize = newSize;
for (IndexedInputGate inputGate : inputGates) {
- inputGate.announceBufferSize(newSize);
+ if (!inputGate.isFinished()) {
+ inputGate.announceBufferSize(newSize);
+ }
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java
index dc1dd2d..1fe7b53 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java
@@ -218,7 +218,7 @@ public class BufferDebloaterTest extends TestLogger {
public TestBufferSizeInputGate(int bufferInUseCount) {
// Number of channels don't make sense here because
- super(1, Collections.emptyList());
+ super(1, Collections.emptyList(), false);
this.bufferInUseCount = bufferInUseCount;
}