You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/29 10:23:48 UTC

[flink] 03/03: [FLINK-13245][network] Remove redundant bookkeeping for already canceled input channel IDs

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7116ab71edc183d34d128453e06a3efc15ad8905
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Fri Jul 26 11:50:55 2019 +0200

    [FLINK-13245][network] Remove redundant bookkeeping for already canceled input channel IDs
---
 .../runtime/io/network/netty/PartitionRequestQueue.java  | 16 ----------------
 1 file changed, 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index b492ea6..4a845d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
@@ -40,7 +39,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -62,8 +60,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 	/** All the readers created for the consumers' partition requests. */
 	private final ConcurrentMap<InputChannelID, NetworkSequenceViewReader> allReaders = new ConcurrentHashMap<>();
 
-	private final Set<InputChannelID> released = Sets.newHashSet();
-
 	private boolean fatalError;
 
 	private ChannelHandlerContext ctx;
@@ -175,9 +171,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		} else if (msg.getClass() == InputChannelID.class) {
 			// Release partition view that get a cancel request.
 			InputChannelID toCancel = (InputChannelID) msg;
-			if (released.contains(toCancel)) {
-				return;
-			}
 
 			// remove reader from queue of available readers
 			availableReaders.removeIf(reader -> reader.getReceiverId().equals(toCancel));
@@ -222,7 +215,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 					if (!reader.isReleased()) {
 						continue;
 					}
-					markAsReleased(reader.getReceiverId());
 
 					Throwable cause = reader.getFailureCause();
 					if (cause != null) {
@@ -312,14 +304,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		reader.notifySubpartitionConsumed();
 		reader.setRegisteredAsAvailable(false);
 		reader.releaseAllResources();
-		markAsReleased(reader.getReceiverId());
-	}
-
-	/**
-	 * Marks a receiver as released.
-	 */
-	private void markAsReleased(InputChannelID receiverId) {
-		released.add(receiverId);
 	}
 
 	// This listener is called after an element of the current nonEmptyReader has been