You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/06/08 07:15:25 UTC

flink git commit: [FLINK-2177] [runtime] Fix possible NPE when closing Netty channel, before it is active

Repository: flink
Updated Branches:
  refs/heads/master 58b9a3772 -> d433ba9f0


[FLINK-2177] [runtime] Fix possible NPE when closing Netty channel, before it is active


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d433ba9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d433ba9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d433ba9f

Branch: refs/heads/master
Commit: d433ba9f032e5361ae894562b7a8be13cd3efe13
Parents: 58b9a37
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Jun 8 07:14:07 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jun 8 07:14:07 2015 +0200

----------------------------------------------------------------------
 .../netty/PartitionRequestClientHandler.java        |  6 +++++-
 .../netty/PartitionRequestClientHandlerTest.java    | 16 ++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d433ba9f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 508cac9..51b436b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -66,7 +66,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 	 */
 	private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = Maps.newConcurrentMap();
 
-	private ChannelHandlerContext ctx;
+	private volatile ChannelHandlerContext ctx;
 
 	// ------------------------------------------------------------------------
 	// Input channel/receiver registration
@@ -85,6 +85,10 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 	}
 
 	void cancelRequestFor(InputChannelID inputChannelId) {
+		if (inputChannelId == null || ctx == null) {
+			return;
+		}
+
 		if (cancelled.putIfAbsent(inputChannelId, inputChannelId) == null) {
 			ctx.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelId));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d433ba9f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index b8e9f25..2c08cc5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -142,6 +142,22 @@ public class PartitionRequestClientHandlerTest {
 		verify(inputChannel, times(1)).onFailedPartitionRequest();
 	}
 
+	@Test
+	public void testCancelBeforeActive() throws Exception {
+
+		final RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
+		when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());
+
+		final PartitionRequestClientHandler client = new PartitionRequestClientHandler();
+		client.addInputChannel(inputChannel);
+
+		// Don't throw NPE
+		client.cancelRequestFor(null);
+
+		// Don't throw NPE, because channel is not active yet
+		client.cancelRequestFor(inputChannel.getInputChannelId());
+	}
+
 	// ---------------------------------------------------------------------------------------------
 
 	/**