You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/08/17 16:06:48 UTC

flink git commit: [FLINK-4387] [runtime] Don't wait on termination future on KvStateServer shutdown

Repository: flink
Updated Branches:
  refs/heads/master 7ce42c2e7 -> f79168052


[FLINK-4387] [runtime] Don't wait on termination future on KvStateServer shutdown

Due to a bug in Netty that was only fixed in 4.0.33.Final, the
termination future may never complete, so waiting on it can block forever.

Netty issue: https://github.com/netty/netty/issues/4357


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7916805
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7916805
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7916805

Branch: refs/heads/master
Commit: f79168052c776fe36dc2678d63ffccf715403753
Parents: 7ce42c2
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Aug 17 18:04:40 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Aug 17 18:06:21 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/query/netty/KvStateClient.java  | 3 ++-
 .../org/apache/flink/runtime/query/netty/KvStateServer.java  | 8 +-------
 .../apache/flink/runtime/query/netty/KvStateClientTest.java  | 2 --
 3 files changed, 3 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f7916805/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
index 6cfe86b..01093ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
@@ -46,6 +46,7 @@ import java.util.ArrayDeque;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -195,7 +196,7 @@ public class KvStateClient {
 			if (bootstrap != null) {
 				EventLoopGroup group = bootstrap.group();
 				if (group != null) {
-					group.shutdownGracefully();
+					group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7916805/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
index 0c0c5ec..4787390 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
@@ -29,7 +29,6 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.stream.ChunkedWriteHandler;
-import io.netty.util.concurrent.Future;
 import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServerAddress;
@@ -180,12 +179,7 @@ public class KvStateServer {
 		if (bootstrap != null) {
 			EventLoopGroup group = bootstrap.group();
 			if (group != null) {
-				Future<?> shutDownFuture = group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
-				try {
-					shutDownFuture.await();
-				} catch (InterruptedException e) {
-					LOG.error("Interrupted during shut down", e);
-				}
+				group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f7916805/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
index 4c42318..ac03f94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
@@ -44,7 +44,6 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.memory.MemValueState;
 import org.apache.flink.util.NetUtils;
 import org.junit.AfterClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -516,7 +515,6 @@ public class KvStateClientTest {
 	 * that all ongoing requests are failed.
 	 */
 	@Test
-	@Ignore
 	public void testClientServerIntegration() throws Exception {
 		// Config
 		final int numServers = 2;