You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/12/06 15:11:49 UTC

[2/3] flink git commit: [FLINK-7974][QS] Wait for QS abstract server to shutdown.

[FLINK-7974][QS] Wait for QS abstract server to shutdown.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74d052bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74d052bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74d052bb

Branch: refs/heads/master
Commit: 74d052bb045031363652116ab8226d8ac00e0cd0
Parents: 5760677
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 9 19:30:29 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Wed Dec 6 14:33:22 2017 +0100

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java  |   3 +-
 .../org/apache/flink/util/ExecutorUtils.java    |  77 +++++++++++
 .../MesosApplicationMasterRunner.java           |   3 +-
 .../network/AbstractServerBase.java             |  95 ++++++++++---
 .../network/AbstractServerHandler.java          |  11 +-
 .../client/proxy/KvStateClientProxyHandler.java |  34 +++--
 .../client/proxy/KvStateClientProxyImpl.java    |   8 +-
 .../server/KvStateServerHandler.java            |   4 +-
 .../server/KvStateServerImpl.java               |   8 +-
 .../HAAbstractQueryableStateTestBase.java       |   5 +-
 .../NonHAAbstractQueryableStateTestBase.java    |   4 +-
 .../network/AbstractServerTest.java             | 135 +++++++++++--------
 .../flink/runtime/concurrent/Executors.java     |  49 +------
 .../runtime/taskexecutor/TaskManagerRunner.java |   6 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   6 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   6 +-
 .../slotmanager/SlotProtocolTest.java           |   3 +-
 .../handler/legacy/ExecutionGraphCacheTest.java |   4 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   3 +-
 19 files changed, 306 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index e21e94b..8c0462d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointMessagePar
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.ExecutorUtils;
 
 import javax.annotation.Nullable;
 
@@ -90,7 +91,7 @@ public class RestClusterClient extends ClusterClient {
 			log.error("An error occurred during the client shutdown.", e);
 		}
 		this.restClient.shutdown(Time.seconds(5));
-		org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService);
+		ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
new file mode 100644
index 0000000..d98bdd2
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utilities for {@link java.util.concurrent.Executor Executors}.
+ */
+public class ExecutorUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ExecutorUtils.class);
+
+	/**
+	 * Gracefully shuts down the given {@link ExecutorService ExecutorServices}. The call waits up to
+	 * the given timeout, shared by all of them, for termination. ExecutorServices that have not
+	 * terminated in time, or that remain once the caller is interrupted, are shut down hard.
+	 *
+	 * @param timeout total time to wait for the termination of all ExecutorServices
+	 * @param unit time unit of the timeout
+	 * @param executorServices to shut down
+	 */
+	public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) {
+		for (ExecutorService executorService: executorServices) {
+			executorService.shutdown();
+		}
+
+		boolean wasInterrupted = false;
+		final long endTime = unit.toMillis(timeout) + System.currentTimeMillis(); // one deadline shared by all services
+		long timeLeft = unit.toMillis(timeout);
+		boolean hasTimeLeft = timeLeft > 0L;
+
+		for (ExecutorService executorService: executorServices) {
+			if (wasInterrupted || !hasTimeLeft) {
+				executorService.shutdownNow();
+			} else {
+				try {
+					if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
+						LOG.warn("ExecutorService did not terminate in time. Shutting it down now.");
+						executorService.shutdownNow();
+					}
+				} catch (InterruptedException e) {
+					LOG.warn("Interrupted while shutting down executor services. Shutting all " +
+							"remaining ExecutorServices down now.", e);
+					executorService.shutdownNow();
+
+					wasInterrupted = true;
+
+					Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
+				}
+
+				timeLeft = endTime - System.currentTimeMillis();
+				hasTimeLeft = timeLeft > 0L;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 93eb3c6..544150b 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.util.ExecutorUtils;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -439,7 +440,7 @@ public class MesosApplicationMasterRunner {
 			}
 		}
 
-		org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
+		ExecutorUtils.gracefulShutdown(
 			AkkaUtils.getTimeout(config).toMillis(),
 			TimeUnit.MILLISECONDS,
 			futureExecutor,

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 82a05f2..d5afeb3 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.queryablestate.network;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
@@ -45,10 +46,12 @@ import java.net.InetSocketAddress;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * The base class for every server in the queryable state module.
@@ -83,6 +86,9 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	/** The number of threads to be used for query serving. */
 	private final int numQueryThreads;
 
+	/** Atomic shut down future. */
+	private final AtomicReference<CompletableFuture<Void>> serverShutdownFuture = new AtomicReference<>(null);
+
 	/** Netty's ServerBootstrap. */
 	private ServerBootstrap bootstrap;
 
@@ -179,8 +185,8 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	 * @throws Exception If something goes wrong during the bind operation.
 	 */
 	public void start() throws Throwable {
-		Preconditions.checkState(serverAddress == null,
-				serverName + " is already running @ " + serverAddress + '.');
+		Preconditions.checkState(serverAddress == null && serverShutdownFuture.get() == null,
+				serverName + " is already running @ " + serverAddress + ". ");
 
 		Iterator<Integer> portIterator = bindPortRange.iterator();
 		while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {}
@@ -251,7 +257,22 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 			throw future.cause();
 		} catch (BindException e) {
 			log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
-			shutdown();
+			try {
+				// we shut down the server but reset the future every time because, in
+				// case of a failure to bind, attemptToBind() will be called again, and not
+				// resetting the future would interfere with later shutdown attempts.
+
+				shutdownServer()
+						.whenComplete((ignoredV, ignoredT) -> serverShutdownFuture.getAndSet(null))
+						.get();
+			} catch (Exception r) {
+
+				// Here we were seeing this problem:
+				// https://github.com/netty/netty/issues/4357 if we do a get().
+				// A failure of the shutdown is therefore only logged and not rethrown.
+
+				log.warn("Problem while shutting down {}: {}", serverName, r.getMessage());
+			}
 		}
 		// any other type of exception we let it bubble up.
 		return false;
@@ -259,26 +280,62 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 
 	/**
 	 * Shuts down the server and all related thread pools.
+	 * @return A {@link CompletableFuture} that will be completed upon termination of the shutdown process.
 	 */
-	public void shutdown() {
-		log.info("Shutting down {} @ {}", serverName, serverAddress);
-
-		if (handler != null) {
-			handler.shutdown();
-			handler = null;
-		}
-
-		if (queryExecutor != null) {
-			queryExecutor.shutdown();
-		}
+	public CompletableFuture<Void> shutdownServer() {
+		CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
+		if (serverShutdownFuture.compareAndSet(null, shutdownFuture)) {
+			log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+			final CompletableFuture<Void> groupShutdownFuture = new CompletableFuture<>();
+			if (bootstrap != null) {
+				EventLoopGroup group = bootstrap.group();
+				if (group != null && !group.isShutdown()) {
+					group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
+							.addListener(finished -> {
+								if (finished.isSuccess()) {
+									groupShutdownFuture.complete(null);
+								} else {
+									groupShutdownFuture.completeExceptionally(finished.cause());
+								}
+							});
+				} else {
+					groupShutdownFuture.complete(null);
+				}
+			} else {
+				groupShutdownFuture.complete(null);
+			}
 
-		if (bootstrap != null) {
-			EventLoopGroup group = bootstrap.group();
-			if (group != null) {
-				group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+			final CompletableFuture<Void> handlerShutdownFuture = new CompletableFuture<>();
+			if (handler == null) {
+				handlerShutdownFuture.complete(null);
+			} else {
+				handler.shutdown().whenComplete((result, throwable) -> {
+					if (throwable != null) {
+						handlerShutdownFuture.completeExceptionally(throwable);
+					} else {
+						handlerShutdownFuture.complete(null);
+					}
+				});
 			}
+
+			final CompletableFuture<Void> queryExecShutdownFuture = CompletableFuture.runAsync(() -> {
+				if (queryExecutor != null) {
+					ExecutorUtils.gracefulShutdown(10L, TimeUnit.MINUTES, queryExecutor);
+				}
+			});
+
+			CompletableFuture.allOf(
+					queryExecShutdownFuture, groupShutdownFuture, handlerShutdownFuture
+			).whenComplete((result, throwable) -> {
+				if (throwable != null) {
+					shutdownFuture.completeExceptionally(throwable);
+				} else {
+					shutdownFuture.complete(null);
+				}
+			});
 		}
-		serverAddress = null;
+		return serverShutdownFuture.get();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
index 7e71a11..a514723 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
@@ -183,9 +183,16 @@ public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extend
 	public abstract CompletableFuture<RESP> handleRequest(final long requestId, final REQ request);
 
 	/**
-	 * Shuts down any handler specific resources, e.g. thread pools etc.
+	 * Shuts down any handler-specific resources, e.g. thread pools etc and returns
+	 * a {@link CompletableFuture}.
+	 *
+	 * <p>If an exception is thrown during the shutdown process, then that exception
+	 * will be included in the returned future.
+	 *
+	 * @return A {@link CompletableFuture} that will be completed when the shutdown
+	 * process actually finishes.
 	 */
-	public abstract void shutdown();
+	public abstract CompletableFuture<Void> shutdown();
 
 	/**
 	 * Task to execute the actual query against the state instance.

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
index af33701..29ee0d7 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -33,7 +33,9 @@ import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
 import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
 import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage;
@@ -42,6 +44,7 @@ import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 
+import akka.dispatch.OnComplete;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,6 +144,7 @@ public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequ
 								// KvStateLocation. Therefore we retry this query and
 								// force look up the location.
 
+								LOG.debug("Retrying after failing to retrieve state due to: {}.", throwable.getCause().getMessage());
 								executeActionAsync(result, request, true);
 							} else {
 								result.completeExceptionally(throwable);
@@ -203,20 +207,34 @@ public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequ
 
 		LOG.debug("Retrieving location for state={} of job={} from the job manager.", jobId, queryableStateName);
 
+		final CompletableFuture<KvStateLocation> location = new CompletableFuture<>();
+		lookupCache.put(cacheKey, location);
 		return proxy.getJobManagerFuture().thenComposeAsync(
 				jobManagerGateway -> {
 					final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
-					final CompletableFuture<KvStateLocation> locationFuture = FutureUtils.toJava(
-							jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
-									.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class)));
-
-					lookupCache.put(cacheKey, locationFuture);
-					return locationFuture;
+					jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
+							.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
+							.onComplete(new OnComplete<KvStateLocation>() {
+
+								@Override
+								public void onComplete(Throwable failure, KvStateLocation loc) throws Throwable {
+									if (failure != null) {
+										if (failure instanceof FlinkJobNotFoundException) {
+											// if the jobId was wrong, remove the entry from the cache.
+											lookupCache.remove(cacheKey);
+										}
+										location.completeExceptionally(failure);
+									} else {
+										location.complete(loc);
+									}
+								}
+							}, Executors.directExecutionContext());
+					return location;
 				}, queryExecutor);
 	}
 
 	@Override
-	public void shutdown() {
-		kvStateClient.shutdown();
+	public CompletableFuture<Void> shutdown() {
+		return kvStateClient.shutdown();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
index 6fcaf40..aa5e7b6 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
@@ -35,6 +35,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The default implementation of the {@link KvStateClientProxy}.
@@ -96,7 +97,12 @@ public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, K
 
 	@Override
 	public void shutdown() {
-		super.shutdown();
+		try {
+			shutdownServer().get(10L, TimeUnit.SECONDS);
+			log.info("{} was shutdown successfully.", getServerName());
+		} catch (Exception e) {
+			log.warn("{} shutdown failed: {}", getServerName(), e);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
index 476f153..18a2944 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
@@ -101,7 +101,7 @@ public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalR
 	}
 
 	@Override
-	public void shutdown() {
-		// do nothing
+	public CompletableFuture<Void> shutdown() {
+		return CompletableFuture.completedFuture(null);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
index 3a37a3a..0720268 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -32,6 +32,7 @@ import org.apache.flink.util.Preconditions;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The default implementation of the {@link KvStateServer}.
@@ -101,6 +102,11 @@ public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest
 
 	@Override
 	public void shutdown() {
-		super.shutdown();
+		try {
+			shutdownServer().get(10L, TimeUnit.SECONDS);
+			log.info("{} was shutdown successfully.", getServerName());
+		} catch (Exception e) {
+			log.warn("{} shutdown failed: {}", getServerName(), e);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index 79809b3..b9ce7c2 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -88,11 +88,10 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable
 		try {
 			zkServer.stop();
 			zkServer.close();
-		} catch (Exception e) {
+			client.shutdownAndWait();
+		} catch (Throwable e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-
-		client.shutdown();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
index 6945cca..a5e24b2 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
@@ -69,10 +69,10 @@ public abstract class NonHAAbstractQueryableStateTestBase extends AbstractQuerya
 	public static void tearDown() {
 		try {
 			cluster.stop();
-		} catch (Exception e) {
+			client.shutdownAndWait();
+		} catch (Throwable e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		client.shutdown();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
index 3d2ed40..103c638 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
@@ -22,7 +22,9 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.queryablestate.network.messages.MessageBody;
 import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.stats.AtomicKvStateRequestStats;
 import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
+import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
@@ -37,6 +39,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -60,22 +63,15 @@ public class AbstractServerTest {
 		expectedEx.expect(FlinkRuntimeException.class);
 		expectedEx.expectMessage("Unable to start Test Server 2. All ports in provided range are occupied.");
 
-		TestServer server1 = null;
-		TestServer server2 = null;
-		try {
+		List<Integer> portList = new ArrayList<>();
+		portList.add(7777);
 
-			server1 = startServer("Test Server 1", 7777);
-			Assert.assertEquals(7777L, server1.getServerAddress().getPort());
+		try (TestServer server1 = new TestServer("Test Server 1", new DisabledKvStateRequestStats(), portList.iterator())) {
+			server1.start();
 
-			server2 = startServer("Test Server 2", 7777);
-		} finally {
-
-			if (server1 != null) {
-				server1.shutdown();
-			}
-
-			if (server2 != null) {
-				server2.shutdown();
+			try (TestServer server2 = new TestServer("Test Server 2", new DisabledKvStateRequestStats(),
+					Collections.singletonList(server1.getServerAddress().getPort()).iterator())) {
+				server2.start();
 			}
 		}
 	}
@@ -86,69 +82,81 @@ public class AbstractServerTest {
 	 */
 	@Test
 	public void testPortRangeSuccess() throws Throwable {
-		TestServer server1 = null;
-		TestServer server2 = null;
-		Client<TestMessage, TestMessage> client = null;
 
-		try {
-			server1 = startServer("Test Server 1", 7777, 7778, 7779);
-			Assert.assertEquals(7777L, server1.getServerAddress().getPort());
-
-			server2 = startServer("Test Server 2", 7777, 7778, 7779);
-			Assert.assertEquals(7778L, server2.getServerAddress().getPort());
-
-			client = new Client<>(
-					"Test Client",
-					1,
-					new MessageSerializer<>(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()),
-					new DisabledKvStateRequestStats());
+		// this is shared between the two servers.
+		AtomicKvStateRequestStats serverStats = new AtomicKvStateRequestStats();
+		AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();
+
+		List<Integer> portList = new ArrayList<>();
+		portList.add(7777);
+		portList.add(7778);
+		portList.add(7779);
+
+		try (
+				TestServer server1 = new TestServer("Test Server 1", serverStats, portList.iterator());
+				TestServer server2 = new TestServer("Test Server 2", serverStats, portList.iterator());
+				TestClient client = new TestClient(
+						"Test Client",
+						1,
+						new MessageSerializer<>(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()),
+						clientStats
+				)
+		) {
+			server1.start();
+			Assert.assertTrue(server1.getServerAddress().getPort() >= 7777 && server1.getServerAddress().getPort() <= 7779);
+
+			server2.start();
+			Assert.assertTrue(server2.getServerAddress().getPort() >= 7777 && server2.getServerAddress().getPort() <= 7779);
 
 			TestMessage response1 = client.sendRequest(server1.getServerAddress(), new TestMessage("ping")).join();
 			Assert.assertEquals(server1.getServerName() + "-ping", response1.getMessage());
 
 			TestMessage response2 = client.sendRequest(server2.getServerAddress(), new TestMessage("pong")).join();
 			Assert.assertEquals(server2.getServerName() + "-pong", response2.getMessage());
-		} finally {
 
-			if (server1 != null) {
-				server1.shutdown();
-			}
-
-			if (server2 != null) {
-				server2.shutdown();
-			}
+			// the client connects to both servers and the stats object is shared.
+			Assert.assertEquals(2L, serverStats.getNumConnections());
 
-			if (client != null) {
-				client.shutdown();
-			}
+			Assert.assertEquals(2L, clientStats.getNumConnections());
+			Assert.assertEquals(0L, clientStats.getNumFailed());
+			Assert.assertEquals(2L, clientStats.getNumSuccessful());
+			Assert.assertEquals(2L, clientStats.getNumRequests());
 		}
+
+		Assert.assertEquals(0L, clientStats.getNumConnections());
+		Assert.assertEquals(0L, clientStats.getNumFailed());
+		Assert.assertEquals(2L, clientStats.getNumSuccessful());
+		Assert.assertEquals(2L, clientStats.getNumRequests());
 	}
 
-	/**
-	 * Initializes a {@link TestServer} with the given port range.
-	 * @param serverName the name of the server.
-	 * @param ports a range of ports.
-	 * @return A test server with the given name.
-	 */
-	private TestServer startServer(String serverName, int... ports) throws Throwable {
-		List<Integer> portList = new ArrayList<>(ports.length);
-		for (int p : ports) {
-			portList.add(p);
+	private static class TestClient extends Client<TestMessage, TestMessage> implements AutoCloseable {
+
+		TestClient(
+				String clientName,
+				int numEventLoopThreads,
+				MessageSerializer<TestMessage, TestMessage> serializer,
+				KvStateRequestStats stats) {
+			super(clientName, numEventLoopThreads, serializer, stats);
 		}
 
-		final TestServer server = new TestServer(serverName, portList.iterator());
-		server.start();
-		return server;
+		@Override
+		public void close() throws Exception {
+			shutdown().join();
+			Assert.assertTrue(isEventGroupShutdown());
+		}
 	}
 
 	/**
 	 * A server that receives a {@link TestMessage test message} and returns another test
 	 * message containing the same string as the request with the name of the server prepended.
 	 */
-	private class TestServer extends AbstractServerBase<TestMessage, TestMessage> {
+	private static class TestServer extends AbstractServerBase<TestMessage, TestMessage> implements AutoCloseable {
 
-		protected TestServer(String name, Iterator<Integer> bindPort) throws UnknownHostException {
+		private final KvStateRequestStats requestStats;
+
+		TestServer(String name, KvStateRequestStats stats, Iterator<Integer> bindPort) throws UnknownHostException {
 			super(name, InetAddress.getLocalHost(), bindPort, 1, 1);
+			this.requestStats = stats;
 		}
 
 		@Override
@@ -156,7 +164,7 @@ public class AbstractServerTest {
 			return new AbstractServerHandler<TestMessage, TestMessage>(
 					this,
 					new MessageSerializer<>(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()),
-					new DisabledKvStateRequestStats()) {
+					requestStats) {
 
 				@Override
 				public CompletableFuture<TestMessage> handleRequest(long requestId, TestMessage request) {
@@ -165,11 +173,22 @@ public class AbstractServerTest {
 				}
 
 				@Override
-				public void shutdown() {
-					// do nothing
+				public CompletableFuture<Void> shutdown() {
+					return CompletableFuture.completedFuture(null);
 				}
 			};
 		}
+
+		@Override
+		public void close() throws Exception {
+			shutdownServer().get();
+			if (requestStats instanceof AtomicKvStateRequestStats) {
+				AtomicKvStateRequestStats stats = (AtomicKvStateRequestStats) requestStats;
+				Assert.assertEquals(0L, stats.getNumConnections());
+			}
+			Assert.assertTrue(getQueryExecutor().isTerminated());
+			Assert.assertTrue(isEventGroupShutdown());
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index 04cdce7..703ac4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -22,14 +22,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.ExecutionContext;
 
 /**
- * Collection of {@link Executor} implementations
+ * Collection of {@link Executor} implementations.
  */
 public class Executors {
 
@@ -94,48 +93,4 @@ public class Executors {
 			return this;
 		}
 	}
-
-	/**
-	 * Gracefully shutdown the given {@link ExecutorService}. The call waits the given timeout that
-	 * all ExecutorServices terminate. If the ExecutorServices do not terminate in this time,
-	 * they will be shut down hard.
-	 *
-	 * @param timeout to wait for the termination of all ExecutorServices
-	 * @param unit of the timeout
-	 * @param executorServices to shut down
-	 */
-	public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) {
-		for (ExecutorService executorService: executorServices) {
-			executorService.shutdown();
-		}
-
-		boolean wasInterrupted = false;
-		final long endTime = unit.toMillis(timeout) + System.currentTimeMillis();
-		long timeLeft = unit.toMillis(timeout);
-		boolean hasTimeLeft = timeLeft > 0L;
-
-		for (ExecutorService executorService: executorServices) {
-			if (wasInterrupted || !hasTimeLeft) {
-				executorService.shutdownNow();
-			} else {
-				try {
-					if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
-						LOG.warn("ExecutorService did not terminate in time. Shutting it down now.");
-						executorService.shutdownNow();
-					}
-				} catch (InterruptedException e) {
-					LOG.warn("Interrupted while shutting down executor services. Shutting all " +
-						"remaining ExecutorServices down now.", e);
-					executorService.shutdownNow();
-
-					wasInterrupted = true;
-
-					Thread.currentThread().interrupt();
-				}
-
-				timeLeft = endTime - System.currentTimeMillis();
-				hasTimeLeft = timeLeft > 0L;
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index a24daf0..7258e52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -47,6 +46,7 @@ import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.Preconditions;
 
 import akka.actor.ActorSystem;
@@ -88,7 +88,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
 	private final MetricRegistryImpl metricRegistry;
 
-	/** Executor used to run future callbacks */
+	/** Executor used to run future callbacks. */
 	private final ExecutorService executor;
 
 	private final TaskExecutor taskManager;
@@ -165,7 +165,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 				exception = ExceptionUtils.firstOrSuppressed(e, exception);
 			}
 
-			Executors.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor);
+			ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor);
 
 			if (exception != null) {
 				throw exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 7799a78..5f82159 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -45,7 +45,7 @@ import org.apache.flink.runtime.clusterframework.{BootstrapTools, FlinkResourceM
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter, Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter}
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
@@ -85,7 +85,7 @@ import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
-import org.apache.flink.util.{FlinkException, InstantiationUtil, NetUtils, SerializedThrowable}
+import org.apache.flink.util._
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -2060,7 +2060,7 @@ object JobManager {
         LOG.warn("Could not properly shut down the metric registry.", t)
     }
 
-    FlinkExecutors.gracefulShutdown(
+    ExecutorUtils.gracefulShutdown(
       timeout.toMillis,
       TimeUnit.MILLISECONDS,
       futureExecutor,

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 227b854..5554061 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -32,7 +32,7 @@ import org.apache.flink.configuration._
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils}
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
-import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter, Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter}
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders
 import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
@@ -44,7 +44,7 @@ import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegi
 import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
 import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
-import org.apache.flink.util.NetUtils
+import org.apache.flink.util.{ExecutorUtils, NetUtils}
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -440,7 +440,7 @@ abstract class FlinkMiniCluster(
 
     isRunning = false
 
-    FlinkExecutors.gracefulShutdown(
+    ExecutorUtils.gracefulShutdown(
       timeout.toMillis,
       TimeUnit.MILLISECONDS,
       futureExecutor,

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 97942ea..79e38df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
@@ -64,7 +65,7 @@ public class SlotProtocolTest extends TestLogger {
 
 	@AfterClass
 	public static void afterClass() {
-		Executors.gracefulShutdown(timeout, TimeUnit.MILLISECONDS, scheduledExecutorService);
+		ExecutorUtils.gracefulShutdown(timeout, TimeUnit.MILLISECONDS, scheduledExecutorService);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
index 1a8ea84..ecadaa5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
@@ -33,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -233,7 +233,7 @@ public class ExecutionGraphCacheTest extends TestLogger {
 
 			verify(jobManagerGateway, times(1)).requestJob(eq(jobId), any(Time.class));
 		} finally {
-			Executors.gracefulShutdown(5000L, TimeUnit.MILLISECONDS, executor);
+			ExecutorUtils.gracefulShutdown(5000L, TimeUnit.MILLISECONDS, executor);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/74d052bb/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 3bdc2ac..279981a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
@@ -472,7 +473,7 @@ public class YarnApplicationMasterRunner {
 			}
 		}
 
-		org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
+		ExecutorUtils.gracefulShutdown(
 			AkkaUtils.getTimeout(config).toMillis(),
 			TimeUnit.MILLISECONDS,
 			futureExecutor,