You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/08/17 16:06:48 UTC
flink git commit: [FLINK-4387] [runtime] Don't wait on termination future on KvStateServer shutdown
Repository: flink
Updated Branches:
refs/heads/master 7ce42c2e7 -> f79168052
[FLINK-4387] [runtime] Don't wait on termination future on KvStateServer shutdown
Due to a bug in Netty that was only fixed in 4.0.33.Final, waiting
on the termination future may never succeed.
Netty issue: https://github.com/netty/netty/issues/4357
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7916805
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7916805
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7916805
Branch: refs/heads/master
Commit: f79168052c776fe36dc2678d63ffccf715403753
Parents: 7ce42c2
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Aug 17 18:04:40 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Aug 17 18:06:21 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/runtime/query/netty/KvStateClient.java | 3 ++-
.../org/apache/flink/runtime/query/netty/KvStateServer.java | 8 +-------
.../apache/flink/runtime/query/netty/KvStateClientTest.java | 2 --
3 files changed, 3 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f7916805/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
index 6cfe86b..01093ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
@@ -46,6 +46,7 @@ import java.util.ArrayDeque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -195,7 +196,7 @@ public class KvStateClient {
if (bootstrap != null) {
EventLoopGroup group = bootstrap.group();
if (group != null) {
- group.shutdownGracefully();
+ group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f7916805/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
index 0c0c5ec..4787390 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
@@ -29,7 +29,6 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.stream.ChunkedWriteHandler;
-import io.netty.util.concurrent.Future;
import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServerAddress;
@@ -180,12 +179,7 @@ public class KvStateServer {
if (bootstrap != null) {
EventLoopGroup group = bootstrap.group();
if (group != null) {
- Future<?> shutDownFuture = group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
- try {
- shutDownFuture.await();
- } catch (InterruptedException e) {
- LOG.error("Interrupted during shut down", e);
- }
+ group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f7916805/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
index 4c42318..ac03f94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
@@ -44,7 +44,6 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemValueState;
import org.apache.flink.util.NetUtils;
import org.junit.AfterClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -516,7 +515,6 @@ public class KvStateClientTest {
* that all ongoing requests are failed.
*/
@Test
- @Ignore
public void testClientServerIntegration() throws Exception {
// Config
final int numServers = 2;