You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/06/08 07:15:25 UTC
flink git commit: [FLINK-2177] [runtime] Fix possible NPE when
closing Netty channel, before it is active
Repository: flink
Updated Branches:
refs/heads/master 58b9a3772 -> d433ba9f0
[FLINK-2177] [runtime] Fix possible NPE when closing Netty channel, before it is active
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d433ba9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d433ba9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d433ba9f
Branch: refs/heads/master
Commit: d433ba9f032e5361ae894562b7a8be13cd3efe13
Parents: 58b9a37
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Jun 8 07:14:07 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jun 8 07:14:07 2015 +0200
----------------------------------------------------------------------
.../netty/PartitionRequestClientHandler.java | 6 +++++-
.../netty/PartitionRequestClientHandlerTest.java | 16 ++++++++++++++++
2 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d433ba9f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 508cac9..51b436b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -66,7 +66,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
*/
private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = Maps.newConcurrentMap();
- private ChannelHandlerContext ctx;
+ private volatile ChannelHandlerContext ctx;
// ------------------------------------------------------------------------
// Input channel/receiver registration
@@ -85,6 +85,10 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
}
void cancelRequestFor(InputChannelID inputChannelId) {
+ if (inputChannelId == null || ctx == null) {
+ return;
+ }
+
if (cancelled.putIfAbsent(inputChannelId, inputChannelId) == null) {
ctx.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelId));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d433ba9f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index b8e9f25..2c08cc5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -142,6 +142,22 @@ public class PartitionRequestClientHandlerTest {
verify(inputChannel, times(1)).onFailedPartitionRequest();
}
+ @Test
+ public void testCancelBeforeActive() throws Exception {
+
+ final RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
+ when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());
+
+ final PartitionRequestClientHandler client = new PartitionRequestClientHandler();
+ client.addInputChannel(inputChannel);
+
+ // Don't throw NPE
+ client.cancelRequestFor(null);
+
+ // Don't throw NPE, because channel is not active yet
+ client.cancelRequestFor(inputChannel.getInputChannelId());
+ }
+
// ---------------------------------------------------------------------------------------------
/**