You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/12/06 15:11:48 UTC

[1/3] flink git commit: [FLINK-7975][QS] Wait for QS client to shutdown.

Repository: flink
Updated Branches:
  refs/heads/master 5221a70e0 -> a3fd548e9


[FLINK-7975][QS] Wait for QS client to shutdown.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5760677b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5760677b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5760677b

Branch: refs/heads/master
Commit: 5760677b3bb26245ca4816548833da0257ec7c7a
Parents: 5221a70
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 9 19:21:43 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Wed Dec 6 14:33:16 2017 +0100

----------------------------------------------------------------------
 .../client/QueryableStateClient.java            |  30 +++-
 .../flink/queryablestate/network/Client.java    | 171 +++++++++++++------
 .../queryablestate/network/ClientTest.java      |  88 ++++++++--
 .../query/AbstractQueryableStateOperator.java   |   2 +
 4 files changed, 215 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5760677b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 7abf6bc..f1c69ed 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -108,9 +108,33 @@ public class QueryableStateClient {
 				new DisabledKvStateRequestStats());
 	}
 
-	/** Shuts down the client. */
-	public void shutdown() {
-		client.shutdown();
+	/**
+	 * Shuts down the client and returns a {@link CompletableFuture} that
+	 * will be completed when the shutdown process is completed.
+	 *
+	 * <p>If an exception is thrown for any reason, then the returned future
+	 * will be completed exceptionally with that exception.
+	 *
+	 * @return A {@link CompletableFuture} for further handling of the
+	 * shutdown result.
+	 */
+	public CompletableFuture<?> shutdownAndHandle() {
+		return client.shutdown();
+	}
+
+	/**
+	 * Shuts down the client and waits until shutdown is completed.
+	 *
+	 * <p>If an exception is thrown, a warning is logged containing
+	 * the exception message.
+	 */
+	public void shutdownAndWait() {
+		try {
+			client.shutdown().get();
+			LOG.info("The Queryable State Client was shutdown successfully.");
+		} catch (Exception e) {
+			LOG.warn("The Queryable State Client shutdown failed: ", e);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/5760677b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index 12286fa..364f835 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -42,15 +42,19 @@ import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChann
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -64,6 +68,8 @@ import java.util.concurrent.atomic.AtomicReference;
 @Internal
 public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 
+	private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+
 	/** The name of the client. Used for logging and stack traces.*/
 	private final String clientName;
 
@@ -82,8 +88,8 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 	/** Pending connections. */
 	private final Map<InetSocketAddress, PendingConnection> pendingConnections = new ConcurrentHashMap<>();
 
-	/** Atomic shut down flag. */
-	private final AtomicBoolean shutDown = new AtomicBoolean();
+	/** Future completed once shutdown has finished; stays {@code null} until shutdown is requested. */
+	private final AtomicReference<CompletableFuture<Void>> clientShutdownFuture = new AtomicReference<>(null);
 
 	/**
 	 * Creates a client with the specified number of event loop threads.
@@ -133,7 +139,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 	}
 
 	public CompletableFuture<RESP> sendRequest(final InetSocketAddress serverAddress, final REQ request) {
-		if (shutDown.get()) {
+		if (clientShutdownFuture.get() != null) {
 			return FutureUtils.getFailedFuture(new IllegalStateException(clientName + " is already shut down."));
 		}
 
@@ -166,28 +172,57 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 	 * Shuts down the client and closes all connections.
 	 *
 	 * <p>After a call to this method, all returned futures will be failed.
+	 *
+	 * @return A {@link CompletableFuture} that will be completed when the shutdown process is done.
 	 */
-	public void shutdown() {
-		if (shutDown.compareAndSet(false, true)) {
+	public CompletableFuture<Void> shutdown() {
+		final CompletableFuture<Void> newShutdownFuture = new CompletableFuture<>();
+		if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) {
+
+			final List<CompletableFuture<Void>> connectionFutures = new ArrayList<>();
+
 			for (Map.Entry<InetSocketAddress, EstablishedConnection> conn : establishedConnections.entrySet()) {
 				if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
-					conn.getValue().close();
+					connectionFutures.add(conn.getValue().close());
 				}
 			}
 
 			for (Map.Entry<InetSocketAddress, PendingConnection> conn : pendingConnections.entrySet()) {
 				if (pendingConnections.remove(conn.getKey()) != null) {
-					conn.getValue().close();
+					connectionFutures.add(conn.getValue().close());
 				}
 			}
 
-			if (bootstrap != null) {
-				EventLoopGroup group = bootstrap.group();
-				if (group != null) {
-					group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+			CompletableFuture.allOf(
+					connectionFutures.toArray(new CompletableFuture<?>[connectionFutures.size()])
+			).whenComplete((result, throwable) -> {
+
+				if (throwable != null) {
+					LOG.warn("Problem while shutting down the connections at the {}: {}", clientName, throwable);
 				}
-			}
+
+				if (bootstrap != null) {
+					EventLoopGroup group = bootstrap.group();
+					if (group != null && !group.isShutdown()) {
+						group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
+								.addListener(finished -> {
+									if (finished.isSuccess()) {
+										newShutdownFuture.complete(null);
+									} else {
+										newShutdownFuture.completeExceptionally(finished.cause());
+									}
+								});
+					} else {
+						newShutdownFuture.complete(null);
+					}
+				} else {
+					newShutdownFuture.complete(null);
+				}
+			});
+
+			return newShutdownFuture;
 		}
+		return clientShutdownFuture.get();
 	}
 
 	/**
@@ -209,8 +244,8 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 		/** The established connection after the connect succeeds. */
 		private EstablishedConnection established;
 
-		/** Closed flag. */
-		private boolean closed;
+		/** Atomic shut down future. */
+		private final AtomicReference<CompletableFuture<Void>> connectionShutdownFuture = new AtomicReference<>(null);
 
 		/** Failure cause if something goes wrong. */
 		private Throwable failureCause;
@@ -250,7 +285,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 			synchronized (connectLock) {
 				if (failureCause != null) {
 					return FutureUtils.getFailedFuture(failureCause);
-				} else if (closed) {
+				} else if (connectionShutdownFuture.get() != null) {
 					return FutureUtils.getFailedFuture(new ClosedChannelException());
 				} else {
 					if (established != null) {
@@ -272,7 +307,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 		 */
 		private void handInChannel(Channel channel) {
 			synchronized (connectLock) {
-				if (closed || failureCause != null) {
+				if (connectionShutdownFuture.get() != null || failureCause != null) {
 					// Close the channel and we are done. Any queued requests
 					// are removed on the close/failure call and after that no
 					// new ones can be enqueued.
@@ -300,7 +335,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 					// Check shut down for possible race with shut down. We
 					// don't want any lingering connections after shut down,
 					// which can happen if we don't check this here.
-					if (shutDown.get()) {
+					if (clientShutdownFuture.get() != null) {
 						if (establishedConnections.remove(serverAddress, established)) {
 							established.close();
 						}
@@ -312,32 +347,40 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 		/**
 		 * Close the connecting channel with a ClosedChannelException.
 		 */
-		private void close() {
-			close(new ClosedChannelException());
+		private CompletableFuture<Void> close() {
+			return close(new ClosedChannelException());
 		}
 
 		/**
 		 * Close the connecting channel with an Exception (can be {@code null})
 		 * or forward to the established channel.
 		 */
-		private void close(Throwable cause) {
-			synchronized (connectLock) {
-				if (!closed) {
+		private CompletableFuture<Void> close(Throwable cause) {
+			CompletableFuture<Void> future = new CompletableFuture<>();
+			if (connectionShutdownFuture.compareAndSet(null, future)) {
+				synchronized (connectLock) {
 					if (failureCause == null) {
 						failureCause = cause;
 					}
 
 					if (established != null) {
-						established.close();
+						established.close().whenComplete((result, throwable) -> {
+							if (throwable != null) {
+								future.completeExceptionally(throwable);
+							} else {
+								future.complete(null);
+							}
+						});
 					} else {
 						PendingRequest pending;
 						while ((pending = queuedRequests.poll()) != null) {
 							pending.completeExceptionally(cause);
 						}
+						future.complete(null);
 					}
-					closed = true;
 				}
 			}
+			return connectionShutdownFuture.get();
 		}
 
 		@Override
@@ -347,7 +390,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 						"serverAddress=" + serverAddress +
 						", queuedRequests=" + queuedRequests.size() +
 						", established=" + (established != null) +
-						", closed=" + closed +
+						", closed=" + (connectionShutdownFuture.get() != null) +
 						'}';
 			}
 		}
@@ -383,8 +426,8 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 		/** Current request number used to assign unique request IDs. */
 		private final AtomicLong requestCount = new AtomicLong();
 
-		/** Reference to a failure that was reported by the channel. */
-		private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
+		/** Atomic shut down future. */
+		private final AtomicReference<CompletableFuture<Void>> connectionShutdownFuture = new AtomicReference<>(null);
 
 		/**
 		 * Creates an established connection with the given channel.
@@ -412,8 +455,8 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 		/**
 		 * Close the channel with a ClosedChannelException.
 		 */
-		void close() {
-			close(new ClosedChannelException());
+		CompletableFuture<Void> close() {
+			return close(new ClosedChannelException());
 		}
 
 		/**
@@ -422,20 +465,33 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 		 * @param cause The cause to close the channel with.
 		 * @return Channel close future
 		 */
-		private boolean close(Throwable cause) {
-			if (failureCause.compareAndSet(null, cause)) {
-				channel.close();
-				stats.reportInactiveConnection();
+		private CompletableFuture<Void> close(final Throwable cause) {
+			final CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
 
-				for (long requestId : pendingRequests.keySet()) {
-					TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-					if (pending != null && pending.completeExceptionally(cause)) {
-						stats.reportFailedRequest();
+			if (connectionShutdownFuture.compareAndSet(null, shutdownFuture)) {
+				channel.close().addListener(finished -> {
+					stats.reportInactiveConnection();
+					for (long requestId : pendingRequests.keySet()) {
+						TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
+						if (pending != null && pending.completeExceptionally(cause)) {
+							stats.reportFailedRequest();
+						}
 					}
-				}
-				return true;
+
+					// when finishing, if netty successfully closes the channel, then the provided exception is used
+					// as the reason for the closing. If there was something wrong at the netty side, then that exception
+					// is prioritized over the provided one.
+					if (finished.isSuccess()) {
+						shutdownFuture.completeExceptionally(cause);
+					} else {
+						LOG.warn("Something went wrong when trying to close connection due to : ", cause);
+						shutdownFuture.completeExceptionally(finished.cause());
+					}
+				});
 			}
-			return false;
+
+			// in case we had a race condition, return the winner of the race.
+			return connectionShutdownFuture.get();
 		}
 
 		/**
@@ -464,16 +520,22 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 					}
 				});
 
-				// Check failure for possible race. We don't want any lingering
+				// Check for possible race. We don't want any lingering
 				// promises after a failure, which can happen if we don't check
 				// this here. Note that close is treated as a failure as well.
-				Throwable failure = failureCause.get();
-				if (failure != null) {
-					// Remove from pending requests to guard against concurrent
-					// removal and to make sure that we only count it once as failed.
+				CompletableFuture<Void> clShutdownFuture = clientShutdownFuture.get();
+				if (clShutdownFuture != null) {
 					TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-					if (pending != null && pending.completeExceptionally(failure)) {
-						stats.reportFailedRequest();
+					if (pending != null) {
+						clShutdownFuture.whenComplete((ignored, throwable) -> {
+							if (throwable != null && pending.completeExceptionally(throwable)) {
+								stats.reportFailedRequest();
+							} else {
+								// Reached when the client shutdown future completed normally, or when the pending
+								// request was already completed; either way, try to fail it with a ClosedChannelException.
+								pending.completeExceptionally(new ClosedChannelException());
+							}
+						});
 					}
 				}
 			} catch (Throwable t) {
@@ -486,27 +548,25 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 		@Override
 		public void onRequestResult(long requestId, RESP response) {
 			TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-			if (pending != null && pending.complete(response)) {
+			if (pending != null && !pending.isDone()) {
 				long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
 				stats.reportSuccessfulRequest(durationMillis);
+				pending.complete(response);
 			}
 		}
 
 		@Override
 		public void onRequestFailure(long requestId, Throwable cause) {
 			TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-			if (pending != null && pending.completeExceptionally(cause)) {
+			if (pending != null && !pending.isDone()) {
 				stats.reportFailedRequest();
+				pending.completeExceptionally(cause);
 			}
 		}
 
 		@Override
 		public void onFailure(Throwable cause) {
-			if (close(cause)) {
-				// Remove from established channels, otherwise future
-				// requests will be handled by this failed channel.
-				establishedConnections.remove(serverAddress, this);
-			}
+			close(cause).handle((cancelled, ignored) -> establishedConnections.remove(serverAddress, this));
 		}
 
 		@Override
@@ -516,7 +576,6 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 					", channel=" + channel +
 					", pendingRequests=" + pendingRequests.size() +
 					", requestCount=" + requestCount +
-					", failureCause=" + failureCause +
 					'}';
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5760677b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index 1fa4deb..8638efa 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
@@ -54,7 +55,9 @@ import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
-import org.junit.AfterClass;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,15 +98,20 @@ public class ClientTest {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class);
 
+	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(20L, TimeUnit.SECONDS);
+
 	// Thread pool for client bootstrap (created anew for each test in setUp)
-	private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
+	private NioEventLoopGroup nioGroup;
 
-	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+	@Before
+	public void setUp() throws Exception {
+		nioGroup = new NioEventLoopGroup();
+	}
 
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (NIO_GROUP != null) {
-			NIO_GROUP.shutdownGracefully();
+	@After
+	public void tearDown() throws Exception {
+		if (nioGroup != null) {
+			nioGroup.shutdownGracefully();
 		}
 	}
 
@@ -218,7 +226,24 @@ public class ClientTest {
 			assertEquals(expectedRequests, stats.getNumFailed());
 		} finally {
 			if (client != null) {
-				client.shutdown();
+				Exception exc = null;
+				try {
+
+					// TODO: here we were seeing this problem:
+					// https://github.com/netty/netty/issues/4357 if we do a get().
+					// This is why we wait with a timeout, log any exception, and
+					// then check that the event loop group has been shut down.
+
+					client.shutdown().get(10L, TimeUnit.SECONDS);
+				} catch (Exception e) {
+					exc = e;
+					LOG.error("An exception occurred while shutting down netty.", e);
+				}
+
+				Assert.assertTrue(
+						ExceptionUtils.stringifyException(exc),
+						client.isEventGroupShutdown()
+				);
 			}
 
 			if (serverChannel != null) {
@@ -265,7 +290,12 @@ public class ClientTest {
 			}
 		} finally {
 			if (client != null) {
-				client.shutdown();
+				try {
+					client.shutdown().get(10L, TimeUnit.SECONDS);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+				Assert.assertTrue(client.isEventGroupShutdown());
 			}
 
 			assertEquals("Channel leak", 0L, stats.getNumConnections());
@@ -366,7 +396,12 @@ public class ClientTest {
 			}
 
 			if (client != null) {
-				client.shutdown();
+				try {
+					client.shutdown().get(10L, TimeUnit.SECONDS);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+				Assert.assertTrue(client.isEventGroupShutdown());
 			}
 
 			assertEquals("Channel leak", 0L, stats.getNumConnections());
@@ -467,7 +502,12 @@ public class ClientTest {
 			assertEquals(2L, stats.getNumFailed());
 		} finally {
 			if (client != null) {
-				client.shutdown();
+				try {
+					client.shutdown().get(10L, TimeUnit.SECONDS);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+				Assert.assertTrue(client.isEventGroupShutdown());
 			}
 
 			if (serverChannel != null) {
@@ -548,7 +588,12 @@ public class ClientTest {
 			assertEquals(1L, stats.getNumFailed());
 		} finally {
 			if (client != null) {
-				client.shutdown();
+				try {
+					client.shutdown().get(10L, TimeUnit.SECONDS);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+				Assert.assertTrue(client.isEventGroupShutdown());
 			}
 
 			if (serverChannel != null) {
@@ -661,7 +706,7 @@ public class ClientTest {
 					Collections.shuffle(random);
 
 					// Dispatch queries
-					List<Future<KvStateResponse>> futures = new ArrayList<>(batchSize);
+					List<CompletableFuture<KvStateResponse>> futures = new ArrayList<>(batchSize);
 
 					for (int j = 0; j < batchSize; j++) {
 						int targetServer = random.get(j) % numServers;
@@ -700,8 +745,12 @@ public class ClientTest {
 				LOG.info("Number of requests {}/100_000", numRequests);
 			}
 
-			// Shut down
-			client.shutdown();
+			try {
+				client.shutdown().get(10L, TimeUnit.SECONDS);
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+			Assert.assertTrue(client.isEventGroupShutdown());
 
 			for (Future<Void> future : taskFutures) {
 				try {
@@ -739,7 +788,12 @@ public class ClientTest {
 			}
 		} finally {
 			if (client != null) {
-				client.shutdown();
+				try {
+					client.shutdown().get(10L, TimeUnit.SECONDS);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+				Assert.assertTrue(client.isEventGroupShutdown());
 			}
 
 			for (int i = 0; i < numServers; i++) {
@@ -761,7 +815,7 @@ public class ClientTest {
 				// Bind address and port
 				.localAddress(InetAddress.getLocalHost(), 0)
 				// NIO server channels
-				.group(NIO_GROUP)
+				.group(nioGroup)
 				.channel(NioServerSocketChannel.class)
 				// See initializer for pipeline details
 				.childHandler(new ChannelInitializer<SocketChannel>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/5760677b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
index 7522a61..5ca9c1e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
@@ -36,6 +36,8 @@ abstract class AbstractQueryableStateOperator<S extends State, IN>
 		extends AbstractStreamOperator<IN>
 		implements OneInputStreamOperator<IN, IN> {
 
+	private static final long serialVersionUID = 7842489558298787382L;
+
 	/** State descriptor for the queryable state instance. */
 	protected final StateDescriptor<? extends S, ?> stateDescriptor;
 


[3/3] flink git commit: [FLINK-7880][QS] Wait for proper resource cleanup after each ITCase.

Posted by kk...@apache.org.
[FLINK-7880][QS] Wait for proper resource cleanup after each ITCase.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3fd548e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3fd548e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3fd548e

Branch: refs/heads/master
Commit: a3fd548e9c76c67609bbf159d5fb743d756450b1
Parents: 74d052b
Author: kkloudas <kk...@gmail.com>
Authored: Wed Dec 6 14:32:46 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Wed Dec 6 14:33:28 2017 +0100

----------------------------------------------------------------------
 .../itcases/AbstractQueryableStateTestBase.java | 1073 ++++++++----------
 .../HAAbstractQueryableStateTestBase.java       |   22 +-
 .../HAQueryableStateRocksDBBackendITCase.java   |    2 -
 .../NonHAAbstractQueryableStateTestBase.java    |   11 +-
 ...NonHAQueryableStateRocksDBBackendITCase.java |    2 -
 5 files changed, 476 insertions(+), 634 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 65e9bb5..5a28367 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -73,6 +73,7 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -92,7 +93,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
 
-import scala.concurrent.Await;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 import scala.reflect.ClassTag$;
@@ -159,52 +159,40 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testQueryableState() throws Exception {
-		// Config
+
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final int numKeys = 256;
 
-		JobID jobId = null;
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because cluster is shared between tests and we
+		// don't explicitly check that all slots are available before
+		// submitting.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-		try {
-			//
-			// Test program
-			//
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestKeyRangeSource(numKeys));
-
-			// Reducing state
-			ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
-					"any-name",
-					new SumReduce(),
-					source.getType());
-
-			final String queryName = "hakuna-matata";
-
-			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-				private static final long serialVersionUID = 7143749578983540352L;
-
-				@Override
-				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-					return value.f0;
-				}
-			}).asQueryableState(queryName, reducingState);
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestKeyRangeSource(numKeys));
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			cluster.submitJobDetached(jobGraph);
+		ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
+				"any-name", new SumReduce(), 	source.getType());
+
+		final String queryName = "hakuna-matata";
+
+		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+			private static final long serialVersionUID = 7143749578983540352L;
+
+			@Override
+			public Integer getKey(Tuple2<Integer, Long> value) {
+				return value.f0;
+			}
+		}).asQueryableState(queryName, reducingState);
 
-			//
-			// Start querying
-			//
-			jobId = jobGraph.getJobID();
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+			cluster.submitJobDetached(jobGraph);
 
 			final AtomicLongArray counts = new AtomicLongArray(numKeys);
 
@@ -261,16 +249,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 				long count = counts.get(i);
 				assertTrue("Count at position " + i + " is " + count, count > 0);
 			}
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
 		}
 	}
 
@@ -282,91 +260,94 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final int numKeys = 256;
 
-		JobID jobId = null;
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because cluster is shared between tests and we
+		// don't explicitly check that all slots are available before
+		// submitting.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-		try {
-			//
-			// Test program
-			//
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestKeyRangeSource(numKeys));
-
-			// Reducing state
-			ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
-					"any-name",
-					new SumReduce(),
-					source.getType());
-
-			final String queryName = "duplicate-me";
-
-			final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
-					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-						private static final long serialVersionUID = -4126824763829132959L;
-
-						@Override
-						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-							return value.f0;
-						}
-					}).asQueryableState(queryName, reducingState);
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestKeyRangeSource(numKeys));
 
-			final QueryableStateStream<Integer, Tuple2<Integer, Long>> duplicate =
-					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-						private static final long serialVersionUID = -6265024000462809436L;
+		// Reducing state
+		ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
+				"any-name",
+				new SumReduce(),
+				source.getType());
 
-						@Override
-						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-							return value.f0;
-						}
-					}).asQueryableState(queryName);
+		final String queryName = "duplicate-me";
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+				source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+					private static final long serialVersionUID = -4126824763829132959L;
 
-			CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture = FutureUtils.toJava(
-					cluster.getLeaderGateway(deadline.timeLeft())
-							.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft())
-							.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+					@Override
+					public Integer getKey(Tuple2<Integer, Long> value) {
+						return value.f0;
+					}
+				}).asQueryableState(queryName, reducingState);
 
-			cluster.submitJobDetached(jobGraph);
+		final QueryableStateStream<Integer, Tuple2<Integer, Long>> duplicate =
+				source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+					private static final long serialVersionUID = -6265024000462809436L;
 
-			TestingJobManagerMessages.JobStatusIs jobStatus =
+					@Override
+					public Integer getKey(Tuple2<Integer, Long> value) {
+						return value.f0;
+					}
+				}).asQueryableState(queryName);
+
+		// Submit the job graph
+		final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+		final JobID jobId = jobGraph.getJobID();
+
+		final CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture =
+				notifyWhenJobStatusIs(jobId, JobStatus.FAILED, deadline);
+
+		final CompletableFuture<TestingJobManagerMessages.JobStatusIs> cancellationFuture =
+				notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline);
+
+		cluster.submitJobDetached(jobGraph);
+
+		try {
+			final TestingJobManagerMessages.JobStatusIs jobStatus =
 					failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
 			assertEquals(JobStatus.FAILED, jobStatus.state());
+		} catch (Exception e) {
+
+			// if waiting for the FAILED status throws (e.g. on a timeout), it means that the job (falsely) did not fail.
+			// in this case, and given that the mini-cluster is shared with other tests,
+			// we cancel the job and wait for the cancellation so that the resources are freed.
 
-			// Get the job and check the cause
-			JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
-					cluster.getLeaderGateway(deadline.timeLeft())
-							.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
-							.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
-					.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-			String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
-
-			assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
-			int causedByIndex = failureCause.indexOf("Caused by: ");
-			String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length());
-			assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException"));
-			assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName));
-		} finally {
-			// Free cluster resources
 			if (jobId != null) {
-				scala.concurrent.Future<CancellationSuccess> cancellation = cluster
-						.getLeaderGateway(deadline.timeLeft())
+				cluster.getLeaderGateway(deadline.timeLeft())
 						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class));
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
 
-				Await.ready(cancellation, deadline.timeLeft());
+				cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 			}
+
+			// and we re-throw the exception.
+			throw e;
 		}
+
+		// Get the job and check the cause
+		JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
+				cluster.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
+				.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+		String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
+
+		assertEquals(JobStatus.FAILED, jobFound.executionGraph().getState());
+		assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
+		int causedByIndex = failureCause.indexOf("Caused by: ");
+		String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length());
+		assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException"));
+		assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName));
 	}
 
 	/**
@@ -377,55 +358,40 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 */
 	@Test
 	public void testValueState() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			// Value state
-			ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
-					"any",
-					source.getType());
-
-			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-				private static final long serialVersionUID = 7662520075515707428L;
-
-				@Override
-				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-					return value.f0;
-				}
-			}).asQueryableState("hakuna", valueState);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because cluster is shared between tests and we
+		// don't explicitly check that all slots are available before
+		// submitting.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
 
-			cluster.submitJobDetached(jobGraph);
+		// Value state
+		ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>("any", source.getType());
 
-			executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements);
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+			private static final long serialVersionUID = 7662520075515707428L;
 
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			@Override
+			public Integer getKey(Tuple2<Integer, Long> value) {
+				return value.f0;
 			}
+		}).asQueryableState("hakuna", valueState);
+
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+			cluster.submitJobDetached(jobGraph);
+
+			executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements);
 		}
 	}
 
@@ -434,48 +400,36 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 * contains a wrong jobId or wrong queryable state name.
 	 */
 	@Test
+	@Ignore
 	public void testWrongJobIdAndWrongQueryableStateName() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			// Value state
-			ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
-					new ValueStateDescriptor<>("any", source.getType());
-
-			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-				private static final long serialVersionUID = 7662520075515707428L;
-
-				@Override
-				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-					return value.f0;
-				}
-			}).asQueryableState("hakuna", valueState);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
+		ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>("any", source.getType());
 
-			CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture = FutureUtils.toJava(
-					cluster.getLeaderGateway(deadline.timeLeft())
-							.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft())
-							.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+			private static final long serialVersionUID = 7662520075515707428L;
 
-			cluster.submitJobDetached(jobGraph);
+			@Override
+			public Integer getKey(Tuple2<Integer, Long> value) {
+				return value.f0;
+			}
+		}).asQueryableState("hakuna", valueState);
+
+		try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(cluster, env, deadline)) {
+
+			// register to be notified when the job is running.
+			CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture =
+					notifyWhenJobStatusIs(closableJobGraph.getJobId(), JobStatus.RUNNING, deadline);
+
+			cluster.submitJobDetached(closableJobGraph.getJobGraph());
 
 			// expect for the job to be running
 			TestingJobManagerMessages.JobStatusIs jobStatus =
@@ -486,49 +440,38 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 			CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownJobFuture = client.getKvState(
 					wrongJobId, 						// this is the wrong job id
-					"hankuna",
+					"hakuna",
 					0,
 					BasicTypeInfo.INT_TYPE_INFO,
 					valueState);
 
 			try {
-				unknownJobFuture.get();
-				fail(); // by now the job must have failed.
+				unknownJobFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+				fail(); // by now the request must have failed.
 			} catch (ExecutionException e) {
-				Assert.assertTrue(e.getCause() instanceof RuntimeException);
-				Assert.assertTrue(e.getCause().getMessage().contains(
+				Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause() instanceof RuntimeException);
+				Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause().getMessage().contains(
 						"FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")"));
-			} catch (Exception ignored) {
-				fail("Unexpected type of exception.");
+			} catch (Exception f) {
+				fail("Unexpected type of exception: " + f.getMessage());
 			}
 
 			CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownQSName = client.getKvState(
-					jobId,
-					"wrong-hankuna", // this is the wrong name.
+					closableJobGraph.getJobId(),
+					"wrong-hakuna", // this is the wrong name.
 					0,
 					BasicTypeInfo.INT_TYPE_INFO,
 					valueState);
 
 			try {
-				unknownQSName.get();
-				fail(); // by now the job must have failed.
+				unknownQSName.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+				fail(); // by now the request must have failed.
 			} catch (ExecutionException e) {
-				Assert.assertTrue(e.getCause() instanceof RuntimeException);
-				Assert.assertTrue(e.getCause().getMessage().contains(
-						"UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hankuna'."));
-			} catch (Exception ignored) {
-				fail("Unexpected type of exception.");
-			}
-
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+				Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause() instanceof RuntimeException);
+				Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause().getMessage().contains(
+						"UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hakuna'."));
+			} catch (Exception f) {
+				fail("Unexpected type of exception: " + f.getMessage());
 			}
 		}
 	}
@@ -539,50 +482,44 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 */
 	@Test
 	public void testQueryNonStartedJobState() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-				.addSource(new TestAscendingValueSource(numElements));
-
-			// Value state
-			ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
-				"any",
-				source.getType(),
-				null);
-
-			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because cluster is shared between tests and we
+		// don't explicitly check that all slots are available before
+		// submitting.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
+
+		ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
+			"any", source.getType(), 	null);
+
+		QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
 				source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+
 					private static final long serialVersionUID = 7480503339992214681L;
 
 					@Override
-					public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					public Integer getKey(Tuple2<Integer, Long> value) {
 						return value.f0;
 					}
 				}).asQueryableState("hakuna", valueState);
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			// Now query
 			long expected = numElements;
 
 			// query once
 			client.getKvState(
-					jobId,
+					autoCancellableJob.getJobId(),
 					queryableState.getQueryableStateName(),
 					0,
 					BasicTypeInfo.INT_TYPE_INFO,
@@ -591,16 +528,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			cluster.submitJobDetached(jobGraph);
 
 			executeValueQuery(deadline, client, jobId, "hakuna", valueState, expected);
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
 		}
 	}
 
@@ -615,51 +542,37 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	@Test(expected = UnknownKeyOrNamespaceException.class)
 	public void testValueStateDefault() throws Throwable {
 
-		// Config
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
-
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env =
-				StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies
-				.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-				.addSource(new TestAscendingValueSource(numElements));
-
-			// Value state
-			ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
-				new ValueStateDescriptor<>(
-					"any",
-					source.getType(),
-					Tuple2.of(0, 1337L));
-
-			// only expose key "1"
-			QueryableStateStream<Integer, Tuple2<Integer, Long>>
-				queryableState =
-				source.keyBy(
-					new KeySelector<Tuple2<Integer, Long>, Integer>() {
-						private static final long serialVersionUID = 4509274556892655887L;
-
-						@Override
-						public Integer getKey(
-							Tuple2<Integer, Long> value) throws
-							Exception {
-							return 1;
-						}
-					}).asQueryableState("hakuna", valueState);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because cluster is shared between tests and we
+		// don't explicitly check that all slots are available before
+		// submitting.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
+
+		ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
+				"any", source.getType(), 	Tuple2.of(0, 1337L));
+
+		// only expose key "1"
+		QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = source.keyBy(
+				new KeySelector<Tuple2<Integer, Long>, Integer>() {
+					private static final long serialVersionUID = 4509274556892655887L;
+
+					@Override
+					public Integer getKey(Tuple2<Integer, Long> value) {
+						return 1;
+					}
+				}).asQueryableState("hakuna", valueState);
+
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
 			cluster.submitJobDetached(jobGraph);
 
@@ -683,17 +596,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 				// exception in an ExecutionException.
 				throw e.getCause();
 			}
-		} finally {
-
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
 		}
 	}
 
@@ -707,55 +609,41 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 */
 	@Test
 	public void testValueStateShortcut() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			// Value state shortcut
-			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
-					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-						private static final long serialVersionUID = 9168901838808830068L;
-
-						@Override
-						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-							return value.f0;
-						}
-					}).asQueryableState("matata");
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because cluster is shared between tests and we
+		// don't explicitly check that all slots are available before
+		// submitting.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
 
-			cluster.submitJobDetached(jobGraph);
+		// Value state shortcut
+		final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+				source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+					private static final long serialVersionUID = 9168901838808830068L;
 
-			final ValueStateDescriptor<Tuple2<Integer, Long>> stateDesc =
-					(ValueStateDescriptor<Tuple2<Integer, Long>>) queryableState.getStateDescriptor();
-			executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements);
-		} finally {
+					@Override
+					public Integer getKey(Tuple2<Integer, Long> value) {
+						return value.f0;
+					}
+				}).asQueryableState("matata");
 
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(
-						cluster.getLeaderGateway(deadline.timeLeft())
-								.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-								.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+		final ValueStateDescriptor<Tuple2<Integer, Long>> stateDesc =
+				(ValueStateDescriptor<Tuple2<Integer, Long>>) queryableState.getStateDescriptor();
 
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+			cluster.submitJobDetached(jobGraph);
+			executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements);
 		}
 	}
 
@@ -768,50 +656,40 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 */
 	@Test
 	public void testFoldingState() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final int numElements = 1024;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			// Folding state
-			FoldingStateDescriptor<Tuple2<Integer, Long>, String> foldingState =
-					new FoldingStateDescriptor<>(
-							"any",
-							"0",
-							new SumFold(),
-							StringSerializer.INSTANCE);
-
-			QueryableStateStream<Integer, String> queryableState =
-					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-						private static final long serialVersionUID = -842809958106747539L;
-
-						@Override
-						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-							return value.f0;
-						}
-					}).asQueryableState("pumba", foldingState);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because cluster is shared between tests and we
+		// don't explicitly check that all slots are available before
+		// submitting.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
+
+		FoldingStateDescriptor<Tuple2<Integer, Long>, String> foldingState = new FoldingStateDescriptor<>(
+				"any", "0", new SumFold(), StringSerializer.INSTANCE);
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+			private static final long serialVersionUID = -842809958106747539L;
+
+			@Override
+			public Integer getKey(Tuple2<Integer, Long> value) {
+				return value.f0;
+			}
+		}).asQueryableState("pumba", foldingState);
+
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
 			cluster.submitJobDetached(jobGraph);
 
-			// Now query
-			String expected = Integer.toString(numElements * (numElements + 1) / 2);
+			final String expected = Integer.toString(numElements * (numElements + 1) / 2);
 
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
@@ -840,16 +718,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 				assertTrue("Did not succeed query", success);
 			}
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
 		}
 	}
 
@@ -861,48 +729,40 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 */
 	@Test
 	public void testReducingState() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			// Reducing state
-			ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState =
-					new ReducingStateDescriptor<>(
-							"any",
-							new SumReduce(),
-							source.getType());
-
-			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-				private static final long serialVersionUID = 8470749712274833552L;
-
-				@Override
-				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-					return value.f0;
-				}
-			}).asQueryableState("jungle", reducingState);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because cluster is shared between tests and we
+		// don't explicitly check that all slots are available before
+		// submitting.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
+				"any", new SumReduce(), source.getType());
+
+		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+			private static final long serialVersionUID = 8470749712274833552L;
+
+			@Override
+			public Integer getKey(Tuple2<Integer, Long> value) {
+				return value.f0;
+			}
+		}).asQueryableState("jungle", reducingState);
+
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
 			cluster.submitJobDetached(jobGraph);
 
-			// Now query
-			long expected = numElements * (numElements + 1L) / 2L;
+			final long expected = numElements * (numElements + 1L) / 2L;
 
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
@@ -931,16 +791,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 				assertTrue("Did not succeed query", success);
 			}
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
 		}
 	}
 
@@ -952,66 +802,60 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 */
 	@Test
 	public void testMapState() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			final MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor = new MapStateDescriptor<>(
-					"timon",
-					BasicTypeInfo.INT_TYPE_INFO,
-					source.getType());
-			mapStateDescriptor.setQueryable("timon-queryable");
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because cluster is shared between tests and we
+		// don't explicitly check that all slots are available before
+		// submitting.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-				private static final long serialVersionUID = 8470749712274833552L;
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
 
-				@Override
-				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-					return value.f0;
-				}
-			}).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
-				private static final long serialVersionUID = -805125545438296619L;
+		final MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor = new MapStateDescriptor<>(
+				"timon", BasicTypeInfo.INT_TYPE_INFO, source.getType());
+		mapStateDescriptor.setQueryable("timon-queryable");
 
-				private transient MapState<Integer, Tuple2<Integer, Long>> mapState;
+		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+			private static final long serialVersionUID = 8470749712274833552L;
 
-				@Override
-				public void open(Configuration parameters) throws Exception {
-					super.open(parameters);
-					mapState = getRuntimeContext().getMapState(mapStateDescriptor);
-				}
+			@Override
+			public Integer getKey(Tuple2<Integer, Long> value) {
+				return value.f0;
+			}
+		}).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
+			private static final long serialVersionUID = -805125545438296619L;
 
-				@Override
-				public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
-					Tuple2<Integer, Long> v = mapState.get(value.f0);
-					if (v == null) {
-						v = new Tuple2<>(value.f0, 0L);
-					}
-					mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
+			private transient MapState<Integer, Tuple2<Integer, Long>> mapState;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				super.open(parameters);
+				mapState = getRuntimeContext().getMapState(mapStateDescriptor);
+			}
+
+			@Override
+			public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
+				Tuple2<Integer, Long> v = mapState.get(value.f0);
+				if (v == null) {
+					v = new Tuple2<>(value.f0, 0L);
 				}
-			});
+				mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
+			}
+		});
+
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
 			cluster.submitJobDetached(jobGraph);
 
-			// Now query
-			long expected = numElements * (numElements + 1L) / 2L;
+			final long expected = numElements * (numElements + 1L) / 2L;
 
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
@@ -1039,16 +883,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 				assertTrue("Did not succeed query", success);
 			}
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
 		}
 	}
 
@@ -1061,62 +895,56 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 */
 	@Test
 	public void testListState() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			final ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<Long>(
-					"list",
-					BasicTypeInfo.LONG_TYPE_INFO);
-			listStateDescriptor.setQueryable("list-queryable");
-
-			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-				private static final long serialVersionUID = 8470749712274833552L;
-
-				@Override
-				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-					return value.f0;
-				}
-			}).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
-				private static final long serialVersionUID = -805125545438296619L;
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because cluster is shared between tests and we
+		// don't explicitly check that all slots are available before
+		// submitting.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-				private transient ListState<Long> listState;
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
 
-				@Override
-				public void open(Configuration parameters) throws Exception {
-					super.open(parameters);
-					listState = getRuntimeContext().getListState(listStateDescriptor);
-				}
+		final ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<Long>(
+				"list", BasicTypeInfo.LONG_TYPE_INFO);
+		listStateDescriptor.setQueryable("list-queryable");
 
-				@Override
-				public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
-					listState.add(value.f1);
-				}
-			});
+		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+			private static final long serialVersionUID = 8470749712274833552L;
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+			@Override
+			public Integer getKey(Tuple2<Integer, Long> value) {
+				return value.f0;
+			}
+		}).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
+			private static final long serialVersionUID = -805125545438296619L;
 
-			cluster.submitJobDetached(jobGraph);
+			private transient ListState<Long> listState;
 
-			// Now query
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				super.open(parameters);
+				listState = getRuntimeContext().getListState(listStateDescriptor);
+			}
 
-			Map<Integer, Set<Long>> results = new HashMap<>();
+			@Override
+			public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
+				listState.add(value.f1);
+			}
+		});
+
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+			cluster.submitJobDetached(jobGraph);
+
+			final Map<Integer, Set<Long>> results = new HashMap<>();
 
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
@@ -1159,66 +987,48 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 				}
 			}
 
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
 		}
 	}
 
 	@Test
 	public void testAggregatingState() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final long numElements = 1024L;
 
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStateBackend(stateBackend);
-			env.setParallelism(maxParallelism);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> aggrStateDescriptor =
-					new AggregatingStateDescriptor<>(
-							"aggregates",
-							new SumAggr(),
-							String.class);
-			aggrStateDescriptor.setQueryable("aggr-queryable");
-
-			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-				private static final long serialVersionUID = 8470749712274833552L;
-
-				@Override
-				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-					return value.f0;
-				}
-			}).transform(
-					"TestAggregatingOperator",
-					BasicTypeInfo.STRING_TYPE_INFO,
-					new AggregatingTestOperator(aggrStateDescriptor)
-			);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStateBackend(stateBackend);
+		env.setParallelism(maxParallelism);
+		// Very important, because cluster is shared between tests and we
+		// don't explicitly check that all slots are available before
+		// submitting.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
+		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));
 
-			cluster.submitJobDetached(jobGraph);
+		final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> aggrStateDescriptor =
+				new AggregatingStateDescriptor<>("aggregates", new SumAggr(), String.class);
+		aggrStateDescriptor.setQueryable("aggr-queryable");
 
-			// Now query
+		source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+			private static final long serialVersionUID = 8470749712274833552L;
+
+			@Override
+			public Integer getKey(Tuple2<Integer, Long> value) {
+				return value.f0;
+			}
+		}).transform(
+				"TestAggregatingOperator",
+				BasicTypeInfo.STRING_TYPE_INFO,
+				new AggregatingTestOperator(aggrStateDescriptor)
+		);
+
+		try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {
+
+			final JobID jobId = autoCancellableJob.getJobId();
+			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
+
+			cluster.submitJobDetached(jobGraph);
 
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
@@ -1246,16 +1056,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 				assertTrue("Did not succeed query", success);
 			}
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
-
-				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			}
 		}
 	}
 
@@ -1316,7 +1116,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 				notifyAll();
 			}
 		}
-
 	}
 
 	/**
@@ -1465,6 +1264,60 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 	/////				General Utility Methods				//////
 
+	/**
+	 * A wrapper of the job graph that makes sure to cancel the job and wait for
+	 * termination after the execution of every test.
+	 */
+	private static class AutoCancellableJob implements AutoCloseable {
+
+		private final FlinkMiniCluster cluster;
+		private final Deadline deadline;
+		private final JobGraph jobGraph;
+
+		private final JobID jobId;
+		private final CompletableFuture<TestingJobManagerMessages.JobStatusIs> cancellationFuture;
+
+		AutoCancellableJob(final FlinkMiniCluster cluster, final StreamExecutionEnvironment env, final Deadline deadline) {
+			Preconditions.checkNotNull(env);
+
+			this.cluster = Preconditions.checkNotNull(cluster);
+			this.jobGraph = env.getStreamGraph().getJobGraph();
+			this.deadline = Preconditions.checkNotNull(deadline);
+
+			this.jobId = jobGraph.getJobID();
+			this.cancellationFuture = notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline);
+		}
+
+		JobGraph getJobGraph() {
+			return jobGraph;
+		}
+
+		JobID getJobId() {
+			return jobId;
+		}
+
+		@Override
+		public void close() throws Exception {
+			// Free cluster resources
+			if (jobId != null) {
+				cluster.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+
+				cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+		}
+	}
+
+	private static CompletableFuture<TestingJobManagerMessages.JobStatusIs> notifyWhenJobStatusIs(
+			final JobID jobId, final JobStatus status, final Deadline deadline) {
+
+		return FutureUtils.toJava(
+				cluster.getLeaderGateway(deadline.timeLeft())
+						.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, status), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+	}
+
 	private static <K, S extends State, V> CompletableFuture<S> getKvState(
 			final Deadline deadline,
 			final QueryableStateClient client,

http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index b9ce7c2..8767b52 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -31,6 +31,8 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.IOException;
+
 import static org.junit.Assert.fail;
 
 /**
@@ -79,19 +81,13 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable
 	}
 
 	@AfterClass
-	public static void tearDown() {
-		if (cluster != null) {
-			cluster.stop();
-			cluster.awaitTermination();
-		}
+	public static void tearDown() throws IOException {
+		client.shutdownAndWait();
 
-		try {
-			zkServer.stop();
-			zkServer.close();
-			client.shutdownAndWait();
-		} catch (Throwable e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		cluster.stop();
+		cluster.awaitTermination();
+
+		zkServer.stop();
+		zkServer.close();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index 18b167f..cae02e2 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -22,14 +22,12 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
 /**
  * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
  */
-@Ignore
 public class HAQueryableStateRocksDBBackendITCase extends HAAbstractQueryableStateTestBase {
 
 	@Rule

http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
index a5e24b2..2686a29 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
@@ -67,12 +67,9 @@ public abstract class NonHAAbstractQueryableStateTestBase extends AbstractQuerya
 
 	@AfterClass
 	public static void tearDown() {
-		try {
-			cluster.stop();
-			client.shutdownAndWait();
-		} catch (Throwable e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		client.shutdownAndWait();
+
+		cluster.stop();
+		cluster.awaitTermination();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a3fd548e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index 39fbe9e..7778a94 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -22,14 +22,12 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
 /**
  * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
  */
-@Ignore
 public class NonHAQueryableStateRocksDBBackendITCase extends NonHAAbstractQueryableStateTestBase {
 
 	@Rule


[2/3] flink git commit: [FLINK-7974][QS] Wait for QS abstract server to shutdown.

Posted by kk...@apache.org.
[FLINK-7974][QS] Wait for QS abstract server to shutdown.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74d052bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74d052bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74d052bb

Branch: refs/heads/master
Commit: 74d052bb045031363652116ab8226d8ac00e0cd0
Parents: 5760677
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 9 19:30:29 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Wed Dec 6 14:33:22 2017 +0100

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java  |   3 +-
 .../org/apache/flink/util/ExecutorUtils.java    |  77 +++++++++++
 .../MesosApplicationMasterRunner.java           |   3 +-
 .../network/AbstractServerBase.java             |  95 ++++++++++---
 .../network/AbstractServerHandler.java          |  11 +-
 .../client/proxy/KvStateClientProxyHandler.java |  34 +++--
 .../client/proxy/KvStateClientProxyImpl.java    |   8 +-
 .../server/KvStateServerHandler.java            |   4 +-
 .../server/KvStateServerImpl.java               |   8 +-
 .../HAAbstractQueryableStateTestBase.java       |   5 +-
 .../NonHAAbstractQueryableStateTestBase.java    |   4 +-
 .../network/AbstractServerTest.java             | 135 +++++++++++--------
 .../flink/runtime/concurrent/Executors.java     |  49 +------
 .../runtime/taskexecutor/TaskManagerRunner.java |   6 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   6 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   6 +-
 .../slotmanager/SlotProtocolTest.java           |   3 +-
 .../handler/legacy/ExecutionGraphCacheTest.java |   4 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   3 +-
 19 files changed, 306 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index e21e94b..8c0462d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointMessagePar
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.ExecutorUtils;
 
 import javax.annotation.Nullable;
 
@@ -90,7 +91,7 @@ public class RestClusterClient extends ClusterClient {
 			log.error("An error occurred during the client shutdown.", e);
 		}
 		this.restClient.shutdown(Time.seconds(5));
-		org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService);
+		ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
new file mode 100644
index 0000000..d98bdd2
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utilities for {@link java.util.concurrent.Executor Executors}.
+ */
+public class ExecutorUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ExecutorUtils.class);
+
+	/**
+	 * Gracefully shuts down the given {@link ExecutorService ExecutorServices}. The call waits up to
+	 * the given timeout for all ExecutorServices to terminate. Any ExecutorService that has not
+	 * terminated by then is shut down hard.
+	 *
+	 * @param timeout to wait for the termination of all ExecutorServices
+	 * @param unit of the timeout
+	 * @param executorServices to shut down
+	 */
+	public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) {
+		for (ExecutorService executorService: executorServices) {
+			executorService.shutdown();
+		}
+
+		boolean wasInterrupted = false;
+		final long endTime = unit.toMillis(timeout) + System.currentTimeMillis();
+		long timeLeft = unit.toMillis(timeout);
+		boolean hasTimeLeft = timeLeft > 0L;
+
+		for (ExecutorService executorService: executorServices) {
+			if (wasInterrupted || !hasTimeLeft) {
+				executorService.shutdownNow();
+			} else {
+				try {
+					if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
+						LOG.warn("ExecutorService did not terminate in time. Shutting it down now.");
+						executorService.shutdownNow();
+					}
+				} catch (InterruptedException e) {
+					LOG.warn("Interrupted while shutting down executor services. Shutting all " +
+							"remaining ExecutorServices down now.", e);
+					executorService.shutdownNow();
+
+					wasInterrupted = true;
+
+					Thread.currentThread().interrupt();
+				}
+
+				timeLeft = endTime - System.currentTimeMillis();
+				hasTimeLeft = timeLeft > 0L;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 93eb3c6..544150b 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.util.ExecutorUtils;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -439,7 +440,7 @@ public class MesosApplicationMasterRunner {
 			}
 		}
 
-		org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
+		ExecutorUtils.gracefulShutdown(
 			AkkaUtils.getTimeout(config).toMillis(),
 			TimeUnit.MILLISECONDS,
 			futureExecutor,

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 82a05f2..d5afeb3 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.queryablestate.network;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
@@ -45,10 +46,12 @@ import java.net.InetSocketAddress;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * The base class for every server in the queryable state module.
@@ -83,6 +86,9 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	/** The number of threads to be used for query serving. */
 	private final int numQueryThreads;
 
+	/** Atomic shut down future. */
+	private final AtomicReference<CompletableFuture<Void>> serverShutdownFuture = new AtomicReference<>(null);
+
 	/** Netty's ServerBootstrap. */
 	private ServerBootstrap bootstrap;
 
@@ -179,8 +185,8 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	 * @throws Exception If something goes wrong during the bind operation.
 	 */
 	public void start() throws Throwable {
-		Preconditions.checkState(serverAddress == null,
-				serverName + " is already running @ " + serverAddress + '.');
+		Preconditions.checkState(serverAddress == null && serverShutdownFuture.get() == null,
+				serverName + " is already running @ " + serverAddress + ". ");
 
 		Iterator<Integer> portIterator = bindPortRange.iterator();
 		while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {}
@@ -251,7 +257,22 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 			throw future.cause();
 		} catch (BindException e) {
 			log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
-			shutdown();
+			try {
+				// we shutdown the server but we reset the future every time because in
+				// case of failure to bind, we will call attemptToBind() here, and not resetting
+				// the flag will interfere with future shutdown attempts.
+
+				shutdownServer()
+						.whenComplete((ignoredV, ignoredT) -> serverShutdownFuture.getAndSet(null))
+						.get();
+			} catch (Exception r) {
+
+				// Here we were seeing this problem:
+				// https://github.com/netty/netty/issues/4357 if we do a get().
+				// this is why we now simply wait a bit so that everything is shut down.
+
+				log.warn("Problem while shutting down {}: {}", serverName, r.getMessage());
+			}
 		}
 		// any other type of exception we let it bubble up.
 		return false;
@@ -259,26 +280,62 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 
 	/**
 	 * Shuts down the server and all related thread pools.
+	 * @return A {@link CompletableFuture} that will be completed upon termination of the shutdown process.
 	 */
-	public void shutdown() {
-		log.info("Shutting down {} @ {}", serverName, serverAddress);
-
-		if (handler != null) {
-			handler.shutdown();
-			handler = null;
-		}
-
-		if (queryExecutor != null) {
-			queryExecutor.shutdown();
-		}
+	public CompletableFuture<Void> shutdownServer() {
+		CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
+		if (serverShutdownFuture.compareAndSet(null, shutdownFuture)) {
+			log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+			final CompletableFuture<Void> groupShutdownFuture = new CompletableFuture<>();
+			if (bootstrap != null) {
+				EventLoopGroup group = bootstrap.group();
+				if (group != null && !group.isShutdown()) {
+					group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
+							.addListener(finished -> {
+								if (finished.isSuccess()) {
+									groupShutdownFuture.complete(null);
+								} else {
+									groupShutdownFuture.completeExceptionally(finished.cause());
+								}
+							});
+				} else {
+					groupShutdownFuture.complete(null);
+				}
+			} else {
+				groupShutdownFuture.complete(null);
+			}
 
-		if (bootstrap != null) {
-			EventLoopGroup group = bootstrap.group();
-			if (group != null) {
-				group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+			final CompletableFuture<Void> handlerShutdownFuture = new CompletableFuture<>();
+			if (handler == null) {
+				handlerShutdownFuture.complete(null);
+			} else {
+				handler.shutdown().whenComplete((result, throwable) -> {
+					if (throwable != null) {
+						handlerShutdownFuture.completeExceptionally(throwable);
+					} else {
+						handlerShutdownFuture.complete(null);
+					}
+				});
 			}
+
+			final CompletableFuture<Void> queryExecShutdownFuture = CompletableFuture.runAsync(() -> {
+				if (queryExecutor != null) {
+					ExecutorUtils.gracefulShutdown(10L, TimeUnit.MINUTES, queryExecutor);
+				}
+			});
+
+			CompletableFuture.allOf(
+					queryExecShutdownFuture, groupShutdownFuture, handlerShutdownFuture
+			).whenComplete((result, throwable) -> {
+				if (throwable != null) {
+					shutdownFuture.completeExceptionally(throwable);
+				} else {
+					shutdownFuture.complete(null);
+				}
+			});
 		}
-		serverAddress = null;
+		return serverShutdownFuture.get();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
index 7e71a11..a514723 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
@@ -183,9 +183,16 @@ public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extend
 	public abstract CompletableFuture<RESP> handleRequest(final long requestId, final REQ request);
 
 	/**
-	 * Shuts down any handler specific resources, e.g. thread pools etc.
+	 * Shuts down any handler-specific resources, e.g. thread pools etc and returns
+	 * a {@link CompletableFuture}.
+	 *
+	 * <p>If an exception is thrown during the shutdown process, then that exception
+	 * will be included in the returned future.
+	 *
+	 * @return A {@link CompletableFuture} that will be completed when the shutdown
+	 * process actually finishes.
 	 */
-	public abstract void shutdown();
+	public abstract CompletableFuture<Void> shutdown();
 
 	/**
 	 * Task to execute the actual query against the state instance.

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
index af33701..29ee0d7 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -33,7 +33,9 @@ import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
 import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
 import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage;
@@ -42,6 +44,7 @@ import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 
+import akka.dispatch.OnComplete;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,6 +144,7 @@ public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequ
 								// KvStateLocation. Therefore we retry this query and
 								// force look up the location.
 
+								LOG.debug("Retrying after failing to retrieve state due to: {}.", throwable.getCause().getMessage());
 								executeActionAsync(result, request, true);
 							} else {
 								result.completeExceptionally(throwable);
@@ -203,20 +207,34 @@ public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequ
 
 		LOG.debug("Retrieving location for state={} of job={} from the job manager.", jobId, queryableStateName);
 
+		final CompletableFuture<KvStateLocation> location = new CompletableFuture<>();
+		lookupCache.put(cacheKey, location);
 		return proxy.getJobManagerFuture().thenComposeAsync(
 				jobManagerGateway -> {
 					final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
-					final CompletableFuture<KvStateLocation> locationFuture = FutureUtils.toJava(
-							jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
-									.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class)));
-
-					lookupCache.put(cacheKey, locationFuture);
-					return locationFuture;
+					jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
+							.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
+							.onComplete(new OnComplete<KvStateLocation>() {
+
+								@Override
+								public void onComplete(Throwable failure, KvStateLocation loc) throws Throwable {
+									if (failure != null) {
+										if (failure instanceof FlinkJobNotFoundException) {
+											// if the jobId was wrong, remove the entry from the cache.
+											lookupCache.remove(cacheKey);
+										}
+										location.completeExceptionally(failure);
+									} else {
+										location.complete(loc);
+									}
+								}
+							}, Executors.directExecutionContext());
+					return location;
 				}, queryExecutor);
 	}
 
 	@Override
-	public void shutdown() {
-		kvStateClient.shutdown();
+	public CompletableFuture<Void> shutdown() {
+		return kvStateClient.shutdown();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
index 6fcaf40..aa5e7b6 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
@@ -35,6 +35,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The default implementation of the {@link KvStateClientProxy}.
@@ -96,7 +97,12 @@ public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, K
 
 	@Override
 	public void shutdown() {
-		super.shutdown();
+		try {
+			shutdownServer().get(10L, TimeUnit.SECONDS);
+			log.info("{} was shutdown successfully.", getServerName());
+		} catch (Exception e) {
+			log.warn("{} shutdown failed: {}", getServerName(), e);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
index 476f153..18a2944 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
@@ -101,7 +101,7 @@ public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalR
 	}
 
 	@Override
-	public void shutdown() {
-		// do nothing
+	public CompletableFuture<Void> shutdown() {
+		return CompletableFuture.completedFuture(null);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
index 3a37a3a..0720268 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -32,6 +32,7 @@ import org.apache.flink.util.Preconditions;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The default implementation of the {@link KvStateServer}.
@@ -101,6 +102,11 @@ public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest
 
 	@Override
 	public void shutdown() {
-		super.shutdown();
+		try {
+			shutdownServer().get(10L, TimeUnit.SECONDS);
+			log.info("{} was shutdown successfully.", getServerName());
+		} catch (Exception e) {
+			log.warn("{} shutdown failed: {}", getServerName(), e);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index 79809b3..b9ce7c2 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -88,11 +88,10 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable
 		try {
 			zkServer.stop();
 			zkServer.close();
-		} catch (Exception e) {
+			client.shutdownAndWait();
+		} catch (Throwable e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-
-		client.shutdown();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
index 6945cca..a5e24b2 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
@@ -69,10 +69,10 @@ public abstract class NonHAAbstractQueryableStateTestBase extends AbstractQuerya
 	public static void tearDown() {
 		try {
 			cluster.stop();
-		} catch (Exception e) {
+			client.shutdownAndWait();
+		} catch (Throwable e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		client.shutdown();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
index 3d2ed40..103c638 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
@@ -22,7 +22,9 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.queryablestate.network.messages.MessageBody;
 import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.stats.AtomicKvStateRequestStats;
 import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
+import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
@@ -37,6 +39,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -60,22 +63,15 @@ public class AbstractServerTest {
 		expectedEx.expect(FlinkRuntimeException.class);
 		expectedEx.expectMessage("Unable to start Test Server 2. All ports in provided range are occupied.");
 
-		TestServer server1 = null;
-		TestServer server2 = null;
-		try {
+		List<Integer> portList = new ArrayList<>();
+		portList.add(7777);
 
-			server1 = startServer("Test Server 1", 7777);
-			Assert.assertEquals(7777L, server1.getServerAddress().getPort());
+		try (TestServer server1 = new TestServer("Test Server 1", new DisabledKvStateRequestStats(), portList.iterator())) {
+			server1.start();
 
-			server2 = startServer("Test Server 2", 7777);
-		} finally {
-
-			if (server1 != null) {
-				server1.shutdown();
-			}
-
-			if (server2 != null) {
-				server2.shutdown();
+			try (TestServer server2 = new TestServer("Test Server 2", new DisabledKvStateRequestStats(),
+					Collections.singletonList(server1.getServerAddress().getPort()).iterator())) {
+				server2.start();
 			}
 		}
 	}
@@ -86,69 +82,81 @@ public class AbstractServerTest {
 	 */
 	@Test
 	public void testPortRangeSuccess() throws Throwable {
-		TestServer server1 = null;
-		TestServer server2 = null;
-		Client<TestMessage, TestMessage> client = null;
 
-		try {
-			server1 = startServer("Test Server 1", 7777, 7778, 7779);
-			Assert.assertEquals(7777L, server1.getServerAddress().getPort());
-
-			server2 = startServer("Test Server 2", 7777, 7778, 7779);
-			Assert.assertEquals(7778L, server2.getServerAddress().getPort());
-
-			client = new Client<>(
-					"Test Client",
-					1,
-					new MessageSerializer<>(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()),
-					new DisabledKvStateRequestStats());
+		// this is shared between the two servers.
+		AtomicKvStateRequestStats serverStats = new AtomicKvStateRequestStats();
+		AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();
+
+		List<Integer> portList = new ArrayList<>();
+		portList.add(7777);
+		portList.add(7778);
+		portList.add(7779);
+
+		try (
+				TestServer server1 = new TestServer("Test Server 1", serverStats, portList.iterator());
+				TestServer server2 = new TestServer("Test Server 2", serverStats, portList.iterator());
+				TestClient client = new TestClient(
+						"Test Client",
+						1,
+						new MessageSerializer<>(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()),
+						clientStats
+				)
+		) {
+			server1.start();
+			Assert.assertTrue(server1.getServerAddress().getPort() >= 7777 && server1.getServerAddress().getPort() <= 7779);
+
+			server2.start();
+			Assert.assertTrue(server2.getServerAddress().getPort() >= 7777 && server2.getServerAddress().getPort() <= 7779);
 
 			TestMessage response1 = client.sendRequest(server1.getServerAddress(), new TestMessage("ping")).join();
 			Assert.assertEquals(server1.getServerName() + "-ping", response1.getMessage());
 
 			TestMessage response2 = client.sendRequest(server2.getServerAddress(), new TestMessage("pong")).join();
 			Assert.assertEquals(server2.getServerName() + "-pong", response2.getMessage());
-		} finally {
 
-			if (server1 != null) {
-				server1.shutdown();
-			}
-
-			if (server2 != null) {
-				server2.shutdown();
-			}
+			// the client connects to both servers and the stats object is shared.
+			Assert.assertEquals(2L, serverStats.getNumConnections());
 
-			if (client != null) {
-				client.shutdown();
-			}
+			Assert.assertEquals(2L, clientStats.getNumConnections());
+			Assert.assertEquals(0L, clientStats.getNumFailed());
+			Assert.assertEquals(2L, clientStats.getNumSuccessful());
+			Assert.assertEquals(2L, clientStats.getNumRequests());
 		}
+
+		Assert.assertEquals(0L, clientStats.getNumConnections());
+		Assert.assertEquals(0L, clientStats.getNumFailed());
+		Assert.assertEquals(2L, clientStats.getNumSuccessful());
+		Assert.assertEquals(2L, clientStats.getNumRequests());
 	}
 
-	/**
-	 * Initializes a {@link TestServer} with the given port range.
-	 * @param serverName the name of the server.
-	 * @param ports a range of ports.
-	 * @return A test server with the given name.
-	 */
-	private TestServer startServer(String serverName, int... ports) throws Throwable {
-		List<Integer> portList = new ArrayList<>(ports.length);
-		for (int p : ports) {
-			portList.add(p);
+	private static class TestClient extends Client<TestMessage, TestMessage> implements AutoCloseable {
+
+		TestClient(
+				String clientName,
+				int numEventLoopThreads,
+				MessageSerializer<TestMessage, TestMessage> serializer,
+				KvStateRequestStats stats) {
+			super(clientName, numEventLoopThreads, serializer, stats);
 		}
 
-		final TestServer server = new TestServer(serverName, portList.iterator());
-		server.start();
-		return server;
+		@Override
+		public void close() throws Exception {
+			shutdown().join();
+			Assert.assertTrue(isEventGroupShutdown());
+		}
 	}
 
 	/**
 	 * A server that receives a {@link TestMessage test message} and returns another test
 	 * message containing the same string as the request with the name of the server prepended.
 	 */
-	private class TestServer extends AbstractServerBase<TestMessage, TestMessage> {
+	private static class TestServer extends AbstractServerBase<TestMessage, TestMessage> implements AutoCloseable {
 
-		protected TestServer(String name, Iterator<Integer> bindPort) throws UnknownHostException {
+		private final KvStateRequestStats requestStats;
+
+		TestServer(String name, KvStateRequestStats stats, Iterator<Integer> bindPort) throws UnknownHostException {
 			super(name, InetAddress.getLocalHost(), bindPort, 1, 1);
+			this.requestStats = stats;
 		}
 
 		@Override
@@ -156,7 +164,7 @@ public class AbstractServerTest {
 			return new AbstractServerHandler<TestMessage, TestMessage>(
 					this,
 					new MessageSerializer<>(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()),
-					new DisabledKvStateRequestStats()) {
+					requestStats) {
 
 				@Override
 				public CompletableFuture<TestMessage> handleRequest(long requestId, TestMessage request) {
@@ -165,11 +173,22 @@ public class AbstractServerTest {
 				}
 
 				@Override
-				public void shutdown() {
-					// do nothing
+				public CompletableFuture<Void> shutdown() {
+					return CompletableFuture.completedFuture(null);
 				}
 			};
 		}
+
+		@Override
+		public void close() throws Exception {
+			shutdownServer().get();
+			if (requestStats instanceof AtomicKvStateRequestStats) {
+				AtomicKvStateRequestStats stats = (AtomicKvStateRequestStats) requestStats;
+				Assert.assertEquals(0L, stats.getNumConnections());
+			}
+			Assert.assertTrue(getQueryExecutor().isTerminated());
+			Assert.assertTrue(isEventGroupShutdown());
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index 04cdce7..703ac4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -22,14 +22,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.ExecutionContext;
 
 /**
- * Collection of {@link Executor} implementations
+ * Collection of {@link Executor} implementations.
  */
 public class Executors {
 
@@ -94,48 +93,4 @@ public class Executors {
 			return this;
 		}
 	}
-
-	/**
-	 * Gracefully shutdown the given {@link ExecutorService}. The call waits the given timeout that
-	 * all ExecutorServices terminate. If the ExecutorServices do not terminate in this time,
-	 * they will be shut down hard.
-	 *
-	 * @param timeout to wait for the termination of all ExecutorServices
-	 * @param unit of the timeout
-	 * @param executorServices to shut down
-	 */
-	public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) {
-		for (ExecutorService executorService: executorServices) {
-			executorService.shutdown();
-		}
-
-		boolean wasInterrupted = false;
-		final long endTime = unit.toMillis(timeout) + System.currentTimeMillis();
-		long timeLeft = unit.toMillis(timeout);
-		boolean hasTimeLeft = timeLeft > 0L;
-
-		for (ExecutorService executorService: executorServices) {
-			if (wasInterrupted || !hasTimeLeft) {
-				executorService.shutdownNow();
-			} else {
-				try {
-					if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
-						LOG.warn("ExecutorService did not terminate in time. Shutting it down now.");
-						executorService.shutdownNow();
-					}
-				} catch (InterruptedException e) {
-					LOG.warn("Interrupted while shutting down executor services. Shutting all " +
-						"remaining ExecutorServices down now.", e);
-					executorService.shutdownNow();
-
-					wasInterrupted = true;
-
-					Thread.currentThread().interrupt();
-				}
-
-				timeLeft = endTime - System.currentTimeMillis();
-				hasTimeLeft = timeLeft > 0L;
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index a24daf0..7258e52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -47,6 +46,7 @@ import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.Preconditions;
 
 import akka.actor.ActorSystem;
@@ -88,7 +88,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
 	private final MetricRegistryImpl metricRegistry;
 
-	/** Executor used to run future callbacks */
+	/** Executor used to run future callbacks. */
 	private final ExecutorService executor;
 
 	private final TaskExecutor taskManager;
@@ -165,7 +165,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 				exception = ExceptionUtils.firstOrSuppressed(e, exception);
 			}
 
-			Executors.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor);
+			ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor);
 
 			if (exception != null) {
 				throw exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 7799a78..5f82159 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -45,7 +45,7 @@ import org.apache.flink.runtime.clusterframework.{BootstrapTools, FlinkResourceM
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter, Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter}
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
@@ -85,7 +85,7 @@ import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
-import org.apache.flink.util.{FlinkException, InstantiationUtil, NetUtils, SerializedThrowable}
+import org.apache.flink.util._
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -2060,7 +2060,7 @@ object JobManager {
         LOG.warn("Could not properly shut down the metric registry.", t)
     }
 
-    FlinkExecutors.gracefulShutdown(
+    ExecutorUtils.gracefulShutdown(
       timeout.toMillis,
       TimeUnit.MILLISECONDS,
       futureExecutor,

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 227b854..5554061 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -32,7 +32,7 @@ import org.apache.flink.configuration._
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils}
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
-import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter, Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter}
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders
 import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
@@ -44,7 +44,7 @@ import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegi
 import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
 import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
-import org.apache.flink.util.NetUtils
+import org.apache.flink.util.{ExecutorUtils, NetUtils}
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -440,7 +440,7 @@ abstract class FlinkMiniCluster(
 
     isRunning = false
 
-    FlinkExecutors.gracefulShutdown(
+    ExecutorUtils.gracefulShutdown(
       timeout.toMillis,
       TimeUnit.MILLISECONDS,
       futureExecutor,

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 97942ea..79e38df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
@@ -64,7 +65,7 @@ public class SlotProtocolTest extends TestLogger {
 
 	@AfterClass
 	public static void afterClass() {
-		Executors.gracefulShutdown(timeout, TimeUnit.MILLISECONDS, scheduledExecutorService);
+		ExecutorUtils.gracefulShutdown(timeout, TimeUnit.MILLISECONDS, scheduledExecutorService);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
index 1a8ea84..ecadaa5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
@@ -33,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -233,7 +233,7 @@ public class ExecutionGraphCacheTest extends TestLogger {
 
 			verify(jobManagerGateway, times(1)).requestJob(eq(jobId), any(Time.class));
 		} finally {
-			Executors.gracefulShutdown(5000L, TimeUnit.MILLISECONDS, executor);
+			ExecutorUtils.gracefulShutdown(5000L, TimeUnit.MILLISECONDS, executor);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 3bdc2ac..279981a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
@@ -472,7 +473,7 @@ public class YarnApplicationMasterRunner {
 			}
 		}
 
-		org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
+		ExecutorUtils.gracefulShutdown(
 			AkkaUtils.getTimeout(config).toMillis(),
 			TimeUnit.MILLISECONDS,
 			futureExecutor,