You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/29 10:23:48 UTC
[flink] 03/03: [FLINK-13245][network] Remove redundant bookkeeping
for already canceled input channel IDs
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7116ab71edc183d34d128453e06a3efc15ad8905
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Fri Jul 26 11:50:55 2019 +0200
[FLINK-13245][network] Remove redundant bookkeeping for already canceled input channel IDs
---
.../runtime/io/network/netty/PartitionRequestQueue.java | 16 ----------------
1 file changed, 16 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index b492ea6..4a845d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
@@ -40,7 +39,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayDeque;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -62,8 +60,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
/** All the readers created for the consumers' partition requests. */
private final ConcurrentMap<InputChannelID, NetworkSequenceViewReader> allReaders = new ConcurrentHashMap<>();
- private final Set<InputChannelID> released = Sets.newHashSet();
-
private boolean fatalError;
private ChannelHandlerContext ctx;
@@ -175,9 +171,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
} else if (msg.getClass() == InputChannelID.class) {
// Release partition view that get a cancel request.
InputChannelID toCancel = (InputChannelID) msg;
- if (released.contains(toCancel)) {
- return;
- }
// remove reader from queue of available readers
availableReaders.removeIf(reader -> reader.getReceiverId().equals(toCancel));
@@ -222,7 +215,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
if (!reader.isReleased()) {
continue;
}
- markAsReleased(reader.getReceiverId());
Throwable cause = reader.getFailureCause();
if (cause != null) {
@@ -312,14 +304,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
reader.notifySubpartitionConsumed();
reader.setRegisteredAsAvailable(false);
reader.releaseAllResources();
- markAsReleased(reader.getReceiverId());
- }
-
- /**
- * Marks a receiver as released.
- */
- private void markAsReleased(InputChannelID receiverId) {
- released.add(receiverId);
}
// This listener is called after an element of the current nonEmptyReader has been