You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/11 11:48:58 UTC

[1/4] flink git commit: [FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorGateway

Repository: flink
Updated Branches:
  refs/heads/master 00d5b6222 -> 9f790d3ef


http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
new file mode 100644
index 0000000..e608aa0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.util.Preconditions;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * JobManagerRetriever implementation for Flip-6 JobManager.
+ */
+public class RpcJobManagerRetriever extends JobManagerRetriever {
+
+	private final RpcService rpcService;
+
+	public RpcJobManagerRetriever(
+		RpcService rpcService) {
+
+		this.rpcService = Preconditions.checkNotNull(rpcService);
+	}
+
+	@Override
+	protected CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception {
+		return rpcService.connect(leaderAddress, JobManagerGateway.class);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index e490b48..1616a7b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -45,7 +45,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -82,6 +82,7 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.taskexecutor.TaskExecutor
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
+import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.{InstantiationUtil, NetUtils, SerializedThrowable}
@@ -2219,12 +2220,17 @@ object JobManager {
       if (configuration.getInteger(WebOptions.PORT, 0) >= 0) {
         LOG.info("Starting JobManager web frontend")
 
+        val timeout = FutureUtils.toTime(AkkaUtils.getTimeout(configuration))
+
         // start the web frontend. we need to load this dynamically
         // because it is not in the same project/dependencies
         val webServer = WebMonitorUtils.startWebRuntimeMonitor(
           configuration,
           highAvailabilityServices,
-          jobManagerSystem)
+          new AkkaJobManagerRetriever(jobManagerSystem, timeout),
+          new AkkaQueryServiceRetriever(jobManagerSystem, timeout),
+          timeout,
+          jobManagerSystem.dispatcher)
 
         Option(webServer)
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index bc323cc..831c026 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -32,7 +32,7 @@ import org.apache.flink.configuration._
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils}
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
-import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader
 import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
+import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.util.NetUtils
 import org.slf4j.LoggerFactory
@@ -389,6 +390,8 @@ abstract class FlinkMiniCluster(
       config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) &&
         config.getInteger(WebOptions.PORT, 0) >= 0) {
 
+      val flinkTimeout = FutureUtils.toTime(timeout)
+
       LOG.info("Starting JobManger web frontend")
       // start the new web frontend. we need to load this dynamically
       // because it is not in the same project/dependencies
@@ -396,7 +399,10 @@ abstract class FlinkMiniCluster(
         WebMonitorUtils.startWebRuntimeMonitor(
           config,
           highAvailabilityServices,
-          actorSystem)
+          new AkkaJobManagerRetriever(actorSystem, flinkTimeout),
+          new AkkaQueryServiceRetriever(actorSystem, flinkTimeout),
+          flinkTimeout,
+          actorSystem.dispatcher)
       )
 
       webServer.foreach(_.start(jobManagerAkkaURL))

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 858bbbb..ddbb82d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -62,6 +62,8 @@ object TestingUtils {
 
   val TESTING_TIMEOUT = 1 minute
 
+  val TIMEOUT = Time.minutes(1L)
+
   val DEFAULT_AKKA_ASK_TIMEOUT = "200 s"
 
   def getDefaultTestingActorSystemConfigString: String = {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 88cc585..9130901 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -44,6 +45,8 @@ import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
@@ -358,11 +361,16 @@ public class YarnApplicationMasterRunner {
 			// 2: the web monitor
 			LOG.debug("Starting Web Frontend");
 
+			Time webMonitorTimeout = Time.milliseconds(config.getLong(WebOptions.TIMEOUT));
+
 			webMonitor = BootstrapTools.startWebMonitorIfConfigured(
 				config,
 				highAvailabilityServices,
-				actorSystem,
-				jobManager,
+				new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout),
+				new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
+				webMonitorTimeout,
+				futureExecutor,
+				AkkaUtils.getAkkaURL(actorSystem, jobManager),
 				LOG);
 
 			String protocol = "http://";


[3/4] flink git commit: [FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorGateway

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index d1aeea4..e2437e6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -45,7 +45,7 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
index c8ec689..3526734 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
@@ -19,8 +19,9 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
@@ -33,17 +34,23 @@ public class JobStoppingHandler extends AbstractJsonRequestHandler {
 	private static final String JOB_STOPPING_REST_PATH = "/jobs/:jobid/stop";
 	private static final String JOB_STOPPING_YARN_REST_PATH = "/jobs/:jobid/yarn-stop";
 
+	private final Time timeout;
+
+	public JobStoppingHandler(Time timeout) {
+		this.timeout = Preconditions.checkNotNull(timeout);
+	}
+
 	@Override
 	public String[] getPaths() {
 		return new String[]{JOB_STOPPING_REST_PATH, JOB_STOPPING_YARN_REST_PATH};
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		try {
-			JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
-			if (jobManager != null) {
-				jobManager.tell(new JobManagerMessages.StopJob(jobid));
+			JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+			if (jobManagerGateway != null) {
+				jobManagerGateway.stopJob(jobId, timeout);
 				return "{}";
 			}
 			else {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index 8646df9..079be8f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.webmonitor.NotFoundException;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
 
@@ -40,16 +41,16 @@ public interface RequestHandler {
 	 *
 	 * @param pathParams The map of REST path parameters, decoded by the router.
 	 * @param queryParams The map of query parameters.
-	 * @param jobManager The JobManager actor.
+	 * @param jobManagerGateway to talk to the JobManager.
 	 *
 	 * @return The full http response.
 	 *
 	 * @throws Exception Handlers may forward exceptions. Exceptions of type
-	 *         {@link org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404
+	 *         {@link NotFoundException} will cause a HTTP 404
 	 *         response with the exception message, other exceptions will cause a HTTP 500 response
 	 *         with the exception stack trace.
 	 */
-	FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception;
+	FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception;
 
 	/**
 	 * Returns an array of REST URL's under which this handler can be registered.

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 3562874..b7fee2d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -33,13 +33,11 @@ import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
@@ -62,7 +60,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
 import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
 import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
 
-import akka.dispatch.Mapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,13 +71,10 @@ import java.net.InetSocketAddress;
 import java.nio.channels.FileChannel;
 import java.util.HashMap;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-
-import scala.Option;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
@@ -115,9 +109,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 	/** Indicates which log file should be displayed. */
 	private FileMode fileMode;
 
-	private final ExecutionContextExecutor executor;
-
-	private final Time timeTimeout;
+	private final Executor executor;
 
 	private final BlobView blobView;
 
@@ -129,9 +121,9 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 
 	public TaskManagerLogHandler(
 		JobManagerRetriever retriever,
-		ExecutionContextExecutor executor,
-		scala.concurrent.Future<String> localJobManagerAddressPromise,
-		FiniteDuration timeout,
+		Executor executor,
+		CompletableFuture<String> localJobManagerAddressPromise,
+		Time timeout,
 		FileMode fileMode,
 		Configuration config,
 		boolean httpsEnabled,
@@ -143,8 +135,6 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 		this.fileMode = fileMode;
 
 		this.blobView = Preconditions.checkNotNull(blobView, "blobView");
-
-		timeTimeout = Time.milliseconds(timeout.toMillis());
 	}
 
 	@Override
@@ -162,20 +152,18 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 	 * Response when running with leading JobManager.
 	 */
 	@Override
-	protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed routed, final ActorGateway jobManager) {
+	protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed routed, final JobManagerGateway jobManagerGateway) {
 		if (cache == null) {
-			scala.concurrent.Future<Object> portFuture = jobManager.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout);
-			scala.concurrent.Future<BlobCache> cacheFuture = portFuture.map(new Mapper<Object, BlobCache>() {
-				@Override
-				public BlobCache checkedApply(Object result) throws IOException {
-					Option<String> hostOption = jobManager.actor().path().address().host();
-					String host = hostOption.isDefined() ? hostOption.get() : "localhost";
-					int port = (int) result;
-					return new BlobCache(new InetSocketAddress(host, port), config, blobView);
-				}
-			}, executor);
-
-			cache = FutureUtils.toJava(cacheFuture);
+			CompletableFuture<Integer> blobPortFuture = jobManagerGateway.requestBlobServerPort(timeout);
+			cache = blobPortFuture.thenApplyAsync(
+				(Integer port) -> {
+					try {
+						return new BlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView);
+					} catch (IOException e) {
+						throw new FlinkFutureException("Could not create BlobCache.", e);
+					}
+				},
+				executor);
 		}
 
 		final String taskManagerID = routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
@@ -185,22 +173,18 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 		if (lastRequestPending.putIfAbsent(taskManagerID, true) == null) {
 			try {
 				InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
-				scala.concurrent.Future<JobManagerMessages.TaskManagerInstance> scalaTaskManagerFuture = jobManager
-					.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout)
-					.mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class));
-
-				CompletableFuture<JobManagerMessages.TaskManagerInstance> taskManagerFuture = FutureUtils.toJava(scalaTaskManagerFuture);
+				CompletableFuture<Optional<Instance>> taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
 
 				CompletableFuture<BlobKey> blobKeyFuture = taskManagerFuture.thenCompose(
-					taskManagerInstance -> {
-						Instance taskManager = taskManagerInstance.instance().get();
-
+					(Optional<Instance> optTMInstance) -> {
+						Instance taskManagerInstance = optTMInstance.orElseThrow(
+							() -> new FlinkFutureException("Could not find instance with " + instanceID + '.'));
 						switch (fileMode) {
 							case LOG:
-								return taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout);
+								return taskManagerInstance.getTaskManagerGateway().requestTaskManagerLog(timeout);
 							case STDOUT:
 							default:
-								return taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout);
+								return taskManagerInstance.getTaskManagerGateway().requestTaskManagerStdout(timeout);
 						}
 					}
 				);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index 6ad490e..a8ab7a3 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.RegisteredTaskManagers;
-import org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
 import org.apache.flink.util.StringUtils;
@@ -32,12 +30,12 @@ import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.StringWriter;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static java.util.Objects.requireNonNull;
 
@@ -51,11 +49,11 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 
 	public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
 
-	private final FiniteDuration timeout;
+	private final Time timeout;
 
 	private final MetricFetcher fetcher;
 
-	public TaskManagersHandler(FiniteDuration timeout, MetricFetcher fetcher) {
+	public TaskManagersHandler(Time timeout, MetricFetcher fetcher) {
 		this.timeout = requireNonNull(timeout);
 		this.fetcher = fetcher;
 	}
@@ -66,9 +64,9 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		try {
-			if (jobManager != null) {
+			if (jobManagerGateway != null) {
 				// whether one task manager's metrics are requested, or all task manager, we
 				// return them in an array. This avoids unnecessary code complexity.
 				// If only one task manager is requested, we only fetch one task manager metrics.
@@ -76,20 +74,21 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 				if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
 					try {
 						InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
-						Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
-						TaskManagerInstance instance = (TaskManagerInstance) Await.result(future, timeout);
-						if (instance.instance().nonEmpty()) {
-							instances.add(instance.instance().get());
-						}
+						CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
+
+						Optional<Instance> instance = tmInstanceFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+						instance.ifPresent(instances::add);
 					}
 					// this means the id string was invalid. Keep the list empty.
 					catch (IllegalArgumentException e){
 						// do nothing.
 					}
 				} else {
-					Future<Object> future = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
-					RegisteredTaskManagers taskManagers = (RegisteredTaskManagers) Await.result(future, timeout);
-					instances.addAll(taskManagers.asJavaCollection());
+					CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+					Collection<Instance> tmInstances = tmInstancesFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+					instances.addAll(tmInstances);
 				}
 
 				StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index f96e0c2..d116c56 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractJobVertexRequestHandler;
@@ -71,8 +71,8 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
 	public String handleJsonRequest(
 		Map<String, String> pathParams,
 		Map<String, String> queryParams,
-		ActorGateway jobManager) throws Exception {
-		return super.handleJsonRequest(pathParams, queryParams, jobManager);
+		JobManagerGateway jobManagerGateway) throws Exception {
+		return super.handleJsonRequest(pathParams, queryParams, jobManagerGateway);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
index 94b135d..b95f2c4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
 import org.apache.flink.util.Preconditions;
@@ -48,7 +48,7 @@ public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		fetcher.update();
 		String requestedMetricsList = queryParams.get("get");
 		return requestedMetricsList != null

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
index 95398b5..3af9c56 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
@@ -18,37 +18,29 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.Preconditions;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.OnFailure;
-import akka.dispatch.OnSuccess;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
 
@@ -61,20 +53,25 @@ import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.Metr
 public class MetricFetcher {
 	private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
 
-	private final ActorSystem actorSystem;
 	private final JobManagerRetriever retriever;
-	private final ExecutionContext ctx;
-	private final FiniteDuration timeout = new FiniteDuration(Duration.create(AkkaOptions.ASK_TIMEOUT.defaultValue()).toMillis(), TimeUnit.MILLISECONDS);
+	private final MetricQueryServiceRetriever queryServiceRetriever;
+	private final Executor executor;
+	private final Time timeout;
 
 	private MetricStore metrics = new MetricStore();
 	private MetricDumpDeserializer deserializer = new MetricDumpDeserializer();
 
 	private long lastUpdateTime;
 
-	public MetricFetcher(ActorSystem actorSystem, JobManagerRetriever retriever, ExecutionContext ctx) {
-		this.actorSystem = Preconditions.checkNotNull(actorSystem);
+	public MetricFetcher(
+			JobManagerRetriever retriever,
+			MetricQueryServiceRetriever queryServiceRetriever,
+			Executor executor,
+			Time timeout) {
 		this.retriever = Preconditions.checkNotNull(retriever);
-		this.ctx = Preconditions.checkNotNull(ctx);
+		this.queryServiceRetriever = Preconditions.checkNotNull(queryServiceRetriever);
+		this.executor = Preconditions.checkNotNull(executor);
+		this.timeout = Preconditions.checkNotNull(timeout);
 	}
 
 	/**
@@ -101,38 +98,38 @@ public class MetricFetcher {
 
 	private void fetchMetrics() {
 		try {
-			Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
-			if (jobManagerGatewayAndWebPort.isDefined()) {
-				ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1();
+			Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
+			if (optJobManagerGateway.isPresent()) {
+				final JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
 
 				/**
 				 * Remove all metrics that belong to a job that is not running and no longer archived.
 				 */
-				Future<Object> jobDetailsFuture = jobManager.ask(new RequestJobDetails(true, true), timeout);
-				jobDetailsFuture
-					.onSuccess(new OnSuccess<Object>() {
-						@Override
-						public void onSuccess(Object result) throws Throwable {
-							MultipleJobsDetails details = (MultipleJobsDetails) result;
+				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
+
+				jobDetailsFuture.whenCompleteAsync(
+					(MultipleJobsDetails jobDetails, Throwable throwable) -> {
+						if (throwable != null) {
+							LOG.debug("Fetching of JobDetails failed.", throwable);
+						} else {
 							ArrayList<String> toRetain = new ArrayList<>();
-							for (JobDetails job : details.getRunningJobs()) {
+							for (JobDetails job : jobDetails.getRunningJobs()) {
 								toRetain.add(job.getJobId().toString());
 							}
-							for (JobDetails job : details.getFinishedJobs()) {
+							for (JobDetails job : jobDetails.getFinishedJobs()) {
 								toRetain.add(job.getJobId().toString());
 							}
 							synchronized (metrics) {
 								metrics.jobs.keySet().retainAll(toRetain);
 							}
 						}
-					}, ctx);
-				logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed.");
+					},
+					executor);
 
-				String jobManagerPath = jobManager.path();
-				String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
-				ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath);
+				String jobManagerPath = jobManagerGateway.getAddress();
+				String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
 
-				queryMetrics(jobManagerQueryService);
+				retrieveAndQueryMetrics(jmQueryServicePath);
 
 				/**
 				 * We first request the list of all registered task managers from the job manager, and then
@@ -140,88 +137,75 @@ public class MetricFetcher {
 				 *
 				 * <p>All stored metrics that do not belong to a registered task manager will be removed.
 				 */
-				Future<Object> registeredTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
-				registeredTaskManagersFuture
-					.onSuccess(new OnSuccess<Object>() {
-						@Override
-						public void onSuccess(Object result) throws Throwable {
-							Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers) result).asJavaIterable();
-							List<String> activeTaskManagers = new ArrayList<>();
-							for (Instance taskManager : taskManagers) {
-								activeTaskManagers.add(taskManager.getId().toString());
-
-								String taskManagerPath = taskManager.getTaskManagerGateway().getAddress();
-								String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManager.getTaskManagerID().getResourceIdString();
-								ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath);
-
-								queryMetrics(taskManagerQueryService);
-							}
-							synchronized (metrics) { // remove all metrics belonging to unregistered task managers
+				CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+				taskManagersFuture.whenCompleteAsync(
+					(Collection<Instance> taskManagers, Throwable throwable) -> {
+						if (throwable != null) {
+							LOG.debug("Fetching list of registered TaskManagers failed.", throwable);
+						} else {
+							List<String> activeTaskManagers = taskManagers.stream().map(
+								taskManagerInstance -> {
+									final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress();
+									final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString();
+
+									retrieveAndQueryMetrics(tmQueryServicePath);
+
+									return taskManagerInstance.getId().toString();
+								}).collect(Collectors.toList());
+
+							synchronized (metrics) {
 								metrics.taskManagers.keySet().retainAll(activeTaskManagers);
 							}
 						}
-					}, ctx);
-				logErrorOnFailure(registeredTaskManagersFuture, "Fetchin list of registered TaskManagers failed.");
+					},
+					executor);
 			}
 		} catch (Exception e) {
 			LOG.warn("Exception while fetching metrics.", e);
 		}
 	}
 
-	private void logErrorOnFailure(Future<Object> future, final String message) {
-		future.onFailure(new OnFailure() {
-			@Override
-			public void onFailure(Throwable failure) throws Throwable {
-				LOG.debug(message, failure);
-			}
-		}, ctx);
-	}
-
 	/**
-	 * Requests a metric dump from the given actor.
+	 * Retrieves and queries the specified QueryServiceGateway.
 	 *
-	 * @param actor ActorRef to request the dump from
-     */
-	private void queryMetrics(ActorRef actor) {
-		Future<Object> metricQueryFuture = new BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout);
-		metricQueryFuture
-			.onSuccess(new OnSuccess<Object>() {
-				@Override
-				public void onSuccess(Object result) throws Throwable {
-					addMetrics(result);
+	 * @param queryServicePath specifying the QueryServiceGateway
+	 */
+	private void retrieveAndQueryMetrics(String queryServicePath) {
+		final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath);
+
+		queryServiceGatewayFuture.whenCompleteAsync(
+			(MetricQueryServiceGateway queryServiceGateway, Throwable t) -> {
+				if (t != null) {
+					LOG.debug("Could not retrieve QueryServiceGateway.", t);
+				} else {
+					queryMetrics(queryServiceGateway);
 				}
-			}, ctx);
-		logErrorOnFailure(metricQueryFuture, "Fetching metrics failed.");
-	}
-
-	private void addMetrics(Object result) {
-		MetricDumpSerialization.MetricSerializationResult data = (MetricDumpSerialization.MetricSerializationResult) result;
-		List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
-		for (MetricDump metric : dumpedMetrics) {
-			metrics.add(metric);
-		}
+			},
+			executor);
 	}
 
 	/**
-	 * Helper class that allows mocking of the answer.
-     */
-	static class BasicGateway {
-		private final ActorRef actor;
-
-		private BasicGateway(ActorRef actor) {
-			this.actor = actor;
-		}
-
-		/**
-		 * Sends a message asynchronously and returns its response. The response to the message is
-		 * returned as a future.
-		 *
-		 * @param message Message to be sent
-		 * @param timeout Timeout until the Future is completed with an AskTimeoutException
-		 * @return Future which contains the response to the sent message
-		 */
-		public Future<Object> ask(Object message, FiniteDuration timeout) {
-			return Patterns.ask(actor, message, new Timeout(timeout));
-		}
+	 * Query the metrics from the given QueryServiceGateway.
+	 *
+	 * @param queryServiceGateway to query for metrics
+	 */
+	private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
+		queryServiceGateway
+			.queryMetrics(timeout)
+			.whenCompleteAsync(
+				(MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
+					if (t != null) {
+						LOG.debug("Fetching metrics failed.", t);
+					} else {
+						List<MetricDump> dumpedMetrics = deserializer.deserialize(result);
+						synchronized (metrics) {
+							for (MetricDump metric : dumpedMetrics) {
+								metrics.add(metric);
+							}
+						}
+					}
+				},
+				executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index 7cd2932..b3ce135 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -158,25 +158,20 @@ public class WebFrontendITCase extends TestLogger {
 	}
 
 	@Test
-	public void getTaskmanagers() {
-		try {
-			String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+	public void getTaskmanagers() throws Exception {
+		String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
 
-			ObjectMapper mapper = new ObjectMapper();
-			JsonNode parsed = mapper.readTree(json);
-			ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode parsed = mapper.readTree(json);
+		ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
 
-			assertNotNull(taskManagers);
-			assertEquals(cluster.numTaskManagers(), taskManagers.size());
+		assertNotNull(taskManagers);
+		assertEquals(cluster.numTaskManagers(), taskManagers.size());
 
-			JsonNode taskManager = taskManagers.get(0);
-			assertNotNull(taskManager);
-			assertEquals(NUM_SLOTS, taskManager.get("slotsNumber").asInt());
-			assertTrue(taskManager.get("freeSlots").asInt() <= NUM_SLOTS);
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		JsonNode taskManager = taskManagers.get(0);
+		assertNotNull(taskManager);
+		assertEquals(NUM_SLOTS, taskManager.get("slotsNumber").asInt());
+		assertTrue(taskManager.get("freeSlots").asInt() <= NUM_SLOTS);
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index fe16445..5829d1c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.WebOptions;
@@ -27,12 +28,15 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
 import org.apache.flink.util.TestLogger;
 
@@ -44,11 +48,12 @@ import org.apache.curator.test.TestingServer;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.powermock.reflect.Whitebox;
 
 import java.io.File;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Scanner;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -71,7 +76,9 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-	private static final FiniteDuration TestTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
+	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(2L, TimeUnit.MINUTES);
+
+	private static final Time TIMEOUT = Time.milliseconds(TEST_TIMEOUT.toMillis());
 
 	private final String mainResourcesPath = getClass().getResource("/web").getPath();
 
@@ -80,7 +87,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 	 */
 	@Test
 	public void testStandaloneWebRuntimeMonitor() throws Exception {
-		final Deadline deadline = TestTimeout.fromNow();
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
 		TestingCluster flink = null;
 		WebRuntimeMonitor webMonitor = null;
@@ -89,7 +96,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			// Flink w/o a web monitor
 			flink = new TestingCluster(new Configuration());
 			flink.start(true);
-			webMonitor = startWebRuntimeMonitor(flink);
+			webMonitor = startWebRuntimeMonitor(flink, TIMEOUT);
 
 			try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
 				String expected = new Scanner(new File(mainResourcesPath + "/index.html"))
@@ -129,10 +136,11 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 	 */
 	@Test
 	public void testRedirectToLeader() throws Exception {
-		final Deadline deadline = TestTimeout.fromNow();
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
 		ActorSystem[] jobManagerSystem = new ActorSystem[2];
 		WebRuntimeMonitor[] webMonitor = new WebRuntimeMonitor[2];
+		AkkaJobManagerRetriever[] jobManagerRetrievers = new AkkaJobManagerRetriever[2];
 		HighAvailabilityServices highAvailabilityServices = null;
 
 		try (TestingServer zooKeeper = new TestingServer()) {
@@ -157,11 +165,16 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			}
 
 			for (int i = 0; i < webMonitor.length; i++) {
+				jobManagerRetrievers[i] = new AkkaJobManagerRetriever(jobManagerSystem[i], TIMEOUT);
+
 				webMonitor[i] = new WebRuntimeMonitor(
 					config,
 					highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
 					highAvailabilityServices.createBlobStore(),
-					jobManagerSystem[i]);
+					jobManagerRetrievers[i],
+					new AkkaQueryServiceRetriever(jobManagerSystem[i], TIMEOUT),
+					TIMEOUT,
+					TestingUtils.defaultExecutor());
 			}
 
 			ActorRef[] jobManager = new ActorRef[2];
@@ -196,22 +209,18 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			int followerIndex = (leaderIndex + 1) % 2;
 
 			ActorSystem leadingSystem = jobManagerSystem[leaderIndex];
-			ActorSystem followerSystem = jobManagerSystem[followerIndex];
 
 			WebMonitor leadingWebMonitor = webMonitor[leaderIndex];
 			WebMonitor followerWebMonitor = webMonitor[followerIndex];
 
 			// For test stability reason we have to wait until we are sure that both leader
 			// listeners have been notified.
-			JobManagerRetriever leadingRetriever = Whitebox
-					.getInternalState(leadingWebMonitor, "retriever");
-
-			JobManagerRetriever followerRetriever = Whitebox
-					.getInternalState(followerWebMonitor, "retriever");
+			AkkaJobManagerRetriever leadingRetriever = jobManagerRetrievers[leaderIndex];
+			AkkaJobManagerRetriever followerRetriever = jobManagerRetrievers[followerIndex];
 
 			// Wait for the initial notifications
-			waitForLeaderNotification(leadingSystem, jobManager[leaderIndex], leadingRetriever, deadline);
-			waitForLeaderNotification(leadingSystem, jobManager[leaderIndex], followerRetriever, deadline);
+			waitForLeaderNotification(jobManager[leaderIndex].path().toString(), leadingRetriever, deadline);
+			waitForLeaderNotification(AkkaUtils.getAkkaURL(leadingSystem, jobManager[leaderIndex]), followerRetriever, deadline);
 
 			try (
 					HttpTestClient leaderClient = new HttpTestClient(
@@ -241,7 +250,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				leadingSystem.shutdown();
 
 				// Wait for the notification of the follower
-				waitForLeaderNotification(followerSystem, jobManager[followerIndex], followerRetriever, deadline);
+				waitForLeaderNotification(jobManager[followerIndex].path().toString(), followerRetriever, deadline);
 
 				// Same request to the new leader
 				followingClient.sendGetRequest("index.html", deadline.timeLeft());
@@ -282,7 +291,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 
 	@Test
 	public void testLeaderNotAvailable() throws Exception {
-		final Deadline deadline = TestTimeout.fromNow();
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
 		ActorSystem actorSystem = null;
 		WebRuntimeMonitor webRuntimeMonitor = null;
@@ -305,7 +314,10 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				config,
 				mock(LeaderRetrievalService.class),
 				mock(BlobView.class),
-				actorSystem);
+				new AkkaJobManagerRetriever(actorSystem, TIMEOUT),
+				new AkkaQueryServiceRetriever(actorSystem, TIMEOUT),
+				TIMEOUT,
+				TestingUtils.defaultExecutor());
 
 			webRuntimeMonitor.start("akka://schmakka");
 
@@ -343,7 +355,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 	 */
 	@Test
 	public void testNoEscape() throws Exception {
-		final Deadline deadline = TestTimeout.fromNow();
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
 		TestingCluster flink = null;
 		WebRuntimeMonitor webMonitor = null;
@@ -351,7 +363,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 		try {
 			flink = new TestingCluster(new Configuration());
 			flink.start(true);
-			webMonitor = startWebRuntimeMonitor(flink);
+			webMonitor = startWebRuntimeMonitor(flink, TIMEOUT);
 
 			try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
 				String expectedIndex = new Scanner(new File(mainResourcesPath + "/index.html"))
@@ -405,7 +417,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 	 */
 	@Test
 	public void testNoCopyFromJar() throws Exception {
-		final Deadline deadline = TestTimeout.fromNow();
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
 		TestingCluster flink = null;
 		WebRuntimeMonitor webMonitor = null;
@@ -413,7 +425,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 		try {
 			flink = new TestingCluster(new Configuration());
 			flink.start(true);
-			webMonitor = startWebRuntimeMonitor(flink);
+			webMonitor = startWebRuntimeMonitor(flink, TIMEOUT);
 
 			try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
 				String expectedIndex = new Scanner(new File(mainResourcesPath + "/index.html"))
@@ -459,7 +471,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 	}
 
 	private WebRuntimeMonitor startWebRuntimeMonitor(
-		TestingCluster flink) throws Exception {
+			TestingCluster flink,
+			Time timeout) throws Exception {
 
 		ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
 		ActorRef jmActor = flink.jobManagerActors().get().head();
@@ -482,7 +495,10 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			config,
 			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
 			highAvailabilityServices.createBlobStore(),
-			jmActorSystem);
+			new AkkaJobManagerRetriever(jmActorSystem, timeout),
+			new AkkaQueryServiceRetriever(jmActorSystem, timeout),
+			timeout,
+			TestingUtils.defaultExecutor());
 
 		webMonitor.start(jobManagerAddress);
 		flink.waitForActorsToBeAlive();
@@ -492,17 +508,14 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	private void waitForLeaderNotification(
-			ActorSystem system,
-			ActorRef expectedLeader,
-			JobManagerRetriever retriever,
+			String expectedJobManagerURL,
+			AkkaJobManagerRetriever retriever,
 			Deadline deadline) throws Exception {
 
-		String expectedJobManagerUrl = AkkaUtils.getAkkaURL(system, expectedLeader);
-
 		while (deadline.hasTimeLeft()) {
-			ActorRef leaderRef = retriever.awaitJobManagerGatewayAndWebPort()._1().actor();
+			Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
 
-			if (AkkaUtils.getAkkaURL(system, leaderRef).equals(expectedJobManagerUrl)) {
+			if (optJobManagerGateway.isPresent() && Objects.equals(expectedJobManagerURL, optJobManagerGateway.get().getAddress())) {
 				return;
 			}
 			else {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
index 19e8a49..865385f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
@@ -18,20 +18,18 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
+
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
-
 /**
  * Tests for the ClusterOverviewHandler.
  */
 public class ClusterOverviewHandlerTest {
 	@Test
 	public void testGetPaths() {
-		ClusterOverviewHandler handler = new ClusterOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS));
+		ClusterOverviewHandler handler = new ClusterOverviewHandler(Time.seconds(0L));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/overview", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
index e108774..ea26f5d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
@@ -18,20 +18,18 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
+
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
-
 /**
  * Tests for the CurrentJobIdsHandler.
  */
 public class CurrentJobIdsHandlerTest {
 	@Test
 	public void testGetPaths() {
-		CurrentJobIdsHandler handler = new CurrentJobIdsHandler(new FiniteDuration(0, TimeUnit.SECONDS));
+		CurrentJobIdsHandler handler = new CurrentJobIdsHandler(Time.seconds(0L));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
index 9f3d362..64360d3 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -35,9 +36,6 @@ import org.junit.Test;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Tests for the CurrentJobsOverviewHandler.
@@ -68,17 +66,17 @@ public class CurrentJobsOverviewHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, true);
+		CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(Time.seconds(0L), true, true);
 		String[] pathsAll = handlerAll.getPaths();
 		Assert.assertEquals(1, pathsAll.length);
 		Assert.assertEquals("/joboverview", pathsAll[0]);
 
-		CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, false);
+		CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(Time.seconds(0L), true, false);
 		String[] pathsRunning = handlerRunning.getPaths();
 		Assert.assertEquals(1, pathsRunning.length);
 		Assert.assertEquals("/joboverview/running", pathsRunning[0]);
 
-		CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), false, true);
+		CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(Time.seconds(0L), false, true);
 		String[] pathsCompleted = handlerCompleted.getPaths();
 		Assert.assertEquals(1, pathsCompleted.length);
 		Assert.assertEquals("/joboverview/completed", pathsCompleted[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
index 4ddddca..ac8d934 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
@@ -18,41 +18,54 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.DummyActorGateway;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import scala.Tuple2;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the HandlerRedirectUtils.
  */
-public class HandlerRedirectUtilsTest {
+public class HandlerRedirectUtilsTest extends TestLogger {
 
 	private static final String localJobManagerAddress = "akka.tcp://flink@127.0.0.1:1234/user/foobar";
-	private static final String remoteURL = "127.0.0.2:1235";
+	private static final String remoteHostname = "127.0.0.2";
+	private static final int webPort = 1235;
+	private static final String remoteURL = remoteHostname + ':' + webPort;
 	private static final String remotePath = "akka.tcp://flink@" + remoteURL + "/user/jobmanager";
 
 	@Test
 	public void testGetRedirectAddressWithLocalAkkaPath() throws Exception {
-		ActorGateway leaderGateway = new DummyActorGateway("akka://flink/user/foobar");
-
-		Tuple2<ActorGateway, Integer> leader = new Tuple2<>(leaderGateway, 1235);
+		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
+		when(jobManagerGateway.getAddress()).thenReturn("akka://flink/user/foobar");
 
-		String redirectingAddress = HandlerRedirectUtils.getRedirectAddress(localJobManagerAddress, leader);
+		String redirectingAddress = HandlerRedirectUtils.getRedirectAddress(
+			localJobManagerAddress,
+			jobManagerGateway,
+			Time.seconds(3L));
 
 		Assert.assertNull(redirectingAddress);
 	}
 
 	@Test
 	public void testGetRedirectAddressWithRemoteAkkaPath() throws Exception {
-		ActorGateway leaderGateway = new DummyActorGateway(remotePath);
-
-		Tuple2<ActorGateway, Integer> leader = new Tuple2<>(leaderGateway, 1235);
+		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
+		when(jobManagerGateway.getAddress()).thenReturn(remotePath);
+		when(jobManagerGateway.getHostname()).thenReturn(remoteHostname);
+		when(jobManagerGateway.requestWebPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(webPort));
 
-		String redirectingAddress = HandlerRedirectUtils.getRedirectAddress(localJobManagerAddress, leader);
+		String redirectingAddress = HandlerRedirectUtils.getRedirectAddress(
+			localJobManagerAddress,
+			jobManagerGateway,
+			Time.seconds(3L));
 
 		Assert.assertEquals(remoteURL, redirectingAddress);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index fcbfa02..82aa87a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -27,7 +30,7 @@ import org.junit.Test;
 public class JarRunHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JarRunHandler handler = new JarRunHandler(null, null, null);
+		JarRunHandler handler = new JarRunHandler(null, Time.seconds(0L), new Configuration());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jars/:jarid/run", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
index 25fca9b..fe55f51 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -31,6 +32,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the JobAccumulatorsHandler.
  */
@@ -51,7 +54,7 @@ public class JobAccumulatorsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobAccumulatorsHandler handler = new JobAccumulatorsHandler(null);
+		JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphHolder.class));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
index ed54000..98d9353 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
 import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Test;
@@ -30,7 +32,7 @@ import java.util.List;
 public class JobCancellationHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JobCancellationHandler handler = new JobCancellationHandler();
+		JobCancellationHandler handler = new JobCancellationHandler(TestingUtils.TIMEOUT());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(2, paths.length);
 		List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
index 64a07c8..b48ee66 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
@@ -19,21 +19,20 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
-import akka.dispatch.ExecutionContexts$;
-import akka.dispatch.Futures;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.junit.Assert;
@@ -45,15 +44,14 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.impl.Promise;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -62,13 +60,13 @@ import static org.mockito.Mockito.when;
 /**
  * Tests for the JobCancellationWithSavepointHandler.
  */
-public class JobCancellationWithSavepointHandlersTest {
+public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 
-	private static final ExecutionContext EC = ExecutionContexts$.MODULE$.fromExecutor(Executors.directExecutor());
+	private static final Executor executor = Executors.directExecutor();
 
 	@Test
 	public void testGetPaths() {
-		JobCancellationWithSavepointHandlers handler = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), EC);
+		JobCancellationWithSavepointHandlers handler = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), executor);
 
 		JobCancellationWithSavepointHandlers.TriggerHandler triggerHandler = handler.getTriggerHandler();
 		String[] triggerPaths = triggerHandler.getPaths();
@@ -94,25 +92,23 @@ public class JobCancellationWithSavepointHandlersTest {
 		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
 		ExecutionGraph graph = mock(ExecutionGraph.class);
 		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-		when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph);
+		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
 		when(graph.getCheckpointCoordinator()).thenReturn(coord);
 		when(coord.getCheckpointTimeout()).thenReturn(timeout);
 
-		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC);
+		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
 		JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler();
 
 		Map<String, String> params = new HashMap<>();
 		params.put("jobid", jobId.toString());
 		params.put("targetDirectory", "placeholder");
 
-		ActorGateway jobManager = mock(ActorGateway.class);
-
-		Future<Object> future = Futures.successful((Object) new CancellationSuccess(jobId, null));
-		when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(future);
+		JobManagerGateway jobManager = mock(JobManagerGateway.class);
+		when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar"));
 
-		handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		handler.handleRequest(params, Collections.emptyMap(), jobManager);
 
-		verify(jobManager).ask(any(CancelJobWithSavepoint.class), eq(FiniteDuration.apply(timeout, "ms")));
+		verify(jobManager).cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class));
 	}
 
 	/**
@@ -125,36 +121,34 @@ public class JobCancellationWithSavepointHandlersTest {
 		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
 		ExecutionGraph graph = mock(ExecutionGraph.class);
 		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-		when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph);
+		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
 		when(graph.getCheckpointCoordinator()).thenReturn(coord);
 		when(coord.getCheckpointTimeout()).thenReturn(timeout);
 
-		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC, "the-default-directory");
+		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor, "the-default-directory");
 		JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler();
 
 		Map<String, String> params = new HashMap<>();
 		params.put("jobid", jobId.toString());
 
-		ActorGateway jobManager = mock(ActorGateway.class);
-
-		Future<Object> future = Futures.successful((Object) new CancellationSuccess(jobId, null));
-		when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(future);
+		JobManagerGateway jobManager = mock(JobManagerGateway.class);
+		when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar"));
 
 		// 1. Use targetDirectory path param
 		params.put("targetDirectory", "custom-directory");
 		handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
 
-		verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), eq(FiniteDuration.apply(timeout, "ms")));
+		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
 
 		// 2. Use default
 		params.remove("targetDirectory");
 
 		handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
 
-		verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "the-default-directory")), eq(FiniteDuration.apply(timeout, "ms")));
+		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("the-default-directory"), any(Time.class));
 
 		// 3. Throw Exception
-		handlers = new JobCancellationWithSavepointHandlers(holder, EC, null);
+		handlers = new JobCancellationWithSavepointHandlers(holder, executor, null);
 		handler = handlers.getTriggerHandler();
 
 		try {
@@ -175,10 +169,10 @@ public class JobCancellationWithSavepointHandlersTest {
 		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
 		ExecutionGraph graph = mock(ExecutionGraph.class);
 		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-		when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph);
+		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
 		when(graph.getCheckpointCoordinator()).thenReturn(coord);
 
-		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC);
+		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
 		JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler();
 		JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler();
 
@@ -186,16 +180,16 @@ public class JobCancellationWithSavepointHandlersTest {
 		params.put("jobid", jobId.toString());
 		params.put("targetDirectory", "custom-directory");
 
-		ActorGateway jobManager = mock(ActorGateway.class);
+		JobManagerGateway jobManager = mock(JobManagerGateway.class);
 
 		// Successful
-		Promise<Object> promise = new Promise.DefaultPromise<>();
-		when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(promise);
+		CompletableFuture<String> successfulCancelWithSavepoint = new CompletableFuture<>();
+		when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(successfulCancelWithSavepoint);
 
 		// Trigger
 		FullHttpResponse response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
 
-		verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), any(FiniteDuration.class));
+		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
 
 		String location = String.format("/jobs/%s/cancel-with-savepoint/in-progress/1", jobId);
 
@@ -226,7 +220,7 @@ public class JobCancellationWithSavepointHandlersTest {
 		assertEquals(location, root.get("location").asText());
 
 		// Only single actual request
-		verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), any(FiniteDuration.class));
+		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
 
 		// Query progress
 		params.put("requestId", "1");
@@ -243,7 +237,7 @@ public class JobCancellationWithSavepointHandlersTest {
 		assertEquals("1", root.get("request-id").asText());
 
 		// Complete
-		promise.success(new CancellationSuccess(jobId, "_path-savepoint_"));
+		successfulCancelWithSavepoint.complete("_path-savepoint_");
 
 		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
 
@@ -301,10 +295,10 @@ public class JobCancellationWithSavepointHandlersTest {
 		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
 		ExecutionGraph graph = mock(ExecutionGraph.class);
 		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-		when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph);
+		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
 		when(graph.getCheckpointCoordinator()).thenReturn(coord);
 
-		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC);
+		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
 		JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler();
 		JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler();
 
@@ -312,15 +306,15 @@ public class JobCancellationWithSavepointHandlersTest {
 		params.put("jobid", jobId.toString());
 		params.put("targetDirectory", "custom-directory");
 
-		ActorGateway jobManager = mock(ActorGateway.class);
+		JobManagerGateway jobManager = mock(JobManagerGateway.class);
 
 		// Successful
-		Future<Object> future = Futures.failed(new Exception("Test Exception"));
-		when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(future);
+		CompletableFuture<String> unsuccessfulCancelWithSavepoint = FutureUtils.completedExceptionally(new Exception("Test Exception"));
+		when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(unsuccessfulCancelWithSavepoint);
 
 		// Trigger
 		trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
-		verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), any(FiniteDuration.class));
+		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
 
 		// Query progress
 		params.put("requestId", "1");

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
index ad9da6b..104b0a3 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -32,6 +33,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the JobConfigHandler.
  */
@@ -52,7 +55,7 @@ public class JobConfigHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobConfigHandler handler = new JobConfigHandler(null);
+		JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphHolder.class));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/config", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
index d830707..f3f5943 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -39,6 +40,8 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the JobDetailsHandler.
  */
@@ -64,7 +67,7 @@ public class JobDetailsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobDetailsHandler handler = new JobDetailsHandler(null, null);
+		JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphHolder.class), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(2, paths.length);
 		List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
index 6016d01..f54ab06 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -35,6 +36,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the JobExceptionsHandler.
  */
@@ -55,7 +58,7 @@ public class JobExceptionsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobExceptionsHandler handler = new JobExceptionsHandler(null);
+		JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
index a5ea2b3..17b4c44 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -28,6 +29,8 @@ import org.junit.Test;
 
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the JobPlanHandler.
  */
@@ -48,7 +51,7 @@ public class JobPlanHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobPlanHandler handler = new JobPlanHandler(null);
+		JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphHolder.class));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/plan", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
index cac0b10..ee47ee9 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
 import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Test;
@@ -27,10 +30,10 @@ import java.util.List;
 /**
  * Tests for the JobStoppingHandler.
  */
-public class JobStoppingHandlerTest {
+public class JobStoppingHandlerTest extends TestLogger {
 	@Test
 	public void testGetPaths() {
-		JobStoppingHandler handler = new JobStoppingHandler();
+		JobStoppingHandler handler = new JobStoppingHandler(TestingUtils.TIMEOUT());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(2, paths.length);
 		List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
index c57aa09..b7af323 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -33,6 +34,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the JobVertexAccumulatorsHandler.
  */
@@ -54,7 +57,7 @@ public class JobVertexAccumulatorsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(null);
+		JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
index 8985d89..d2ac0d6 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
@@ -46,7 +46,7 @@ import static org.mockito.Mockito.when;
 public class JobVertexBackPressureHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(null, mock(BackPressureStatsTracker.class), 0);
+		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), mock(BackPressureStatsTracker.class), 0);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
index bde6a84..bc4fe9c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -35,6 +36,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the JobVertexDetailsHandler.
  */
@@ -56,7 +59,7 @@ public class JobVertexDetailsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobVertexDetailsHandler handler = new JobVertexDetailsHandler(null, null);
+		JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
index 8954844..d5d877a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -36,6 +37,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the JobVertexTaskManagersHandler.
  */
@@ -58,7 +61,7 @@ public class JobVertexTaskManagersHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(null, null);
+		JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]);


[2/4] flink git commit: [FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorGateway

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
index f419908..d992b85 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -18,16 +18,20 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the SubtaskCurrentAttemptDetailsHandler.
  */
 public class SubtaskCurrentAttemptDetailsHandlerTest {
 	@Test
 	public void testGetPaths() {
-		SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(null, null);
+		SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index 74a19a9..ce8e72f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -33,6 +34,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the SubtaskExecutionAttemptAccumulatorsHandler.
  */
@@ -61,7 +64,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(null);
+		SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
index a9161b3..e1fbf92 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -34,6 +35,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the SubtaskExecutionAttemptDetailsHandler.
  */
@@ -70,7 +73,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(null, null);
+		SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
index 6022be2..f33da80 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -33,6 +34,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the SubtasksAllAccumulatorsHandler.
  */
@@ -55,7 +58,7 @@ public class SubtasksAllAccumulatorsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(null);
+		SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
index 22a2d27..548efaf 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -34,6 +35,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the SubtasksTimesHandler.
  */
@@ -56,7 +59,7 @@ public class SubtasksTimesHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		SubtasksTimesHandler handler = new SubtasksTimesHandler(null);
+		SubtasksTimesHandler handler = new SubtasksTimesHandler(mock(ExecutionGraphHolder.class));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
index 5846d75..cf59f05 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
@@ -21,17 +21,16 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
@@ -46,19 +45,12 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 
-import scala.Option;
-import scala.collection.JavaConverters;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Future$;
-import scala.concurrent.duration.FiniteDuration;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.isA;
 import static org.powermock.api.mockito.PowerMockito.mock;
@@ -72,9 +64,9 @@ public class TaskManagerLogHandlerTest {
 	public void testGetPaths() {
 		TaskManagerLogHandler handlerLog = new TaskManagerLogHandler(
 			mock(JobManagerRetriever.class),
-			mock(ExecutionContextExecutor.class),
-			Future$.MODULE$.successful("/jm/address"),
-			AkkaUtils.getDefaultClientTimeout(),
+			Executors.directExecutor(),
+			CompletableFuture.completedFuture("/jm/address"),
+			TestingUtils.TIMEOUT(),
 			TaskManagerLogHandler.FileMode.LOG,
 			new Configuration(),
 			false,
@@ -85,9 +77,9 @@ public class TaskManagerLogHandlerTest {
 
 		TaskManagerLogHandler handlerOut = new TaskManagerLogHandler(
 			mock(JobManagerRetriever.class),
-			mock(ExecutionContextExecutor.class),
-			Future$.MODULE$.successful("/jm/address"),
-			AkkaUtils.getDefaultClientTimeout(),
+			Executors.directExecutor(),
+			CompletableFuture.completedFuture("/jm/address"),
+			TestingUtils.TIMEOUT(),
 			TaskManagerLogHandler.FileMode.STDOUT,
 			new Configuration(),
 			false,
@@ -115,27 +107,21 @@ public class TaskManagerLogHandlerTest {
 
 		// ========= setup JobManager ==================================================================================
 
-		ActorGateway jobManagerGateway = mock(ActorGateway.class);
-		Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers(
-			JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
-
-		when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class)))
-			.thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer));
-		when(jobManagerGateway.ask(isA(JobManagerMessages.getRequestBlobManagerPort().getClass()), any(FiniteDuration.class)))
-			.thenReturn(Future$.MODULE$.successful((Object) 5));
-		when(jobManagerGateway.ask(isA(JobManagerMessages.RequestTaskManagerInstance.class), any(FiniteDuration.class)))
-			.thenReturn(Future$.MODULE$.successful((Object) new JobManagerMessages.TaskManagerInstance(Option.apply(taskManager))));
-		when(jobManagerGateway.path()).thenReturn("/jm/address");
+		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
+		when(jobManagerGateway.requestBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(1337));
+		when(jobManagerGateway.getHostname()).thenReturn("localhost");
+		when(jobManagerGateway.requestTaskManagerInstance(any(InstanceID.class), any(Time.class))).thenReturn(
+			CompletableFuture.completedFuture(Optional.of(taskManager)));
 
 		JobManagerRetriever retriever = mock(JobManagerRetriever.class);
-		when(retriever.getJobManagerGatewayAndWebPort())
-			.thenReturn(Option.apply(new scala.Tuple2<ActorGateway, Integer>(jobManagerGateway, 0)));
+		when(retriever.getJobManagerGatewayNow())
+			.thenReturn(Optional.of(jobManagerGateway));
 
 		TaskManagerLogHandler handler = new TaskManagerLogHandler(
 			retriever,
-			ExecutionContext$.MODULE$.fromExecutor(Executors.directExecutor()),
-			Future$.MODULE$.successful("/jm/address"),
-			AkkaUtils.getDefaultClientTimeout(),
+			Executors.directExecutor(),
+			CompletableFuture.completedFuture("/jm/address"),
+			TestingUtils.TIMEOUT(),
 			TaskManagerLogHandler.FileMode.LOG,
 			new Configuration(),
 			false,

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
index 17e7e9d..2992d91 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
@@ -18,14 +18,13 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
+
 import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Tests for the TaskManagersHandler.
@@ -33,7 +32,7 @@ import scala.concurrent.duration.FiniteDuration;
 public class TaskManagersHandlerTest {
 	@Test
 	public void testGetPaths() {
-		TaskManagersHandler handler = new TaskManagersHandler(new FiniteDuration(0, TimeUnit.SECONDS), null);
+		TaskManagersHandler handler = new TaskManagersHandler(Time.seconds(0L), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(2, paths.length);
 		List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
index b032061..90e032d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
@@ -18,17 +18,17 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorSystem;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import scala.concurrent.ExecutionContext;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.powermock.api.mockito.PowerMockito.mock;
@@ -42,7 +42,11 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 	 */
 	@Test
 	public void testHandleRequest() throws Exception {
-		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(JobManagerRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
 		MetricStoreTest.setupStore(fetcher.getMetricStore());
 
 		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
@@ -91,7 +95,11 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 	 */
 	@Test
 	public void testInvalidListDoesNotFail() {
-		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(JobManagerRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
 		MetricStoreTest.setupStore(fetcher.getMetricStore());
 
 		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
@@ -117,7 +125,11 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 	 */
 	@Test
 	public void testInvalidGetDoesNotFail() {
-		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(JobManagerRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
 		MetricStoreTest.setupStore(fetcher.getMetricStore());
 
 		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
index 97c2055..994fc5e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
@@ -18,18 +18,18 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorSystem;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import scala.concurrent.ExecutionContext;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.powermock.api.mockito.PowerMockito.mock;
@@ -48,7 +48,11 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
 
 	@Test
 	public void getMapFor() {
-		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(JobManagerRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
 		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
 
 		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(fetcher);
@@ -62,7 +66,11 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
 
 	@Test
 	public void getMapForNull() {
-		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(JobManagerRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
 		MetricStore store = fetcher.getMetricStore();
 
 		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(fetcher);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
index 53666eb..a35af22 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
@@ -18,18 +18,18 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorSystem;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import scala.concurrent.ExecutionContext;
-
 import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -49,7 +49,11 @@ public class JobMetricsHandlerTest extends TestLogger {
 
 	@Test
 	public void getMapFor() throws Exception {
-		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(JobManagerRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
 		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
 
 		JobMetricsHandler handler = new JobMetricsHandler(fetcher);
@@ -64,7 +68,11 @@ public class JobMetricsHandlerTest extends TestLogger {
 
 	@Test
 	public void getMapForNull() {
-		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(JobManagerRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
 		MetricStore store = fetcher.getMetricStore();
 
 		JobMetricsHandler handler = new JobMetricsHandler(fetcher);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
index 5f68c6f..e84b11d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
@@ -18,18 +18,18 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorSystem;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import scala.concurrent.ExecutionContext;
-
 import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
 import static org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
 import static org.junit.Assert.assertEquals;
@@ -50,7 +50,11 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
 
 	@Test
 	public void getMapFor() throws Exception {
-		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(JobManagerRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
 		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
 
 		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
@@ -68,7 +72,11 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
 
 	@Test
 	public void getMapForNull() {
-		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(JobManagerRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
 		MetricStore store = fetcher.getMetricStore();
 
 		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
index 369e8aa..4c91997 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.metrics;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -26,49 +27,39 @@ import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.Executor;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
-import scala.Option;
-import scala.collection.JavaConverters;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Future$;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.runtime.metrics.dump.MetricQueryService.METRIC_QUERY_SERVICE_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isA;
 import static org.powermock.api.mockito.PowerMockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 /**
  * Tests for the MetricFetcher.
@@ -78,6 +69,8 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
 public class MetricFetcherTest extends TestLogger {
 	@Test
 	public void testUpdate() throws Exception {
+		final Time timeout = Time.seconds(10L);
+
 		// ========= setup TaskManager =================================================================================
 		JobID jobID = new JobID();
 		InstanceID tmID = new InstanceID();
@@ -94,58 +87,40 @@ public class MetricFetcherTest extends TestLogger {
 		JobDetails details = mock(JobDetails.class);
 		when(details.getJobId()).thenReturn(jobID);
 
-		ActorGateway jobManagerGateway = mock(ActorGateway.class);
-		Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers(
-			JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
+		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
 
-		when(jobManagerGateway.ask(isA(RequestJobDetails.class), any(FiniteDuration.class)))
-			.thenReturn(Future$.MODULE$.successful((Object) new MultipleJobsDetails(new JobDetails[0], new JobDetails[0])));
-		when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class)))
-			.thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer));
-		when(jobManagerGateway.path()).thenReturn("/jm/address");
+		when(jobManagerGateway.requestJobDetails(anyBoolean(), anyBoolean(), any(Time.class)))
+			.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(new JobDetails[0], new JobDetails[0])));
+		when(jobManagerGateway.requestTaskManagerInstances(any(Time.class)))
+			.thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager)));
+		when(jobManagerGateway.getAddress()).thenReturn("/jm/address");
+		when(jobManagerGateway.requestWebPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(0));
 
-		JobManagerRetriever retriever = mock(JobManagerRetriever.class);
-		when(retriever.getJobManagerGatewayAndWebPort())
-			.thenReturn(Option.apply(new scala.Tuple2<ActorGateway, Integer>(jobManagerGateway, 0)));
+		AkkaJobManagerRetriever retriever = mock(AkkaJobManagerRetriever.class);
+		when(retriever.getJobManagerGatewayNow())
+			.thenReturn(Optional.of(jobManagerGateway));
 
 		// ========= setup QueryServices ================================================================================
-		Object requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID);
-
-		final ActorRef jmQueryService = mock(ActorRef.class);
-		final ActorRef tmQueryService = mock(ActorRef.class);
-
-		ActorSystem actorSystem = mock(ActorSystem.class);
-		when(actorSystem.actorFor(eq("/jm/" + METRIC_QUERY_SERVICE_NAME))).thenReturn(jmQueryService);
-		when(actorSystem.actorFor(eq("/tm/" + METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(tmQueryService);
-
-		MetricFetcher.BasicGateway jmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
-		when(jmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
-			.thenReturn(Future$.MODULE$.successful((Object) new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
-
-		MetricFetcher.BasicGateway tmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
-		when(tmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
-			.thenReturn(Future$.MODULE$.successful(requestMetricsAnswer));
-
-		whenNew(MetricFetcher.BasicGateway.class)
-			.withArguments(eq(new Object() {
-				@Override
-				public boolean equals(Object o) {
-					return o == jmQueryService;
-				}
-			}))
-			.thenReturn(jmQueryServiceGateway);
-		whenNew(MetricFetcher.BasicGateway.class)
-			.withArguments(eq(new Object() {
-				@Override
-				public boolean equals(Object o) {
-					return o == tmQueryService;
-				}
-			}))
-			.thenReturn(tmQueryServiceGateway);
+		MetricQueryServiceGateway jmQueryService = mock(MetricQueryServiceGateway.class);
+		MetricQueryServiceGateway tmQueryService = mock(MetricQueryServiceGateway.class);
+
+		MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID);
+
+		when(jmQueryService.queryMetrics(any(Time.class)))
+			.thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
+		when(tmQueryService.queryMetrics(any(Time.class)))
+			.thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));
+
+		MetricQueryServiceRetriever queryServiceRetriever = mock(MetricQueryServiceRetriever.class);
+		when(queryServiceRetriever.retrieveService(eq("/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME))).thenReturn(CompletableFuture.completedFuture(jmQueryService));
+		when(queryServiceRetriever.retrieveService(eq("/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
 
 		// ========= start MetricFetcher testing =======================================================================
-		ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(new CurrentThreadExecutor());
-		MetricFetcher fetcher = new MetricFetcher(actorSystem, retriever, context);
+		MetricFetcher fetcher = new MetricFetcher(
+			retriever,
+			queryServiceRetriever,
+			Executors.directExecutor(),
+			timeout);
 
 		// verify that update fetches metrics and updates the store
 		fetcher.update();
@@ -170,13 +145,7 @@ public class MetricFetcherTest extends TestLogger {
 		}
 	}
 
-	private static class CurrentThreadExecutor implements Executor {
-		public void execute(Runnable r) {
-			r.run();
-		}
-	}
-
-	private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException {
+	private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) {
 		Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
 		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
 		Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
index 4333f04..c20ea98 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
@@ -18,18 +18,18 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorSystem;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import scala.concurrent.ExecutionContext;
-
 import static org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler.TASK_MANAGER_ID_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -49,7 +49,11 @@ public class TaskManagerMetricsHandlerTest extends TestLogger {
 
 	@Test
 	public void getMapFor() throws Exception {
-		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(JobManagerRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
 		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
 
 		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(fetcher);
@@ -64,7 +68,11 @@ public class TaskManagerMetricsHandlerTest extends TestLogger {
 
 	@Test
 	public void getMapForNull() {
-		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(JobManagerRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
 		MetricStore store = fetcher.getMetricStore();
 
 		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(fetcher);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 6ee78dd..bbc5889 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -22,13 +22,23 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Collection;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -37,7 +47,8 @@ import scala.Option;
 import scala.reflect.ClassTag$;
 
 /**
- * Implementation of the {@link JobManagerGateway} for the {@link ActorGateway}.
+ * Implementation of the {@link JobManagerGateway} for old JobManager code based
+ * on Akka actors and the {@link ActorGateway}.
  */
 public class AkkaJobManagerGateway implements JobManagerGateway {
 
@@ -48,7 +59,6 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 		this.jobManagerGateway = Preconditions.checkNotNull(jobManagerGateway);
 
 		final Option<String> optHostname = jobManagerGateway.actor().path().address().host();
-
 		hostname = optHostname.isDefined() ? optHostname.get() : "localhost";
 	}
 
@@ -63,25 +73,6 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 	}
 
 	@Override
-	public CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout) {
-		return FutureUtils
-			.toJava(jobManagerGateway
-				.ask(
-					new JobManagerMessages.RequestClassloadingProps(jobId),
-					FutureUtils.toFiniteDuration(timeout)))
-			.thenApply(
-				(Object response) -> {
-					if (response instanceof JobManagerMessages.ClassloadingProps) {
-						return Optional.of(((JobManagerMessages.ClassloadingProps) response));
-					} else if (response instanceof JobManagerMessages.JobNotFound) {
-						return Optional.empty();
-					} else {
-						throw new FlinkFutureException("Unknown response: " + response + '.');
-					}
-				});
-	}
-
-	@Override
 	public CompletableFuture<Integer> requestBlobServerPort(Time timeout) {
 		return FutureUtils.toJava(
 			jobManagerGateway
@@ -90,6 +81,21 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 	}
 
 	@Override
+	public CompletableFuture<Integer> requestWebPort(Time timeout) {
+		CompletableFuture<JobManagerMessages.ResponseWebMonitorPort> portResponseFuture = FutureUtils.toJava(
+			jobManagerGateway
+				.ask(JobManagerMessages.getRequestWebMonitorPort(), FutureUtils.toFiniteDuration(timeout))
+				.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.ResponseWebMonitorPort.class)));
+
+		return portResponseFuture.thenApply(
+			JobManagerMessages.ResponseWebMonitorPort::port);
+	}
+
+	//--------------------------------------------------------------------------------
+	// Job control
+	//--------------------------------------------------------------------------------
+
+	@Override
 	public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout) {
 		return FutureUtils
 			.toJava(
@@ -119,4 +125,146 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 				}
 			);
 	}
+
+	@Override
+	public CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, String savepointPath, Time timeout) {
+		CompletableFuture<JobManagerMessages.CancellationResponse> cancellationFuture = FutureUtils.toJava(
+			jobManagerGateway
+				.ask(new JobManagerMessages.CancelJobWithSavepoint(jobId, savepointPath), FutureUtils.toFiniteDuration(timeout))
+				.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationResponse.class)));
+
+		return cancellationFuture.thenApply(
+			(JobManagerMessages.CancellationResponse response) -> {
+				if (response instanceof JobManagerMessages.CancellationSuccess) {
+					return ((JobManagerMessages.CancellationSuccess) response).savepointPath();
+				} else {
+					throw new FlinkFutureException("Cancel with savepoint failed.", ((JobManagerMessages.CancellationFailure) response).cause());
+				}
+			});
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
+		CompletableFuture<JobManagerMessages.CancellationResponse> responseFuture = FutureUtils.toJava(
+			jobManagerGateway
+				.ask(new JobManagerMessages.CancelJob(jobId), FutureUtils.toFiniteDuration(timeout))
+				.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationResponse.class)));
+
+		return responseFuture.thenApply(
+			(JobManagerMessages.CancellationResponse response) -> {
+				if (response instanceof JobManagerMessages.CancellationSuccess) {
+					return Acknowledge.get();
+				} else {
+					throw new FlinkFutureException("Cancel job failed " + jobId + '.', ((JobManagerMessages.CancellationFailure) response).cause());
+				}
+			});
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout) {
+		CompletableFuture<JobManagerMessages.StoppingResponse> responseFuture = FutureUtils.toJava(
+			jobManagerGateway
+				.ask(new JobManagerMessages.StopJob(jobId), FutureUtils.toFiniteDuration(timeout))
+				.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.StoppingResponse.class)));
+
+		return responseFuture.thenApply(
+			(JobManagerMessages.StoppingResponse response) -> {
+				if (response instanceof JobManagerMessages.StoppingSuccess) {
+					return Acknowledge.get();
+				} else {
+					throw new FlinkFutureException("Stop job failed " + jobId + '.', ((JobManagerMessages.StoppingFailure) response).cause());
+				}
+			});
+	}
+
+	//--------------------------------------------------------------------------------
+	// JobManager information
+	//--------------------------------------------------------------------------------
+
+	@Override
+	public CompletableFuture<Optional<Instance>> requestTaskManagerInstance(InstanceID instanceId, Time timeout) {
+		return FutureUtils.toJava(
+			jobManagerGateway
+				.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceId), FutureUtils.toFiniteDuration(timeout))
+				.mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class)))
+			.thenApply(
+				(JobManagerMessages.TaskManagerInstance taskManagerResponse) -> {
+					if (taskManagerResponse.instance().isDefined()) {
+						return Optional.of(taskManagerResponse.instance().get());
+					} else {
+						return Optional.empty();
+					}
+				});
+	}
+
+	@Override
+	public CompletableFuture<Collection<Instance>> requestTaskManagerInstances(Time timeout) {
+		CompletableFuture<JobManagerMessages.RegisteredTaskManagers> taskManagersFuture = FutureUtils.toJava(
+			jobManagerGateway
+				.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), FutureUtils.toFiniteDuration(timeout))
+				.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.RegisteredTaskManagers.class)));
+
+		return taskManagersFuture.thenApply(
+			JobManagerMessages.RegisteredTaskManagers::asJavaCollection);
+	}
+
+	@Override
+	public CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout) {
+		return FutureUtils
+			.toJava(jobManagerGateway
+				.ask(
+					new JobManagerMessages.RequestClassloadingProps(jobId),
+					FutureUtils.toFiniteDuration(timeout)))
+			.thenApply(
+				(Object response) -> {
+					if (response instanceof JobManagerMessages.ClassloadingProps) {
+						return Optional.of(((JobManagerMessages.ClassloadingProps) response));
+					} else if (response instanceof JobManagerMessages.JobNotFound) {
+						return Optional.empty();
+					} else {
+						throw new FlinkFutureException("Unknown response: " + response + '.');
+					}
+				});
+	}
+
+	@Override
+	public CompletableFuture<MultipleJobsDetails> requestJobDetails(boolean includeRunning, boolean includeFinished, Time timeout) {
+		return FutureUtils.toJava(
+			jobManagerGateway
+				.ask(new RequestJobDetails(true, true), FutureUtils.toFiniteDuration(timeout))
+				.mapTo(ClassTag$.MODULE$.apply(MultipleJobsDetails.class)));
+	}
+
+	@Override
+	public CompletableFuture<Optional<AccessExecutionGraph>> requestJob(JobID jobId, Time timeout) {
+		CompletableFuture<JobManagerMessages.JobResponse> jobResponseFuture = FutureUtils.toJava(
+			jobManagerGateway
+				.ask(new JobManagerMessages.RequestJob(jobId), FutureUtils.toFiniteDuration(timeout))
+				.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.JobResponse.class)));
+
+		return jobResponseFuture.thenApply(
+			(JobManagerMessages.JobResponse jobResponse) -> {
+				if (jobResponse instanceof JobManagerMessages.JobFound) {
+					return Optional.of(((JobManagerMessages.JobFound) jobResponse).executionGraph());
+				} else {
+					return Optional.empty();
+				}
+			});
+	}
+
+	@Override
+	public CompletableFuture<StatusOverview> requestStatusOverview(Time timeout) {
+		return FutureUtils.toJava(
+			jobManagerGateway
+				.ask(RequestStatusOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
+				.mapTo(ClassTag$.MODULE$.apply(StatusOverview.class)));
+	}
+
+	@Override
+	public CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout) {
+		return FutureUtils.toJava(
+			jobManagerGateway
+				.ask(RequestJobsWithIDsOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
+				.mapTo(ClassTag$.MODULE$.apply(JobsWithIDsOverview.class)));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 562e697..19f0e2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -421,7 +421,9 @@ public class JobClient {
 
 		LOG.info("Checking and uploading JAR files");
 
-		final CompletableFuture<InetSocketAddress> blobServerAddressFuture = retrieveBlobServerAddress(jobManagerGateway, timeout);
+		final CompletableFuture<InetSocketAddress> blobServerAddressFuture = retrieveBlobServerAddress(
+			jobManagerGateway,
+			timeout);
 
 		final InetSocketAddress blobServerAddress;
 
@@ -448,7 +450,7 @@ public class JobClient {
 				"JobManager did not respond within " + timeout, e);
 		} catch (Throwable throwable) {
 			Throwable stripped = ExceptionUtils.stripExecutionException(throwable);
-
+			
 			try {
 				ExceptionUtils.tryDeserializeAndThrow(stripped, classLoader);
 			} catch (JobExecutionException jee) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index f204393..d24a3d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -18,14 +18,14 @@
 
 package org.apache.flink.runtime.clusterframework;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.actor.Address;
 import com.typesafe.config.Config;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.lang3.StringUtils;
+
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -35,6 +35,8 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.NetUtils;
 
 import org.slf4j.Logger;
@@ -52,6 +54,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Tools for starting JobManager and TaskManager processes, including the
@@ -171,7 +174,11 @@ public class BootstrapTools {
 	 *
 	 * @param config The Flink config.
 	 * @param highAvailabilityServices Service factory for high availability services
-	 * @param actorSystem The ActorSystem to start the web frontend in.
+	 * @param jobManagerRetriever to retrieve the leading JobManagerGateway
+	 * @param queryServiceRetriever to resolve a query service
+	 * @param timeout for asynchronous operations
+	 * @param executor to run asynchronous operations
+	 * @param jobManagerAddress the address of the JobManager for which the WebMonitor is started
 	 * @param logger Logger for log output
 	 * @return WebMonitor instance.
 	 * @throws Exception
@@ -179,16 +186,13 @@ public class BootstrapTools {
 	public static WebMonitor startWebMonitorIfConfigured(
 			Configuration config,
 			HighAvailabilityServices highAvailabilityServices,
-			ActorSystem actorSystem,
-			ActorRef jobManager,
+			JobManagerRetriever jobManagerRetriever,
+			MetricQueryServiceRetriever queryServiceRetriever,
+			Time timeout,
+			Executor executor,
+			String jobManagerAddress,
 			Logger logger) throws Exception {
 
-
-		// this ensures correct values are present in the web frontend
-		final Address address = AkkaUtils.getAddress(actorSystem);
-		config.setString(JobManagerOptions.ADDRESS, address.host().get());
-		config.setInteger(JobManagerOptions.PORT, Integer.parseInt(address.port().get().toString()));
-
 		if (config.getInteger(WebOptions.PORT, 0) >= 0) {
 			logger.info("Starting JobManager Web Frontend");
 
@@ -197,12 +201,14 @@ public class BootstrapTools {
 			WebMonitor monitor = WebMonitorUtils.startWebRuntimeMonitor(
 				config,
 				highAvailabilityServices,
-				actorSystem);
+				jobManagerRetriever,
+				queryServiceRetriever,
+				timeout,
+				executor);
 
 			// start the web monitor
 			if (monitor != null) {
-				String jobManagerAkkaURL = AkkaUtils.getAkkaURL(actorSystem, jobManager);
-				monitor.start(jobManagerAkkaURL);
+				monitor.start(jobManagerAddress);
 			}
 			return monitor;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 043c603..5c6439d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -305,6 +305,16 @@ public class FutureUtils {
 		return new FiniteDuration(time.toMilliseconds(), TimeUnit.MILLISECONDS);
 	}
 
+	/**
+	 * Converts {@link FiniteDuration} into Flink time.
+	 *
+	 * @param finiteDuration to convert into Flink time
+	 * @return Flink time with the length of the given finite duration
+	 */
+	public static Time toTime(FiniteDuration finiteDuration) {
+		return Time.milliseconds(finiteDuration.toMillis());
+	}
+
 	// ------------------------------------------------------------------------
 	//  Converting futures
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
index cba7b06..a4d0d11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
@@ -21,11 +21,20 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.rpc.RpcGateway;
 
+import javax.annotation.Nullable;
+
+import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -38,21 +47,19 @@ import java.util.concurrent.CompletableFuture;
 public interface JobManagerGateway extends RpcGateway {
 
 	/**
-	 * Requests the class loading properties for the given JobID.
+	 * Requests the BlobServer port.
 	 *
-	 * @param jobId for which the class loading properties are requested
 	 * @param timeout for this operation
-	 * @return Future containing the optional class loading properties if they could be retrieved from the JobManager.
+	 * @return Future containing the BlobServer port
 	 */
-	CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout);
+	CompletableFuture<Integer> requestBlobServerPort(Time timeout);
 
 	/**
-	 * Requests the BlobServer port.
+	 * Returns the port of the web runtime monitor serving requests for the JobManager endpoint.
 	 *
-	 * @param timeout for this operation
-	 * @return Future containing the BlobServer port
+	 * @return Port of the WebRuntimeMonitor responsible for the JobManager endpoint
 	 */
-	CompletableFuture<Integer> requestBlobServerPort(Time timeout);
+	CompletableFuture<Integer> requestWebPort(Time timeout);
 
 	/**
 	 * Submits a job to the JobManager.
@@ -63,4 +70,103 @@ public interface JobManagerGateway extends RpcGateway {
 	 * @return Future containing an Acknowledge message if the submission succeeded
 	 */
 	CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout);
+
+	/**
+	 * Cancels the given job after taking a savepoint and returning its path.
+	 *
+	 * If the savepointPath is null, then the JobManager will use the default savepoint directory
+	 * to store the savepoint in. After the savepoint has been taken and the job has been canceled
+	 * successfully, the path of the savepoint is returned.
+	 *
+	 * @param jobId identifying the job to cancel
+	 * @param savepointPath Optional path for the savepoint to be stored under; if null, then the default path is
+	 *                      taken
+	 * @param timeout for the asynchronous operation
+	 * @return Future containing the savepoint path of the taken savepoint or an Exception if the operation failed
+	 */
+	CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, @Nullable String savepointPath, Time timeout);
+
+	/**
+	 * Cancels the given job.
+	 *
+	 * @param jobId identifying the job to cancel
+	 * @param timeout for the asynchronous operation
+	 * @return Future containing Acknowledge or an Exception if the operation failed
+	 */
+	CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout);
+
+	/**
+	 * Stops the given job.
+	 *
+	 * @param jobId identifying the job to cancel
+	 * @param timeout for the asynchronous operation
+	 * @return Future containing Acknowledge or an Exception if the operation failed
+	 */
+	CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout);
+
+	/**
+	 * Requests the class loading properties for the given JobID.
+	 *
+	 * @param jobId for which the class loading properties are requested
+	 * @param timeout for this operation
+	 * @return Future containing the optional class loading properties if they could be retrieved from the JobManager.
+	 */
+	CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout);
+
+	/**
+	 * Requests the TaskManager instance registered under the given instanceId from the JobManager.
+	 * If there is no Instance registered, then {@link Optional#empty()} is returned.
+	 *
+	 * @param instanceId for which to retrieve the Instance
+	 * @param timeout for the asynchronous operation
+	 * @return Future containing the TaskManager instance registered under instanceId, otherwise {@link Optional#empty()}
+	 */
+	CompletableFuture<Optional<Instance>> requestTaskManagerInstance(InstanceID instanceId, Time timeout);
+
+	/**
+	 * Requests all currently registered TaskManager instances from the JobManager.
+	 *
+	 * @param timeout for the asynchronous operation
+	 * @return Future containing the collection of all currently registered TaskManager instances
+	 */
+	CompletableFuture<Collection<Instance>> requestTaskManagerInstances(Time timeout);
+
+	/**
+	 * Requests job details currently being executed by the JobManager.
+	 *
+	 * @param includeRunning true if running jobs shall be included, otherwise false
+	 * @param includeFinished true if finished jobs shall be included, otherwise false
+	 * @param timeout for the asynchronous operation
+	 * @return Future containing the job details
+	 */
+	CompletableFuture<MultipleJobsDetails> requestJobDetails(
+		boolean includeRunning,
+		boolean includeFinished,
+		Time timeout);
+
+	/**
+	 * Requests the AccessExecutionGraph for the given jobId. If there is no such graph, then
+	 * {@link Optional#empty()} is returned.
+	 *
+	 * @param jobId identifying the job whose AccessExecutionGraph is requested
+	 * @param timeout for the asynchronous operation
+	 * @return Future containing the AccessExecutionGraph for the given jobId, otherwise {@link Optional#empty()}
+	 */
+	CompletableFuture<Optional<AccessExecutionGraph>> requestJob(JobID jobId, Time timeout);
+
+	/**
+	 * Requests the status overview from the JobManager.
+	 *
+	 * @param timeout for the asynchronous operation
+	 * @return Future containing the status overview
+	 */
+	CompletableFuture<StatusOverview> requestStatusOverview(Time timeout);
+
+	/**
+	 * Requests the job overview from the JobManager.
+	 *
+	 * @param timeout for the asynchronous operation
+	 * @return Future containing the job overview
+	 */
+	CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index e173522..1791fe1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -25,6 +25,7 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 import org.apache.flink.util.Preconditions;

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 9ebb126..9493696 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.Path;
@@ -31,8 +32,9 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
-import akka.actor.ActorSystem;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -47,6 +49,7 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Utilities for the web runtime monitor. This class contains for example methods to build
@@ -119,27 +122,39 @@ public final class WebMonitorUtils {
 	 *
 	 * @param config The configuration for the runtime monitor.
 	 * @param highAvailabilityServices HighAvailabilityServices used to start the WebRuntimeMonitor
-	 * @param actorSystem ActorSystem used to connect to the JobManager
-	 *
+	 * @param jobManagerRetriever which retrieves the currently leading JobManager
+	 * @param queryServiceRetriever which retrieves the query service
+	 * @param timeout for asynchronous operations
+	 * @param executor to run asynchronous operations
 	 */
 	public static WebMonitor startWebRuntimeMonitor(
 			Configuration config,
 			HighAvailabilityServices highAvailabilityServices,
-			ActorSystem actorSystem) {
+			JobManagerRetriever jobManagerRetriever,
+			MetricQueryServiceRetriever queryServiceRetriever,
+			Time timeout,
+			Executor executor) {
 		// try to load and instantiate the class
 		try {
 			String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
 			Class<? extends WebMonitor> clazz = Class.forName(classname).asSubclass(WebMonitor.class);
 
-			Constructor<? extends WebMonitor> constructor = clazz.getConstructor(Configuration.class,
+			Constructor<? extends WebMonitor> constructor = clazz.getConstructor(
+				Configuration.class,
 				LeaderRetrievalService.class,
 				BlobView.class,
-				ActorSystem.class);
+				JobManagerRetriever.class,
+				MetricQueryServiceRetriever.class,
+				Time.class,
+				Executor.class);
 			return constructor.newInstance(
 				config,
 				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
 				highAvailabilityServices.createBlobStore(),
-				actorSystem);
+				jobManagerRetriever,
+				queryServiceRetriever,
+				timeout,
+				executor);
 		} catch (ClassNotFoundException e) {
 			LOG.error("Could not load web runtime monitor. " +
 					"Probably reason: flink-runtime-web is not in the classpath");

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java
new file mode 100644
index 0000000..2eade48
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Retrieves and stores the JobManagerGateway for the current leading JobManager.
+ */
+public abstract class JobManagerRetriever implements LeaderRetrievalListener {
+
+	private final Logger log = LoggerFactory.getLogger(getClass());
+
+	// False if we have to create a new JobManagerGateway future when being notified
+	// about a new leader address
+	private final AtomicBoolean firstTimeUsage;
+
+	private volatile CompletableFuture<JobManagerGateway> jobManagerGatewayFuture;
+
+	public JobManagerRetriever() {
+		firstTimeUsage = new AtomicBoolean(true);
+		jobManagerGatewayFuture = new CompletableFuture<>();
+	}
+
+	/**
+	 * Returns the currently known leading job manager gateway and its web monitor port.
+	 */
+	public Optional<JobManagerGateway> getJobManagerGatewayNow() throws Exception {
+		if (jobManagerGatewayFuture != null) {
+			CompletableFuture<JobManagerGateway> jobManagerGatewayFuture = this.jobManagerGatewayFuture;
+
+			if (jobManagerGatewayFuture.isDone()) {
+				return Optional.of(jobManagerGatewayFuture.get());
+			} else {
+				return Optional.empty();
+			}
+		} else {
+			return Optional.empty();
+		}
+	}
+
+	/**
+	 * Returns the current JobManagerGateway future.
+	 */
+	public CompletableFuture<JobManagerGateway> getJobManagerGateway() throws Exception {
+		return jobManagerGatewayFuture;
+	}
+
+	@Override
+	public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
+		if (leaderAddress != null && !leaderAddress.equals("")) {
+			try {
+				final CompletableFuture<JobManagerGateway> newJobManagerGatewayFuture;
+
+				if (firstTimeUsage.compareAndSet(true, false)) {
+					newJobManagerGatewayFuture = jobManagerGatewayFuture;
+				} else {
+					newJobManagerGatewayFuture = new CompletableFuture<>();
+					jobManagerGatewayFuture = newJobManagerGatewayFuture;
+				}
+
+				log.info("New leader reachable under {}:{}.", leaderAddress, leaderSessionID);
+
+				createJobManagerGateway(leaderAddress, leaderSessionID).whenComplete(
+					(JobManagerGateway jobManagerGateway, Throwable throwable) -> {
+						if (throwable != null) {
+							newJobManagerGatewayFuture.completeExceptionally(new FlinkException("Could not retrieve" +
+								"the current job manager gateway.", throwable));
+						} else {
+							newJobManagerGatewayFuture.complete(jobManagerGateway);
+						}
+					}
+				);
+			}
+			catch (Exception e) {
+				handleError(e);
+			}
+		}
+	}
+
+	@Override
+	public void handleError(Exception exception) {
+		log.error("Received error from LeaderRetrievalService.", exception);
+
+		jobManagerGatewayFuture.completeExceptionally(exception);
+	}
+
+	/**
+	 * Create a JobManagerGateway for the given leader address and leader id.
+	 *
+	 * @param leaderAddress to connect against
+	 * @param leaderId the endpoint currently uses
+	 * @return Future containing the resolved JobManagerGateway
+	 * @throws Exception if the JobManagerGateway creation failed
+	 */
+	protected abstract CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
new file mode 100644
index 0000000..c79bf5d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceGateway;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Gateway to communicate with a QueryService.
+ *
+ * <p>Currently there is only one implementation working with a Akka based
+ * MetricQueryService {@link AkkaQueryServiceGateway}.
+ */
+public interface MetricQueryServiceGateway {
+
+	CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceRetriever.java
new file mode 100644
index 0000000..7bb9b44
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceRetriever.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Retriever for {@link MetricQueryServiceGateway}.
+ */
+public interface MetricQueryServiceRetriever {
+
+	/**
+	 * Retrieves for the given query service path a {@link MetricQueryServiceGateway}.
+	 *
+	 * @param queryServicePath under which the QueryService can be reached
+	 * @return Future containing the resolved QueryServiceGateway
+	 */
+	CompletableFuture<MetricQueryServiceGateway> retrieveService(String queryServicePath);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
new file mode 100644
index 0000000..027b42a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link JobManagerRetriever} implementation for Akka based JobManagers.
+ */
+public class AkkaJobManagerRetriever extends JobManagerRetriever {
+
+	private final ActorSystem actorSystem;
+	private final Time timeout;
+
+	public AkkaJobManagerRetriever(
+			ActorSystem actorSystem,
+			Time timeout) {
+
+		this.actorSystem = Preconditions.checkNotNull(actorSystem);
+		this.timeout = Preconditions.checkNotNull(timeout);
+	}
+
+	@Override
+	protected CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception {
+		return FutureUtils.toJava(
+			AkkaUtils.getActorRefFuture(
+				leaderAddress,
+				actorSystem,
+				FutureUtils.toFiniteDuration(timeout)))
+			.thenApplyAsync(
+				(ActorRef jobManagerRef) -> {
+					ActorGateway leaderGateway = new AkkaActorGateway(
+						jobManagerRef, leaderId);
+
+					return new AkkaJobManagerGateway(leaderGateway);
+				},
+				actorSystem.dispatcher());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
new file mode 100644
index 0000000..8985205
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+
+import java.util.concurrent.CompletableFuture;
+
+import scala.reflect.ClassTag$;
+
+/**
+ * {@link MetricQueryServiceGateway} implementation for Akka based {@link MetricQueryService}.
+ */
+public class AkkaQueryServiceGateway implements MetricQueryServiceGateway {
+
+	private final ActorRef queryServiceActorRef;
+
+	public AkkaQueryServiceGateway(ActorRef queryServiceActorRef) {
+		this.queryServiceActorRef = Preconditions.checkNotNull(queryServiceActorRef);
+	}
+
+	@Override
+	public CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout) {
+		return FutureUtils.toJava(
+			Patterns.ask(queryServiceActorRef, MetricQueryService.getCreateDump(), timeout.toMilliseconds())
+				.mapTo(ClassTag$.MODULE$.apply(MetricDumpSerialization.MetricSerializationResult.class))
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceRetriever.java
new file mode 100644
index 0000000..7de436a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceRetriever.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link MetricQueryServiceRetriever} implementation for Akka based {@link MetricQueryService}.
+ */
+public class AkkaQueryServiceRetriever implements MetricQueryServiceRetriever {
+	private final ActorSystem actorSystem;
+	private final Time lookupTimeout;
+
+	public AkkaQueryServiceRetriever(ActorSystem actorSystem, Time lookupTimeout) {
+		this.actorSystem = Preconditions.checkNotNull(actorSystem);
+		this.lookupTimeout = Preconditions.checkNotNull(lookupTimeout);
+	}
+
+	@Override
+	public CompletableFuture<MetricQueryServiceGateway> retrieveService(String queryServicePath) {
+		ActorSelection selection = actorSystem.actorSelection(queryServicePath);
+
+		return FutureUtils.toJava(selection.resolveOne(FutureUtils.toFiniteDuration(lookupTimeout))).thenApply(AkkaQueryServiceGateway::new);
+	}
+}


[4/4] flink git commit: [FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorGateway

Posted by tr...@apache.org.
[FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorGateway

This PR decouples the WebRuntimeMonitor from the ActorGateway by introducing
the JobManagerGateway interface which can have multiple implementations. This
is a preliminary step for the integration of the existing WebRuntimeMonitor
with the Flip-6 JobMaster.

Add time unit for web.timeout

This closes #4492.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f790d3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f790d3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f790d3e

Branch: refs/heads/master
Commit: 9f790d3efe5e8267da05eb97bbe07ca8a0f859fe
Parents: 00d5b62
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Aug 2 18:43:00 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Aug 11 13:48:14 2017 +0200

----------------------------------------------------------------------
 docs/ops/config.md                              |   2 +
 .../apache/flink/configuration/WebOptions.java  |   7 +
 .../MesosApplicationMasterRunner.java           |  13 +-
 .../webmonitor/ExecutionGraphHolder.java        |  59 ++----
 .../runtime/webmonitor/JobManagerRetriever.java | 197 ------------------
 .../webmonitor/RuntimeMonitorHandler.java       |  16 +-
 .../webmonitor/RuntimeMonitorHandlerBase.java   |  36 ++--
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  74 ++++---
 .../files/StaticFileServerHandler.java          |  35 ++--
 .../AbstractExecutionGraphRequestHandler.java   |  22 +-
 .../handlers/AbstractJsonRequestHandler.java    |  10 +-
 .../handlers/ClusterOverviewHandler.java        |  23 +--
 .../handlers/CurrentJobIdsHandler.java          |  22 +-
 .../handlers/CurrentJobsOverviewHandler.java    |  24 +--
 .../handlers/DashboardConfigHandler.java        |   4 +-
 .../handlers/HandlerRedirectUtils.java          |  41 ++--
 .../handlers/JarAccessDeniedHandler.java        |   4 +-
 .../webmonitor/handlers/JarDeleteHandler.java   |   4 +-
 .../webmonitor/handlers/JarListHandler.java     |   4 +-
 .../webmonitor/handlers/JarPlanHandler.java     |   4 +-
 .../webmonitor/handlers/JarRunHandler.java      |  20 +-
 .../webmonitor/handlers/JarUploadHandler.java   |   8 +-
 .../handlers/JobCancellationHandler.java        |  19 +-
 .../JobCancellationWithSavepointHandlers.java   | 124 ++++++------
 .../handlers/JobManagerConfigHandler.java       |   4 +-
 .../webmonitor/handlers/JobStoppingHandler.java |  19 +-
 .../webmonitor/handlers/RequestHandler.java     |   9 +-
 .../handlers/TaskManagerLogHandler.java         |  66 +++---
 .../handlers/TaskManagersHandler.java           |  39 ++--
 .../CheckpointStatsDetailsSubtasksHandler.java  |   6 +-
 .../metrics/AbstractMetricsHandler.java         |   4 +-
 .../webmonitor/metrics/MetricFetcher.java       | 202 +++++++++----------
 .../runtime/webmonitor/WebFrontendITCase.java   |  27 +--
 .../webmonitor/WebRuntimeMonitorITCase.java     |  73 ++++---
 .../handlers/ClusterOverviewHandlerTest.java    |   8 +-
 .../handlers/CurrentJobIdsHandlerTest.java      |   8 +-
 .../CurrentJobsOverviewHandlerTest.java         |  10 +-
 .../handlers/HandlerRedirectUtilsTest.java      |  39 ++--
 .../webmonitor/handlers/JarRunHandlerTest.java  |   5 +-
 .../handlers/JobAccumulatorsHandlerTest.java    |   5 +-
 .../handlers/JobCancellationHandlerTest.java    |   4 +-
 ...obCancellationWithSavepointHandlersTest.java |  82 ++++----
 .../handlers/JobConfigHandlerTest.java          |   5 +-
 .../handlers/JobDetailsHandlerTest.java         |   5 +-
 .../handlers/JobExceptionsHandlerTest.java      |   5 +-
 .../webmonitor/handlers/JobPlanHandlerTest.java |   5 +-
 .../handlers/JobStoppingHandlerTest.java        |   7 +-
 .../JobVertexAccumulatorsHandlerTest.java       |   5 +-
 .../JobVertexBackPressureHandlerTest.java       |   2 +-
 .../handlers/JobVertexDetailsHandlerTest.java   |   5 +-
 .../JobVertexTaskManagersHandlerTest.java       |   5 +-
 ...SubtaskCurrentAttemptDetailsHandlerTest.java |   6 +-
 ...ExecutionAttemptAccumulatorsHandlerTest.java |   5 +-
 ...btaskExecutionAttemptDetailsHandlerTest.java |   5 +-
 .../SubtasksAllAccumulatorsHandlerTest.java     |   5 +-
 .../handlers/SubtasksTimesHandlerTest.java      |   5 +-
 .../handlers/TaskManagerLogHandlerTest.java     |  54 ++---
 .../handlers/TaskManagersHandlerTest.java       |   7 +-
 .../metrics/AbstractMetricsHandlerTest.java     |  26 ++-
 .../metrics/JobManagerMetricsHandlerTest.java   |  20 +-
 .../metrics/JobMetricsHandlerTest.java          |  20 +-
 .../metrics/JobVertexMetricsHandlerTest.java    |  20 +-
 .../webmonitor/metrics/MetricFetcherTest.java   | 111 ++++------
 .../metrics/TaskManagerMetricsHandlerTest.java  |  20 +-
 .../runtime/akka/AkkaJobManagerGateway.java     | 190 +++++++++++++++--
 .../apache/flink/runtime/client/JobClient.java  |   6 +-
 .../clusterframework/BootstrapTools.java        |  34 ++--
 .../flink/runtime/concurrent/FutureUtils.java   |  10 +
 .../runtime/jobmaster/JobManagerGateway.java    | 122 ++++++++++-
 .../metrics/dump/MetricDumpSerialization.java   |   1 +
 .../runtime/webmonitor/WebMonitorUtils.java     |  29 ++-
 .../retriever/JobManagerRetriever.java          | 123 +++++++++++
 .../retriever/MetricQueryServiceGateway.java    |  36 ++++
 .../retriever/MetricQueryServiceRetriever.java  |  35 ++++
 .../retriever/impl/AkkaJobManagerRetriever.java |  69 +++++++
 .../retriever/impl/AkkaQueryServiceGateway.java |  53 +++++
 .../impl/AkkaQueryServiceRetriever.java         |  51 +++++
 .../retriever/impl/RpcJobManagerRetriever.java  |  46 +++++
 .../flink/runtime/jobmanager/JobManager.scala   |  10 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  10 +-
 .../runtime/testingUtils/TestingUtils.scala     |   2 +
 .../flink/yarn/YarnApplicationMasterRunner.java |  12 +-
 82 files changed, 1551 insertions(+), 1018 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index c8d5c92..4138b4d 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -389,6 +389,8 @@ These parameters allow for advanced tuning. The default values are sufficient wh
 
 - `web.access-control-allow-origin`: Enable custom access control parameter for allow origin header, default is `*`.
 
+- `web.timeout`: Timeout for asynchronous operation executed by the web frontend in milliseconds (DEFAULT: `10000`, 10 s)
+
 ### File Systems
 
 The parameters define the behavior of tasks that create result files.

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
index f499045..3733244 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
@@ -149,6 +149,13 @@ public class WebOptions {
 			.defaultValue(50)
 			.withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples");
 
+	/**
+	 * Timeout for asynchronous operations by the WebRuntimeMonitor in milliseconds.
+	 */
+	public static final ConfigOption<Long> TIMEOUT = ConfigOptions
+		.key("web.timeout")
+		.defaultValue(10L * 1000L);
+
 
 	private WebOptions() {
 		throw new IllegalAccessError();

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 260b7f3..7891386 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
@@ -52,6 +54,8 @@ import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -319,11 +323,16 @@ public class MesosApplicationMasterRunner {
 			// 2: the web monitor
 			LOG.debug("Starting Web Frontend");
 
+			Time webMonitorTimeout = Time.milliseconds(config.getLong(WebOptions.TIMEOUT));
+
 			webMonitor = BootstrapTools.startWebMonitorIfConfigured(
 				config,
 				highAvailabilityServices,
-				actorSystem,
-				jobManager,
+				new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout),
+				new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
+				webMonitorTimeout,
+				futureExecutor,
+				AkkaUtils.getAkkaURL(actorSystem, jobManager),
 				LOG);
 			if (webMonitor != null) {
 				final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/");

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 75b0475..739b375 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -19,20 +19,19 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
 import java.util.WeakHashMap;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -48,7 +47,7 @@ public class ExecutionGraphHolder {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class);
 
-	private final FiniteDuration timeout;
+	private final Time timeout;
 
 	private final WeakHashMap<JobID, AccessExecutionGraph> cache = new WeakHashMap<>();
 
@@ -56,50 +55,36 @@ public class ExecutionGraphHolder {
 		this(WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
 	}
 
-	public ExecutionGraphHolder(FiniteDuration timeout) {
+	public ExecutionGraphHolder(Time timeout) {
 		this.timeout = checkNotNull(timeout);
 	}
 
 	/**
-	 * Retrieves the execution graph with {@link JobID} jid or null if it cannot be found.
+	 * Retrieves the execution graph with {@link JobID} jid wrapped in {@link Optional} or
+	 * {@link Optional#empty()} if it cannot be found.
 	 *
 	 * @param jid jobID of the execution graph to be retrieved
-	 * @return the retrieved execution graph or null if it is not retrievable
+	 * @return Optional ExecutionGraph if it has been retrievable, empty if there has been no ExecutionGraph
+	 * @throws Exception if the ExecutionGraph retrieval failed.
 	 */
-	public AccessExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
+	public Optional<AccessExecutionGraph> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) throws Exception {
 		AccessExecutionGraph cached = cache.get(jid);
 		if (cached != null) {
 			if (cached.getState() == JobStatus.SUSPENDED) {
 				cache.remove(jid);
 			} else {
-				return cached;
+				return Optional.of(cached);
 			}
 		}
 
-		try {
-			if (jobManager != null) {
-				Future<Object> future = jobManager.ask(new JobManagerMessages.RequestJob(jid), timeout);
-				Object result = Await.result(future, timeout);
-
-				if (result instanceof JobManagerMessages.JobNotFound) {
-					return null;
-				}
-				else if (result instanceof JobManagerMessages.JobFound) {
-					AccessExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
-					cache.put(jid, eg);
-					return eg;
-				}
-				else {
-					throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
-				}
-			}
-			else {
-				LOG.warn("No connection to the leading JobManager.");
-				return null;
-			}
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Error requesting execution graph", e);
-		}
+		CompletableFuture<Optional<AccessExecutionGraph>> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout);
+
+		Optional<AccessExecutionGraph> result = executionGraphFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+		return result.map((executionGraph) -> {
+			cache.put(jid, executionGraph);
+
+			return executionGraph;
+		});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
deleted file mode 100644
index 175a4b8..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.ResponseWebMonitorPort;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.OnComplete;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.UUID;
-import java.util.concurrent.TimeoutException;
-
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Retrieves and stores the actor gateway to the current leading JobManager. In case of an error,
- * the {@link WebRuntimeMonitor} to which this instance is associated will be stopped.
- *
- * <p>The job manager gateway only works if the web monitor and the job manager run in the same
- * actor system, because many execution graph structures are not serializable. This breaks the nice
- * leader retrieval abstraction and we have a special code path in case that another job manager is
- * leader (see {@link org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils}. In such a
- * case, we get the address of the web monitor of the leading job manager and redirect to it
- * (instead of directly communicating with it).
- */
-public class JobManagerRetriever implements LeaderRetrievalListener {
-
-	private static final Logger LOG = LoggerFactory.getLogger(JobManagerRetriever.class);
-
-	private final Object waitLock = new Object();
-
-	private final WebMonitor webMonitor;
-	private final ActorSystem actorSystem;
-	private final FiniteDuration lookupTimeout;
-	private final FiniteDuration timeout;
-
-	private volatile Future<Tuple2<ActorGateway, Integer>> leaderGatewayPortFuture;
-
-	public JobManagerRetriever(
-			WebMonitor webMonitor,
-			ActorSystem actorSystem,
-			FiniteDuration lookupTimeout,
-			FiniteDuration timeout) {
-
-		this.webMonitor = checkNotNull(webMonitor);
-		this.actorSystem = checkNotNull(actorSystem);
-		this.lookupTimeout = checkNotNull(lookupTimeout);
-		this.timeout = checkNotNull(timeout);
-	}
-
-	/**
-	 * Returns the currently known leading job manager gateway and its web monitor port.
-	 */
-	public Option<Tuple2<ActorGateway, Integer>> getJobManagerGatewayAndWebPort() throws Exception {
-		if (leaderGatewayPortFuture != null) {
-			Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = leaderGatewayPortFuture;
-
-			if (gatewayPortFuture.isCompleted()) {
-				Tuple2<ActorGateway, Integer> gatewayPort = Await.result(gatewayPortFuture, timeout);
-
-				return Option.apply(gatewayPort);
-			} else {
-				return Option.empty();
-			}
-		} else {
-			return Option.empty();
-		}
-	}
-
-	/**
-	 * Awaits the leading job manager gateway and its web monitor port.
-	 */
-	public Tuple2<ActorGateway, Integer> awaitJobManagerGatewayAndWebPort() throws Exception {
-		Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = null;
-		Deadline deadline = timeout.fromNow();
-
-		while (!deadline.isOverdue()) {
-			synchronized (waitLock) {
-				gatewayPortFuture = leaderGatewayPortFuture;
-
-				if (gatewayPortFuture != null) {
-					break;
-				}
-
-				waitLock.wait(deadline.timeLeft().toMillis());
-			}
-		}
-
-		if (gatewayPortFuture == null) {
-			throw new TimeoutException("There is no JobManager available.");
-		} else {
-			return Await.result(gatewayPortFuture, deadline.timeLeft());
-		}
-	}
-
-	@Override
-	public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
-		if (leaderAddress != null && !leaderAddress.equals("")) {
-			try {
-				final Promise<Tuple2<ActorGateway, Integer>> leaderGatewayPortPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-				synchronized (waitLock) {
-					leaderGatewayPortFuture = leaderGatewayPortPromise.future();
-					waitLock.notifyAll();
-				}
-
-				LOG.info("New leader reachable under {}:{}.", leaderAddress, leaderSessionID);
-
-				AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, lookupTimeout)
-						// Resolve the actor ref
-						.flatMap(new Mapper<ActorRef, Future<Tuple2<ActorGateway, Object>>>() {
-							@Override
-							public Future<Tuple2<ActorGateway, Object>> apply(ActorRef jobManagerRef) {
-								ActorGateway leaderGateway = new AkkaActorGateway(
-										jobManagerRef, leaderSessionID);
-
-								Future<Object> webMonitorPort = leaderGateway.ask(
-									JobManagerMessages.getRequestWebMonitorPort(),
-									timeout);
-
-								return Futures.successful(leaderGateway).zip(webMonitorPort);
-							}
-						}, actorSystem.dispatcher())
-								// Request the web monitor port
-						.onComplete(new OnComplete<Tuple2<ActorGateway, Object>>() {
-							@Override
-							public void onComplete(Throwable failure, Tuple2<ActorGateway, Object> success) throws Throwable {
-								if (failure == null) {
-									if (success._2() instanceof ResponseWebMonitorPort) {
-										int webMonitorPort = ((ResponseWebMonitorPort) success._2()).port();
-
-										leaderGatewayPortPromise.success(new Tuple2<>(success._1(), webMonitorPort));
-									} else {
-										leaderGatewayPortPromise.failure(new Exception("Received the message " +
-										success._2() + " as response to " + JobManagerMessages.getRequestWebMonitorPort() +
-											". But a message of type " + ResponseWebMonitorPort.class + " was expected."));
-									}
-								} else {
-									LOG.warn("Failed to retrieve leader gateway and port.", failure);
-									leaderGatewayPortPromise.failure(failure);
-								}
-							}
-						}, actorSystem.dispatcher());
-			}
-			catch (Exception e) {
-				handleError(e);
-			}
-		}
-	}
-
-	@Override
-	public void handleError(Exception exception) {
-		LOG.error("Received error from LeaderRetrievalService.", exception);
-
-		try {
-			// stop associated webMonitor
-			webMonitor.stop();
-		}
-		catch (Exception e) {
-			LOG.error("Error while stopping the web server due to a LeaderRetrievalService error.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 4777202..35d13dd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.webmonitor;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
@@ -43,9 +45,7 @@ import java.net.URLDecoder;
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
-
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -72,8 +72,8 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 			WebMonitorConfig cfg,
 			RequestHandler handler,
 			JobManagerRetriever retriever,
-			Future<String> localJobManagerAddressFuture,
-			FiniteDuration timeout,
+			CompletableFuture<String> localJobManagerAddressFuture,
+			Time timeout,
 			boolean httpsEnabled) {
 
 		super(retriever, localJobManagerAddressFuture, timeout, httpsEnabled);
@@ -87,7 +87,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 	}
 
 	@Override
-	protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) {
+	protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway) {
 		FullHttpResponse response;
 
 		try {
@@ -106,7 +106,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 			queryParams.put(WEB_MONITOR_ADDRESS_KEY,
 				(httpsEnabled ? "https://" : "http://") + address.getHostName() + ":" + address.getPort());
 
-			response = handler.handleRequest(pathParams, queryParams, jobManager);
+			response = handler.handleRequest(pathParams, queryParams, jobManagerGateway);
 		}
 		catch (NotFoundException e) {
 			// this should result in a 404 error code (not found)

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
index d524632..4cb55f1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.webmonitor;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
@@ -29,11 +31,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
 
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -48,9 +48,9 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
 
 	private final JobManagerRetriever retriever;
 
-	protected final Future<String> localJobManagerAddressFuture;
+	protected final CompletableFuture<String> localJobManagerAddressFuture;
 
-	protected final FiniteDuration timeout;
+	protected final Time timeout;
 
 	/** Whether the web service has https enabled. */
 	protected final boolean httpsEnabled;
@@ -59,8 +59,8 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
 
 	public RuntimeMonitorHandlerBase(
 		JobManagerRetriever retriever,
-		Future<String> localJobManagerAddressFuture,
-		FiniteDuration timeout,
+		CompletableFuture<String> localJobManagerAddressFuture,
+		Time timeout,
 		boolean httpsEnabled) {
 
 		this.retriever = checkNotNull(retriever);
@@ -78,17 +78,17 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
 
 	@Override
 	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
-		if (localJobManagerAddressFuture.isCompleted()) {
+		if (localJobManagerAddressFuture.isDone()) {
 			if (localJobManagerAddress == null) {
-				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
+				localJobManagerAddress = localJobManagerAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			}
 
-			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
+			Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
 
-			if (jobManager.isDefined()) {
-				Tuple2<ActorGateway, Integer> gatewayPort = jobManager.get();
+			if (optJobManagerGateway.isPresent()) {
+				JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
 				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
-					localJobManagerAddress, gatewayPort);
+					localJobManagerAddress, jobManagerGateway, timeout);
 
 				if (redirectAddress != null) {
 					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path(),
@@ -96,7 +96,7 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
 					KeepAliveWrite.flush(ctx, routed.request(), redirect);
 				}
 				else {
-					respondAsLeader(ctx, routed, gatewayPort._1());
+					respondAsLeader(ctx, routed, jobManagerGateway);
 				}
 			} else {
 				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
@@ -106,5 +106,5 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
 		}
 	}
 
-	protected abstract void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager);
+	protected abstract void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index e27a15f..17f02f0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -71,12 +70,14 @@ import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 
-import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,16 +86,11 @@ import javax.net.ssl.SSLContext;
 import java.io.File;
 import java.io.IOException;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -108,7 +104,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class WebRuntimeMonitor implements WebMonitor {
 
 	/** By default, all requests to the JobManager have a timeout of 10 seconds. */
-	public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
+	public static final Time DEFAULT_REQUEST_TIMEOUT = Time.seconds(10L);
 
 	/** Logger for web frontend startup / shutdown messages. */
 	private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
@@ -120,14 +116,15 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	private final LeaderRetrievalService leaderRetrievalService;
 
-	/** LeaderRetrievalListener which stores the currently leading JobManager and its archive. */
+	/** Service which retrieves the currently leading JobManager and opens a JobManagerGateway. */
 	private final JobManagerRetriever retriever;
 
 	private final SSLContext serverSSLContext;
 
-	private final Promise<String> jobManagerAddressPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+	private final CompletableFuture<String> jobManagerAddressFuture = new CompletableFuture<>();
+
+	private final Time timeout;
 
-	private final FiniteDuration timeout;
 	private final WebFrontendBootstrap netty;
 
 	private final File webRootDir;
@@ -142,7 +139,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	private AtomicBoolean cleanedUp = new AtomicBoolean();
 
-	private ExecutorService executorService;
 
 	private MetricFetcher metricFetcher;
 
@@ -150,11 +146,15 @@ public class WebRuntimeMonitor implements WebMonitor {
 			Configuration config,
 			LeaderRetrievalService leaderRetrievalService,
 			BlobView blobView,
-			ActorSystem actorSystem) throws IOException, InterruptedException {
+			JobManagerRetriever jobManagerRetriever,
+			MetricQueryServiceRetriever queryServiceRetriever,
+			Time timeout,
+			Executor executor) throws IOException, InterruptedException {
 
 		this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
-		this.timeout = AkkaUtils.getTimeout(config);
-		this.retriever = new JobManagerRetriever(this, actorSystem, AkkaUtils.getTimeout(config), timeout);
+		this.retriever = Preconditions.checkNotNull(jobManagerRetriever);
+		this.timeout = Preconditions.checkNotNull(timeout);
+
 		this.cfg = new WebMonitorConfig(config);
 
 		final String configuredAddress = cfg.getWebFrontendAddress();
@@ -191,7 +191,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 		// - Back pressure stats ----------------------------------------------
 
-		stackTraceSamples = new StackTraceSampleCoordinator(actorSystem.dispatcher(), 60000);
+		stackTraceSamples = new StackTraceSampleCoordinator(executor, 60000);
 
 		// Back pressure stats tracker config
 		int cleanUpInterval = config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL);
@@ -209,10 +209,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 		// --------------------------------------------------------------------
 
-		executorService = new ForkJoinPool();
-
-		ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(executorService);
-
 		// Config to enable https access to the web-ui
 		boolean enableSSL = config.getBoolean(WebOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(config);
 
@@ -226,11 +222,11 @@ public class WebRuntimeMonitor implements WebMonitor {
 		} else {
 			serverSSLContext = null;
 		}
-		metricFetcher = new MetricFetcher(actorSystem, retriever, context);
+		metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, executor, timeout);
 
 		String defaultSavepointDir = config.getString(CoreOptions.SAVEPOINT_DIRECTORY);
 
-		JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, context, defaultSavepointDir);
+		JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, executor, defaultSavepointDir);
 		RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler());
 		RuntimeMonitorHandler inProgressHandler = handler(cancelWithSavepoint.getInProgressHandler());
 
@@ -274,8 +270,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 		get(router,
 			new TaskManagerLogHandler(
 				retriever,
-				context,
-				jobManagerAddressPromise.future(),
+				executor,
+				jobManagerAddressFuture,
 				timeout,
 				TaskManagerLogHandler.FileMode.LOG,
 				config,
@@ -284,8 +280,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 		get(router,
 			new TaskManagerLogHandler(
 				retriever,
-				context,
-				jobManagerAddressPromise.future(),
+				executor,
+				jobManagerAddressFuture,
 				timeout,
 				TaskManagerLogHandler.FileMode.STDOUT,
 				config,
@@ -296,27 +292,27 @@ public class WebRuntimeMonitor implements WebMonitor {
 		router
 			// log and stdout
 			.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
-				new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile,
+				new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.logFile,
 					enableSSL))
 
 			.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
-				new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile,
+				new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.stdOutFile,
 					enableSSL));
 
 		get(router, new JobManagerMetricsHandler(metricFetcher));
 
 		// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
-		get(router, new JobCancellationHandler());
+		get(router, new JobCancellationHandler(timeout));
 		// DELETE is the preferred way of canceling a job (Rest-conform)
-		delete(router, new JobCancellationHandler());
+		delete(router, new JobCancellationHandler(timeout));
 
 		get(router, triggerHandler);
 		get(router, inProgressHandler);
 
 		// stop a job via GET (for proper integration with YARN this has to be performed via GET)
-		get(router, new JobStoppingHandler());
+		get(router, new JobStoppingHandler(timeout));
 		// DELETE is the preferred way of stopping a job (Rest-conform)
-		delete(router, new JobStoppingHandler());
+		delete(router, new JobStoppingHandler(timeout));
 
 		int maxCachedEntries = config.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
 		CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);
@@ -351,7 +347,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 		}
 
 		// this handler serves all the static contents
-		router.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir,
+		router.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, webRootDir,
 			enableSSL));
 
 		// add shutdown hook for deleting the directories and remaining temp files on shutdown
@@ -387,7 +383,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 	 * @return array of all JsonArchivists relevant for the history server
 	 */
 	public static JsonArchivist[] getJsonArchivists() {
-		JsonArchivist[] archivists = new JsonArchivist[]{
+		JsonArchivist[] archivists = {
 			new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(),
 
 			new JobPlanHandler.JobPlanJsonArchivist(),
@@ -418,7 +414,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 		LOG.info("Starting with JobManager {} on port {}", jobManagerAkkaUrl, getServerPort());
 
 		synchronized (startupShutdownLock) {
-			jobManagerAddressPromise.success(jobManagerAkkaUrl);
+			jobManagerAddressFuture.complete(jobManagerAkkaUrl);
 			leaderRetrievalService.start(retriever);
 
 			long delay = backPressureStatsTracker.getCleanUpInterval();
@@ -451,8 +447,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 			backPressureStatsTracker.shutDown();
 
-			executorService.shutdownNow();
-
 			cleanup();
 		}
 	}
@@ -522,7 +516,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 	// ------------------------------------------------------------------------
 
 	private RuntimeMonitorHandler handler(RequestHandler handler) {
-		return new RuntimeMonitorHandler(cfg, handler, retriever, jobManagerAddressPromise.future(), timeout,
+		return new RuntimeMonitorHandler(cfg, handler, retriever, jobManagerAddressFuture, timeout,
 			serverSSLContext !=  null);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index be6928e..15acb00 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -26,9 +26,10 @@ package org.apache.flink.runtime.webmonitor.files;
  * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
  *****************************************************************************/
 
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
@@ -70,13 +71,9 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.Locale;
+import java.util.Optional;
 import java.util.TimeZone;
-
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
@@ -118,9 +115,9 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 
 	private final JobManagerRetriever retriever;
 
-	private final Future<String> localJobManagerAddressFuture;
+	private final CompletableFuture<String> localJobManagerAddressFuture;
 
-	private final FiniteDuration timeout;
+	private final Time timeout;
 
 	/** The path in which the static documents are. */
 	private final File rootPath;
@@ -135,8 +132,8 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 
 	public StaticFileServerHandler(
 			JobManagerRetriever retriever,
-			Future<String> localJobManagerAddressPromise,
-			FiniteDuration timeout,
+			CompletableFuture<String> localJobManagerAddressPromise,
+			Time timeout,
 			File rootPath,
 			boolean httpsEnabled) throws IOException {
 
@@ -145,8 +142,8 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 
 	public StaticFileServerHandler(
 			JobManagerRetriever retriever,
-			Future<String> localJobManagerAddressFuture,
-			FiniteDuration timeout,
+			CompletableFuture<String> localJobManagerAddressFuture,
+			Time timeout,
 			File rootPath,
 			boolean httpsEnabled,
 			Logger logger) throws IOException {
@@ -165,9 +162,9 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 
 	@Override
 	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
-		if (localJobManagerAddressFuture.isCompleted()) {
+		if (localJobManagerAddressFuture.isDone()) {
 			if (localJobManagerAddress == null) {
-				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
+				localJobManagerAddress = localJobManagerAddressFuture.get();
 			}
 
 			final HttpRequest request = routed.request();
@@ -183,12 +180,12 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 				requestPath = "";
 			}
 
-			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
+			Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
 
-			if (jobManager.isDefined()) {
+			if (optJobManagerGateway.isPresent()) {
 				// Redirect to leader if necessary
 				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
-					localJobManagerAddress, jobManager.get());
+					localJobManagerAddress, optJobManagerGateway.get(), timeout);
 
 				if (redirectAddress != null) {
 					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index d6c17af..89108db 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.NotFoundException;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Base class for request handlers whose response depends on an ExecutionGraph
@@ -35,11 +38,11 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
 	private final ExecutionGraphHolder executionGraphHolder;
 
 	public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder) {
-		this.executionGraphHolder = executionGraphHolder;
+		this.executionGraphHolder = Preconditions.checkNotNull(executionGraphHolder);
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		String jidString = pathParams.get("jobid");
 		if (jidString == null) {
 			throw new RuntimeException("JobId parameter missing");
@@ -53,12 +56,17 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
 			throw new RuntimeException("Invalid JobID string '" + jidString + "': " + e.getMessage());
 		}
 
-		AccessExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid, jobManager);
-		if (eg == null) {
-			throw new NotFoundException("Could not find job with id " + jid);
+		final Optional<AccessExecutionGraph> optGraph;
+
+		try {
+			optGraph = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);
+		} catch (Exception e) {
+			throw new FlinkException("Could not retrieve ExecutionGraph for job with jobId " + jid + " from the JobManager.", e);
 		}
 
-		return handleRequest(eg, pathParams);
+		final AccessExecutionGraph graph = optGraph.orElseThrow(() -> new NotFoundException("Could not find job with jobId " + jid + '.'));
+
+		return handleRequest(graph, pathParams);
 	}
 
 	public abstract String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
index 2b4a45f..266ffb0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -38,8 +38,8 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
 	private static final Charset ENCODING = Charset.forName("UTF-8");
 
 	@Override
-	public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
-		String result = handleJsonRequest(pathParams, queryParams, jobManager);
+	public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
+		String result = handleJsonRequest(pathParams, queryParams, jobManagerGateway);
 		byte[] bytes = result.getBytes(ENCODING);
 
 		DefaultFullHttpResponse response = new DefaultFullHttpResponse(
@@ -57,7 +57,7 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
 	 *
 	 * @param pathParams The map of REST path parameters, decoded by the router.
 	 * @param queryParams The map of query parameters.
-	 * @param jobManager The JobManager actor.
+	 * @param jobManagerGateway to communicate with the JobManager.
 	 *
 	 * @return The JSON string that is the HTTP response.
 	 *
@@ -69,6 +69,6 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
 	public abstract String handleJsonRequest(
 			Map<String, String> pathParams,
 			Map<String, String> queryParams,
-			ActorGateway jobManager) throws Exception;
+			JobManagerGateway jobManagerGateway) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index 816ef24..4ebc4e7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
@@ -27,10 +27,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.StringWriter;
 import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -46,9 +44,9 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
 
 	private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId;
 
-	private final FiniteDuration timeout;
+	private final Time timeout;
 
-	public ClusterOverviewHandler(FiniteDuration timeout) {
+	public ClusterOverviewHandler(Time timeout) {
 		this.timeout = checkNotNull(timeout);
 	}
 
@@ -58,12 +56,13 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		// we need no parameters, get all requests
 		try {
-			if (jobManager != null) {
-				Future<Object> future = jobManager.ask(RequestStatusOverview.getInstance(), timeout);
-				StatusOverview overview = (StatusOverview) Await.result(future, timeout);
+			if (jobManagerGateway != null) {
+				CompletableFuture<StatusOverview> overviewFuture = jobManagerGateway.requestStatusOverview(timeout);
+
+				StatusOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 				StringWriter writer = new StringWriter();
 				JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index 9d0b863..778a300 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -19,18 +19,16 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
-import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.StringWriter;
 import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static java.util.Objects.requireNonNull;
 
@@ -43,9 +41,9 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 
 	private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
 
-	private final FiniteDuration timeout;
+	private final Time timeout;
 
-	public CurrentJobIdsHandler(FiniteDuration timeout) {
+	public CurrentJobIdsHandler(Time timeout) {
 		this.timeout = requireNonNull(timeout);
 	}
 
@@ -55,12 +53,12 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		// we need no parameters, get all requests
 		try {
-			if (jobManager != null) {
-				Future<Object> future = jobManager.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
-				JobsWithIDsOverview overview = (JobsWithIDsOverview) Await.result(future, timeout);
+			if (jobManagerGateway != null) {
+				CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
+				JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 				StringWriter writer = new StringWriter();
 				JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index d0518c8..b324426 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -35,10 +35,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -51,13 +49,13 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 	private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running";
 	private static final String COMPLETED_JOBS_REST_PATH = "/joboverview/completed";
 
-	private final FiniteDuration timeout;
+	private final Time timeout;
 
 	private final boolean includeRunningJobs;
 	private final boolean includeFinishedJobs;
 
 	public CurrentJobsOverviewHandler(
-			FiniteDuration timeout,
+			Time timeout,
 			boolean includeRunningJobs,
 			boolean includeFinishedJobs) {
 
@@ -79,13 +77,11 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		try {
-			if (jobManager != null) {
-				Future<Object> future = jobManager.ask(
-						new RequestJobDetails(includeRunningJobs, includeFinishedJobs), timeout);
-
-				MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
+			if (jobManagerGateway != null) {
+				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);
+				MultipleJobsDetails result = jobDetailsFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 				final long now = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index 312c890..fe1d06b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -55,7 +55,7 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		return this.configString;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
index 9fbafb8..e27d125 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -30,13 +31,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import scala.Tuple2;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -49,35 +45,26 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class HandlerRedirectUtils {
 
-	private static final Logger LOG = LoggerFactory.getLogger(HandlerRedirectUtils.class);
-
-	/** Pattern to extract the host from an remote Akka URL. */
-	private static final Pattern LeaderAddressHostPattern = Pattern.compile("^.+@(.+):([0-9]+)/user/.+$");
-
 	public static String getRedirectAddress(
 			String localJobManagerAddress,
-			Tuple2<ActorGateway, Integer> leader) throws Exception {
+			JobManagerGateway jobManagerGateway,
+			Time timeout) throws Exception {
 
-		final String leaderAddress = leader._1().path();
-		final int webMonitorPort = leader._2();
+		final String leaderAddress = jobManagerGateway.getAddress();
 
 		final String jobManagerName = localJobManagerAddress.substring(localJobManagerAddress.lastIndexOf("/") + 1);
 
 		if (!localJobManagerAddress.equals(leaderAddress) &&
 			!leaderAddress.equals(AkkaUtils.getLocalAkkaURL(jobManagerName))) {
 			// We are not the leader and need to redirect
-			Matcher matcher = LeaderAddressHostPattern.matcher(leaderAddress);
-
-			if (matcher.matches()) {
-				String redirectAddress = String.format("%s:%d", matcher.group(1), webMonitorPort);
-				return redirectAddress;
-			}
-			else {
-				LOG.warn("Unexpected leader address pattern {}. Cannot extract host.", leaderAddress);
-			}
-		}
+			final String hostname = jobManagerGateway.getHostname();
 
-		return null;
+			final CompletableFuture<Integer> webMonitorPortFuture = jobManagerGateway.requestWebPort(timeout);
+			final int webMonitorPort = webMonitorPortFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+			return String.format("%s:%d", hostname, webMonitorPort);
+		} else {
+			return null;
+		}
 	}
 
 	public static HttpResponse getRedirectResponse(String redirectAddress, String path, boolean httpsEnabled) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index 4a21fec..db55169 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import java.util.Map;
 
@@ -42,7 +42,7 @@ public class JarAccessDeniedHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		return ERROR_MESSAGE;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index 2572a76..73771bd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -46,7 +46,7 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		final String file = pathParams.get("jarid");
 		try {
 			File[] list = jarDir.listFiles(new FilenameFilter() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index 4dd20b1..4f9b188 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
 
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -51,7 +51,7 @@ public class JarListHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		try {
 			StringWriter writer = new StringWriter();
 			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index 1b25e7f..b239160 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -45,7 +45,7 @@ public class JarPlanHandler extends JarActionHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		try {
 			JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
 			JobGraph graph = getJobGraphAndClassLoader(config).f0;

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 282fea8..12ffa4f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -22,11 +22,11 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -34,8 +34,6 @@ import java.io.File;
 import java.io.StringWriter;
 import java.util.Map;
 
-import scala.concurrent.duration.FiniteDuration;
-
 /**
  * This handler handles requests to fetch plan for a jar.
  */
@@ -43,13 +41,13 @@ public class JarRunHandler extends JarActionHandler {
 
 	static final String JAR_RUN_REST_PATH = "/jars/:jarid/run";
 
-	private final FiniteDuration timeout;
+	private final Time timeout;
 	private final Configuration clientConfig;
 
-	public JarRunHandler(File jarDirectory, FiniteDuration timeout, Configuration clientConfig) {
+	public JarRunHandler(File jarDirectory, Time timeout, Configuration clientConfig) {
 		super(jarDirectory);
-		this.timeout = timeout;
-		this.clientConfig = clientConfig;
+		this.timeout = Preconditions.checkNotNull(timeout);
+		this.clientConfig = Preconditions.checkNotNull(clientConfig);
 	}
 
 	@Override
@@ -58,17 +56,17 @@ public class JarRunHandler extends JarActionHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		try {
 			JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
 			Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(config);
 
 			try {
 				JobClient.submitJobDetached(
-					new AkkaJobManagerGateway(jobManager),
+					jobManagerGateway,
 					clientConfig,
 					graph.f0,
-					Time.milliseconds(timeout.toMillis()),
+					timeout,
 					graph.f1);
 			} catch (JobExecutionException e) {
 				throw new ProgramInvocationException("Failed to submit the job to the job manager", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 745a110..705c321 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import java.io.File;
 import java.util.Map;
@@ -44,9 +44,9 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
 
 	@Override
 	public String handleJsonRequest(
-				Map<String, String> pathParams,
-				Map<String, String> queryParams,
-				ActorGateway jobManager) throws Exception {
+			Map<String, String> pathParams,
+			Map<String, String> queryParams,
+			JobManagerGateway jobManagerGateway) throws Exception {
 
 		String tempFilePath = queryParams.get("filepath");
 		String filename = queryParams.get("filename");

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index d9de7d7..513dc08 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -19,8 +19,9 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
@@ -33,17 +34,23 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
 	private static final String JOB_CONCELLATION_REST_PATH = "/jobs/:jobid/cancel";
 	private static final String JOB_CONCELLATION_YARN_REST_PATH = "/jobs/:jobid/yarn-cancel";
 
+	private final Time timeout;
+
+	public JobCancellationHandler(Time timeout) {
+		this.timeout = Preconditions.checkNotNull(timeout);
+	}
+
 	@Override
 	public String[] getPaths() {
 		return new String[]{JOB_CONCELLATION_REST_PATH, JOB_CONCELLATION_YARN_REST_PATH};
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		try {
-			JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
-			if (jobManager != null) {
-				jobManager.tell(new JobManagerMessages.CancelJob(jobid));
+			JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+			if (jobManagerGateway != null) {
+				jobManagerGateway.cancelJob(jobId, timeout);
 				return "{}";
 			}
 			else {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index 7dd4a52..9b474aa 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -19,16 +19,17 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.NotFoundException;
+import org.apache.flink.util.FlinkException;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -37,7 +38,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
 
-import akka.dispatch.OnComplete;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import javax.annotation.Nullable;
@@ -48,10 +48,9 @@ import java.nio.charset.Charset;
 import java.util.ArrayDeque;
 import java.util.HashMap;
 import java.util.Map;
-
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -92,16 +91,16 @@ public class JobCancellationWithSavepointHandlers {
 
 	public JobCancellationWithSavepointHandlers(
 			ExecutionGraphHolder currentGraphs,
-			ExecutionContext executionContext) {
-		this(currentGraphs, executionContext, null);
+			Executor executor) {
+		this(currentGraphs, executor, null);
 	}
 
 	public JobCancellationWithSavepointHandlers(
 			ExecutionGraphHolder currentGraphs,
-			ExecutionContext executionContext,
+			Executor executor,
 			@Nullable String defaultSavepointDirectory) {
 
-		this.triggerHandler = new TriggerHandler(currentGraphs, executionContext);
+		this.triggerHandler = new TriggerHandler(currentGraphs, executor);
 		this.inProgressHandler = new InProgressHandler();
 		this.defaultSavepointDirectory = defaultSavepointDirectory;
 	}
@@ -127,11 +126,11 @@ public class JobCancellationWithSavepointHandlers {
 		private final ExecutionGraphHolder currentGraphs;
 
 		/** Execution context for futures. */
-		private final ExecutionContext executionContext;
+		private final Executor executor;
 
-		public TriggerHandler(ExecutionGraphHolder currentGraphs, ExecutionContext executionContext) {
+		public TriggerHandler(ExecutionGraphHolder currentGraphs, Executor executor) {
 			this.currentGraphs = checkNotNull(currentGraphs);
-			this.executionContext = checkNotNull(executionContext);
+			this.executor = checkNotNull(executor);
 		}
 
 		@Override
@@ -144,35 +143,40 @@ public class JobCancellationWithSavepointHandlers {
 		public FullHttpResponse handleRequest(
 				Map<String, String> pathParams,
 				Map<String, String> queryParams,
-				ActorGateway jobManager) throws Exception {
+				JobManagerGateway jobManagerGateway) throws Exception {
 
 			try {
-				if (jobManager != null) {
+				if (jobManagerGateway != null) {
 					JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+					final Optional<AccessExecutionGraph> optGraph;
 
-					AccessExecutionGraph graph = currentGraphs.getExecutionGraph(jobId, jobManager);
-					if (graph == null) {
-						throw new Exception("Cannot find ExecutionGraph for job.");
-					} else {
-						CheckpointCoordinator coord = graph.getCheckpointCoordinator();
-						if (coord == null) {
-							throw new Exception("Cannot find CheckpointCoordinator for job.");
-						}
+					try {
+						optGraph = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
+					} catch (Exception e) {
+						throw new FlinkException("Could not retrieve the execution with jobId " + jobId + " from the JobManager.", e);
+					}
 
-						String targetDirectory = pathParams.get("targetDirectory");
-						if (targetDirectory == null) {
-							if (defaultSavepointDirectory == null) {
-								throw new IllegalStateException("No savepoint directory configured. " +
-										"You can either specify a directory when triggering this savepoint or " +
-										"configure a cluster-wide default via key '" +
-										CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
-							} else {
-								targetDirectory = defaultSavepointDirectory;
-							}
-						}
+					final AccessExecutionGraph graph = optGraph.orElseThrow(
+						() -> new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'));
 
-						return handleNewRequest(jobManager, jobId, targetDirectory, coord.getCheckpointTimeout());
+					CheckpointCoordinator coord = graph.getCheckpointCoordinator();
+					if (coord == null) {
+						throw new Exception("Cannot find CheckpointCoordinator for job.");
 					}
+
+					String targetDirectory = pathParams.get("targetDirectory");
+					if (targetDirectory == null) {
+						if (defaultSavepointDirectory == null) {
+							throw new IllegalStateException("No savepoint directory configured. " +
+									"You can either specify a directory when triggering this savepoint or " +
+									"configure a cluster-wide default via key '" +
+									CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
+						} else {
+							targetDirectory = defaultSavepointDirectory;
+						}
+					}
+
+					return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
 				} else {
 					throw new Exception("No connection to the leading JobManager.");
 				}
@@ -182,7 +186,7 @@ public class JobCancellationWithSavepointHandlers {
 		}
 
 		@SuppressWarnings("unchecked")
-		private FullHttpResponse handleNewRequest(ActorGateway jobManager, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException {
+		private FullHttpResponse handleNewRequest(JobManagerGateway jobManagerGateway, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException {
 			// Check whether a request exists
 			final long requestId;
 			final boolean isNewRequest;
@@ -202,35 +206,21 @@ public class JobCancellationWithSavepointHandlers {
 
 				try {
 					// Trigger cancellation
-					Object msg = new CancelJobWithSavepoint(jobId, targetDirectory);
-					Future<Object> cancelFuture = jobManager
-							.ask(msg, FiniteDuration.apply(checkpointTimeout, "ms"));
-
-					cancelFuture.onComplete(new OnComplete<Object>() {
-						@Override
-						public void onComplete(Throwable failure, Object resp) throws Throwable {
-							synchronized (lock) {
-								try {
-									if (resp != null) {
-										if (resp.getClass() == CancellationSuccess.class) {
-											String path = ((CancellationSuccess) resp).savepointPath();
-											completed.put(requestId, path);
-										} else if (resp.getClass() == CancellationFailure.class) {
-											Throwable cause = ((CancellationFailure) resp).cause();
-											completed.put(requestId, cause);
-										} else {
-											Throwable cause = new IllegalStateException("Unexpected CancellationResponse of type " + resp.getClass());
-											completed.put(requestId, cause);
-										}
-									} else {
-										completed.put(requestId, failure);
-									}
-								} finally {
-									inProgress.remove(jobId);
+					CompletableFuture<String> cancelJobFuture = jobManagerGateway
+						.cancelJobWithSavepoint(jobId, targetDirectory, Time.milliseconds(checkpointTimeout));
+
+					cancelJobFuture.whenCompleteAsync(
+						(String path, Throwable throwable) -> {
+							try {
+								if (throwable != null) {
+									completed.put(requestId, throwable);
+								} else {
+									completed.put(requestId, path);
 								}
+							} finally {
+								inProgress.remove(jobId);
 							}
-						}
-					}, executionContext);
+						}, executor);
 
 					success = true;
 				} finally {
@@ -298,9 +288,9 @@ public class JobCancellationWithSavepointHandlers {
 
 		@Override
 		@SuppressWarnings("unchecked")
-		public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+		public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 			try {
-				if (jobManager != null) {
+				if (jobManagerGateway != null) {
 					JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
 					long requestId = Long.parseLong(pathParams.get("requestId"));