You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/11 11:49:01 UTC

[4/4] flink git commit: [FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorGateway

[FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorGateway

This PR decouples the WebRuntimeMonitor from the ActorGateway by introducing
the JobManagerGateway interface, which can have multiple implementations. This
is a preliminary step towards integrating the existing WebRuntimeMonitor
with the FLIP-6 JobMaster.

Add time unit for web.timeout

This closes #4492.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f790d3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f790d3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f790d3e

Branch: refs/heads/master
Commit: 9f790d3efe5e8267da05eb97bbe07ca8a0f859fe
Parents: 00d5b62
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Aug 2 18:43:00 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Aug 11 13:48:14 2017 +0200

----------------------------------------------------------------------
 docs/ops/config.md                              |   2 +
 .../apache/flink/configuration/WebOptions.java  |   7 +
 .../MesosApplicationMasterRunner.java           |  13 +-
 .../webmonitor/ExecutionGraphHolder.java        |  59 ++----
 .../runtime/webmonitor/JobManagerRetriever.java | 197 ------------------
 .../webmonitor/RuntimeMonitorHandler.java       |  16 +-
 .../webmonitor/RuntimeMonitorHandlerBase.java   |  36 ++--
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  74 ++++---
 .../files/StaticFileServerHandler.java          |  35 ++--
 .../AbstractExecutionGraphRequestHandler.java   |  22 +-
 .../handlers/AbstractJsonRequestHandler.java    |  10 +-
 .../handlers/ClusterOverviewHandler.java        |  23 +--
 .../handlers/CurrentJobIdsHandler.java          |  22 +-
 .../handlers/CurrentJobsOverviewHandler.java    |  24 +--
 .../handlers/DashboardConfigHandler.java        |   4 +-
 .../handlers/HandlerRedirectUtils.java          |  41 ++--
 .../handlers/JarAccessDeniedHandler.java        |   4 +-
 .../webmonitor/handlers/JarDeleteHandler.java   |   4 +-
 .../webmonitor/handlers/JarListHandler.java     |   4 +-
 .../webmonitor/handlers/JarPlanHandler.java     |   4 +-
 .../webmonitor/handlers/JarRunHandler.java      |  20 +-
 .../webmonitor/handlers/JarUploadHandler.java   |   8 +-
 .../handlers/JobCancellationHandler.java        |  19 +-
 .../JobCancellationWithSavepointHandlers.java   | 124 ++++++------
 .../handlers/JobManagerConfigHandler.java       |   4 +-
 .../webmonitor/handlers/JobStoppingHandler.java |  19 +-
 .../webmonitor/handlers/RequestHandler.java     |   9 +-
 .../handlers/TaskManagerLogHandler.java         |  66 +++---
 .../handlers/TaskManagersHandler.java           |  39 ++--
 .../CheckpointStatsDetailsSubtasksHandler.java  |   6 +-
 .../metrics/AbstractMetricsHandler.java         |   4 +-
 .../webmonitor/metrics/MetricFetcher.java       | 202 +++++++++----------
 .../runtime/webmonitor/WebFrontendITCase.java   |  27 +--
 .../webmonitor/WebRuntimeMonitorITCase.java     |  73 ++++---
 .../handlers/ClusterOverviewHandlerTest.java    |   8 +-
 .../handlers/CurrentJobIdsHandlerTest.java      |   8 +-
 .../CurrentJobsOverviewHandlerTest.java         |  10 +-
 .../handlers/HandlerRedirectUtilsTest.java      |  39 ++--
 .../webmonitor/handlers/JarRunHandlerTest.java  |   5 +-
 .../handlers/JobAccumulatorsHandlerTest.java    |   5 +-
 .../handlers/JobCancellationHandlerTest.java    |   4 +-
 ...obCancellationWithSavepointHandlersTest.java |  82 ++++----
 .../handlers/JobConfigHandlerTest.java          |   5 +-
 .../handlers/JobDetailsHandlerTest.java         |   5 +-
 .../handlers/JobExceptionsHandlerTest.java      |   5 +-
 .../webmonitor/handlers/JobPlanHandlerTest.java |   5 +-
 .../handlers/JobStoppingHandlerTest.java        |   7 +-
 .../JobVertexAccumulatorsHandlerTest.java       |   5 +-
 .../JobVertexBackPressureHandlerTest.java       |   2 +-
 .../handlers/JobVertexDetailsHandlerTest.java   |   5 +-
 .../JobVertexTaskManagersHandlerTest.java       |   5 +-
 ...SubtaskCurrentAttemptDetailsHandlerTest.java |   6 +-
 ...ExecutionAttemptAccumulatorsHandlerTest.java |   5 +-
 ...btaskExecutionAttemptDetailsHandlerTest.java |   5 +-
 .../SubtasksAllAccumulatorsHandlerTest.java     |   5 +-
 .../handlers/SubtasksTimesHandlerTest.java      |   5 +-
 .../handlers/TaskManagerLogHandlerTest.java     |  54 ++---
 .../handlers/TaskManagersHandlerTest.java       |   7 +-
 .../metrics/AbstractMetricsHandlerTest.java     |  26 ++-
 .../metrics/JobManagerMetricsHandlerTest.java   |  20 +-
 .../metrics/JobMetricsHandlerTest.java          |  20 +-
 .../metrics/JobVertexMetricsHandlerTest.java    |  20 +-
 .../webmonitor/metrics/MetricFetcherTest.java   | 111 ++++------
 .../metrics/TaskManagerMetricsHandlerTest.java  |  20 +-
 .../runtime/akka/AkkaJobManagerGateway.java     | 190 +++++++++++++++--
 .../apache/flink/runtime/client/JobClient.java  |   6 +-
 .../clusterframework/BootstrapTools.java        |  34 ++--
 .../flink/runtime/concurrent/FutureUtils.java   |  10 +
 .../runtime/jobmaster/JobManagerGateway.java    | 122 ++++++++++-
 .../metrics/dump/MetricDumpSerialization.java   |   1 +
 .../runtime/webmonitor/WebMonitorUtils.java     |  29 ++-
 .../retriever/JobManagerRetriever.java          | 123 +++++++++++
 .../retriever/MetricQueryServiceGateway.java    |  36 ++++
 .../retriever/MetricQueryServiceRetriever.java  |  35 ++++
 .../retriever/impl/AkkaJobManagerRetriever.java |  69 +++++++
 .../retriever/impl/AkkaQueryServiceGateway.java |  53 +++++
 .../impl/AkkaQueryServiceRetriever.java         |  51 +++++
 .../retriever/impl/RpcJobManagerRetriever.java  |  46 +++++
 .../flink/runtime/jobmanager/JobManager.scala   |  10 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  10 +-
 .../runtime/testingUtils/TestingUtils.scala     |   2 +
 .../flink/yarn/YarnApplicationMasterRunner.java |  12 +-
 82 files changed, 1551 insertions(+), 1018 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index c8d5c92..4138b4d 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -389,6 +389,8 @@ These parameters allow for advanced tuning. The default values are sufficient wh
 
 - `web.access-control-allow-origin`: Enable custom access control parameter for allow origin header, default is `*`.
 
+- `web.timeout`: Timeout for asynchronous operations executed by the web frontend in milliseconds (DEFAULT: `10000`, 10 s)
+
 ### File Systems
 
 The parameters define the behavior of tasks that create result files.

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
index f499045..3733244 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
@@ -149,6 +149,13 @@ public class WebOptions {
 			.defaultValue(50)
 			.withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples");
 
+	/**
+	 * Timeout for asynchronous operations by the WebRuntimeMonitor in milliseconds.
+	 */
+	public static final ConfigOption<Long> TIMEOUT = ConfigOptions
+		.key("web.timeout")
+		.defaultValue(10L * 1000L);
+
 
 	private WebOptions() {
 		throw new IllegalAccessError();

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 260b7f3..7891386 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
@@ -52,6 +54,8 @@ import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -319,11 +323,16 @@ public class MesosApplicationMasterRunner {
 			// 2: the web monitor
 			LOG.debug("Starting Web Frontend");
 
+			Time webMonitorTimeout = Time.milliseconds(config.getLong(WebOptions.TIMEOUT));
+
 			webMonitor = BootstrapTools.startWebMonitorIfConfigured(
 				config,
 				highAvailabilityServices,
-				actorSystem,
-				jobManager,
+				new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout),
+				new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
+				webMonitorTimeout,
+				futureExecutor,
+				AkkaUtils.getAkkaURL(actorSystem, jobManager),
 				LOG);
 			if (webMonitor != null) {
 				final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/");

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 75b0475..739b375 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -19,20 +19,19 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
 import java.util.WeakHashMap;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -48,7 +47,7 @@ public class ExecutionGraphHolder {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class);
 
-	private final FiniteDuration timeout;
+	private final Time timeout;
 
 	private final WeakHashMap<JobID, AccessExecutionGraph> cache = new WeakHashMap<>();
 
@@ -56,50 +55,36 @@ public class ExecutionGraphHolder {
 		this(WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
 	}
 
-	public ExecutionGraphHolder(FiniteDuration timeout) {
+	public ExecutionGraphHolder(Time timeout) {
 		this.timeout = checkNotNull(timeout);
 	}
 
 	/**
-	 * Retrieves the execution graph with {@link JobID} jid or null if it cannot be found.
+	 * Retrieves the execution graph with {@link JobID} jid wrapped in {@link Optional} or
+	 * {@link Optional#empty()} if it cannot be found.
 	 *
 	 * @param jid jobID of the execution graph to be retrieved
-	 * @return the retrieved execution graph or null if it is not retrievable
+	 * @return Optional ExecutionGraph if it has been retrievable, empty if there has been no ExecutionGraph
+	 * @throws Exception if the ExecutionGraph retrieval failed.
 	 */
-	public AccessExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
+	public Optional<AccessExecutionGraph> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) throws Exception {
 		AccessExecutionGraph cached = cache.get(jid);
 		if (cached != null) {
 			if (cached.getState() == JobStatus.SUSPENDED) {
 				cache.remove(jid);
 			} else {
-				return cached;
+				return Optional.of(cached);
 			}
 		}
 
-		try {
-			if (jobManager != null) {
-				Future<Object> future = jobManager.ask(new JobManagerMessages.RequestJob(jid), timeout);
-				Object result = Await.result(future, timeout);
-
-				if (result instanceof JobManagerMessages.JobNotFound) {
-					return null;
-				}
-				else if (result instanceof JobManagerMessages.JobFound) {
-					AccessExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
-					cache.put(jid, eg);
-					return eg;
-				}
-				else {
-					throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
-				}
-			}
-			else {
-				LOG.warn("No connection to the leading JobManager.");
-				return null;
-			}
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Error requesting execution graph", e);
-		}
+		CompletableFuture<Optional<AccessExecutionGraph>> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout);
+
+		Optional<AccessExecutionGraph> result = executionGraphFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+		return result.map((executionGraph) -> {
+			cache.put(jid, executionGraph);
+
+			return executionGraph;
+		});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
deleted file mode 100644
index 175a4b8..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.ResponseWebMonitorPort;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.OnComplete;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.UUID;
-import java.util.concurrent.TimeoutException;
-
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Retrieves and stores the actor gateway to the current leading JobManager. In case of an error,
- * the {@link WebRuntimeMonitor} to which this instance is associated will be stopped.
- *
- * <p>The job manager gateway only works if the web monitor and the job manager run in the same
- * actor system, because many execution graph structures are not serializable. This breaks the nice
- * leader retrieval abstraction and we have a special code path in case that another job manager is
- * leader (see {@link org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils}. In such a
- * case, we get the address of the web monitor of the leading job manager and redirect to it
- * (instead of directly communicating with it).
- */
-public class JobManagerRetriever implements LeaderRetrievalListener {
-
-	private static final Logger LOG = LoggerFactory.getLogger(JobManagerRetriever.class);
-
-	private final Object waitLock = new Object();
-
-	private final WebMonitor webMonitor;
-	private final ActorSystem actorSystem;
-	private final FiniteDuration lookupTimeout;
-	private final FiniteDuration timeout;
-
-	private volatile Future<Tuple2<ActorGateway, Integer>> leaderGatewayPortFuture;
-
-	public JobManagerRetriever(
-			WebMonitor webMonitor,
-			ActorSystem actorSystem,
-			FiniteDuration lookupTimeout,
-			FiniteDuration timeout) {
-
-		this.webMonitor = checkNotNull(webMonitor);
-		this.actorSystem = checkNotNull(actorSystem);
-		this.lookupTimeout = checkNotNull(lookupTimeout);
-		this.timeout = checkNotNull(timeout);
-	}
-
-	/**
-	 * Returns the currently known leading job manager gateway and its web monitor port.
-	 */
-	public Option<Tuple2<ActorGateway, Integer>> getJobManagerGatewayAndWebPort() throws Exception {
-		if (leaderGatewayPortFuture != null) {
-			Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = leaderGatewayPortFuture;
-
-			if (gatewayPortFuture.isCompleted()) {
-				Tuple2<ActorGateway, Integer> gatewayPort = Await.result(gatewayPortFuture, timeout);
-
-				return Option.apply(gatewayPort);
-			} else {
-				return Option.empty();
-			}
-		} else {
-			return Option.empty();
-		}
-	}
-
-	/**
-	 * Awaits the leading job manager gateway and its web monitor port.
-	 */
-	public Tuple2<ActorGateway, Integer> awaitJobManagerGatewayAndWebPort() throws Exception {
-		Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = null;
-		Deadline deadline = timeout.fromNow();
-
-		while (!deadline.isOverdue()) {
-			synchronized (waitLock) {
-				gatewayPortFuture = leaderGatewayPortFuture;
-
-				if (gatewayPortFuture != null) {
-					break;
-				}
-
-				waitLock.wait(deadline.timeLeft().toMillis());
-			}
-		}
-
-		if (gatewayPortFuture == null) {
-			throw new TimeoutException("There is no JobManager available.");
-		} else {
-			return Await.result(gatewayPortFuture, deadline.timeLeft());
-		}
-	}
-
-	@Override
-	public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
-		if (leaderAddress != null && !leaderAddress.equals("")) {
-			try {
-				final Promise<Tuple2<ActorGateway, Integer>> leaderGatewayPortPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-				synchronized (waitLock) {
-					leaderGatewayPortFuture = leaderGatewayPortPromise.future();
-					waitLock.notifyAll();
-				}
-
-				LOG.info("New leader reachable under {}:{}.", leaderAddress, leaderSessionID);
-
-				AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, lookupTimeout)
-						// Resolve the actor ref
-						.flatMap(new Mapper<ActorRef, Future<Tuple2<ActorGateway, Object>>>() {
-							@Override
-							public Future<Tuple2<ActorGateway, Object>> apply(ActorRef jobManagerRef) {
-								ActorGateway leaderGateway = new AkkaActorGateway(
-										jobManagerRef, leaderSessionID);
-
-								Future<Object> webMonitorPort = leaderGateway.ask(
-									JobManagerMessages.getRequestWebMonitorPort(),
-									timeout);
-
-								return Futures.successful(leaderGateway).zip(webMonitorPort);
-							}
-						}, actorSystem.dispatcher())
-								// Request the web monitor port
-						.onComplete(new OnComplete<Tuple2<ActorGateway, Object>>() {
-							@Override
-							public void onComplete(Throwable failure, Tuple2<ActorGateway, Object> success) throws Throwable {
-								if (failure == null) {
-									if (success._2() instanceof ResponseWebMonitorPort) {
-										int webMonitorPort = ((ResponseWebMonitorPort) success._2()).port();
-
-										leaderGatewayPortPromise.success(new Tuple2<>(success._1(), webMonitorPort));
-									} else {
-										leaderGatewayPortPromise.failure(new Exception("Received the message " +
-										success._2() + " as response to " + JobManagerMessages.getRequestWebMonitorPort() +
-											". But a message of type " + ResponseWebMonitorPort.class + " was expected."));
-									}
-								} else {
-									LOG.warn("Failed to retrieve leader gateway and port.", failure);
-									leaderGatewayPortPromise.failure(failure);
-								}
-							}
-						}, actorSystem.dispatcher());
-			}
-			catch (Exception e) {
-				handleError(e);
-			}
-		}
-	}
-
-	@Override
-	public void handleError(Exception exception) {
-		LOG.error("Received error from LeaderRetrievalService.", exception);
-
-		try {
-			// stop associated webMonitor
-			webMonitor.stop();
-		}
-		catch (Exception e) {
-			LOG.error("Error while stopping the web server due to a LeaderRetrievalService error.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 4777202..35d13dd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.webmonitor;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
@@ -43,9 +45,7 @@ import java.net.URLDecoder;
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
-
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -72,8 +72,8 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 			WebMonitorConfig cfg,
 			RequestHandler handler,
 			JobManagerRetriever retriever,
-			Future<String> localJobManagerAddressFuture,
-			FiniteDuration timeout,
+			CompletableFuture<String> localJobManagerAddressFuture,
+			Time timeout,
 			boolean httpsEnabled) {
 
 		super(retriever, localJobManagerAddressFuture, timeout, httpsEnabled);
@@ -87,7 +87,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 	}
 
 	@Override
-	protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) {
+	protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway) {
 		FullHttpResponse response;
 
 		try {
@@ -106,7 +106,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 			queryParams.put(WEB_MONITOR_ADDRESS_KEY,
 				(httpsEnabled ? "https://" : "http://") + address.getHostName() + ":" + address.getPort());
 
-			response = handler.handleRequest(pathParams, queryParams, jobManager);
+			response = handler.handleRequest(pathParams, queryParams, jobManagerGateway);
 		}
 		catch (NotFoundException e) {
 			// this should result in a 404 error code (not found)

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
index d524632..4cb55f1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.webmonitor;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
@@ -29,11 +31,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
 
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -48,9 +48,9 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
 
 	private final JobManagerRetriever retriever;
 
-	protected final Future<String> localJobManagerAddressFuture;
+	protected final CompletableFuture<String> localJobManagerAddressFuture;
 
-	protected final FiniteDuration timeout;
+	protected final Time timeout;
 
 	/** Whether the web service has https enabled. */
 	protected final boolean httpsEnabled;
@@ -59,8 +59,8 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
 
 	public RuntimeMonitorHandlerBase(
 		JobManagerRetriever retriever,
-		Future<String> localJobManagerAddressFuture,
-		FiniteDuration timeout,
+		CompletableFuture<String> localJobManagerAddressFuture,
+		Time timeout,
 		boolean httpsEnabled) {
 
 		this.retriever = checkNotNull(retriever);
@@ -78,17 +78,17 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
 
 	@Override
 	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
-		if (localJobManagerAddressFuture.isCompleted()) {
+		if (localJobManagerAddressFuture.isDone()) {
 			if (localJobManagerAddress == null) {
-				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
+				localJobManagerAddress = localJobManagerAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			}
 
-			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
+			Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
 
-			if (jobManager.isDefined()) {
-				Tuple2<ActorGateway, Integer> gatewayPort = jobManager.get();
+			if (optJobManagerGateway.isPresent()) {
+				JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
 				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
-					localJobManagerAddress, gatewayPort);
+					localJobManagerAddress, jobManagerGateway, timeout);
 
 				if (redirectAddress != null) {
 					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path(),
@@ -96,7 +96,7 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
 					KeepAliveWrite.flush(ctx, routed.request(), redirect);
 				}
 				else {
-					respondAsLeader(ctx, routed, gatewayPort._1());
+					respondAsLeader(ctx, routed, jobManagerGateway);
 				}
 			} else {
 				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
@@ -106,5 +106,5 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
 		}
 	}
 
-	protected abstract void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager);
+	protected abstract void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index e27a15f..17f02f0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -71,12 +70,14 @@ import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 
-import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,16 +86,11 @@ import javax.net.ssl.SSLContext;
 import java.io.File;
 import java.io.IOException;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -108,7 +104,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class WebRuntimeMonitor implements WebMonitor {
 
 	/** By default, all requests to the JobManager have a timeout of 10 seconds. */
-	public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
+	public static final Time DEFAULT_REQUEST_TIMEOUT = Time.seconds(10L);
 
 	/** Logger for web frontend startup / shutdown messages. */
 	private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
@@ -120,14 +116,15 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	private final LeaderRetrievalService leaderRetrievalService;
 
-	/** LeaderRetrievalListener which stores the currently leading JobManager and its archive. */
+	/** Service which retrieves the currently leading JobManager and opens a JobManagerGateway. */
 	private final JobManagerRetriever retriever;
 
 	private final SSLContext serverSSLContext;
 
-	private final Promise<String> jobManagerAddressPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+	private final CompletableFuture<String> jobManagerAddressFuture = new CompletableFuture<>();
+
+	private final Time timeout;
 
-	private final FiniteDuration timeout;
 	private final WebFrontendBootstrap netty;
 
 	private final File webRootDir;
@@ -142,7 +139,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	private AtomicBoolean cleanedUp = new AtomicBoolean();
 
-	private ExecutorService executorService;
 
 	private MetricFetcher metricFetcher;
 
@@ -150,11 +146,15 @@ public class WebRuntimeMonitor implements WebMonitor {
 			Configuration config,
 			LeaderRetrievalService leaderRetrievalService,
 			BlobView blobView,
-			ActorSystem actorSystem) throws IOException, InterruptedException {
+			JobManagerRetriever jobManagerRetriever,
+			MetricQueryServiceRetriever queryServiceRetriever,
+			Time timeout,
+			Executor executor) throws IOException, InterruptedException {
 
 		this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
-		this.timeout = AkkaUtils.getTimeout(config);
-		this.retriever = new JobManagerRetriever(this, actorSystem, AkkaUtils.getTimeout(config), timeout);
+		this.retriever = Preconditions.checkNotNull(jobManagerRetriever);
+		this.timeout = Preconditions.checkNotNull(timeout);
+
 		this.cfg = new WebMonitorConfig(config);
 
 		final String configuredAddress = cfg.getWebFrontendAddress();
@@ -191,7 +191,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 		// - Back pressure stats ----------------------------------------------
 
-		stackTraceSamples = new StackTraceSampleCoordinator(actorSystem.dispatcher(), 60000);
+		stackTraceSamples = new StackTraceSampleCoordinator(executor, 60000);
 
 		// Back pressure stats tracker config
 		int cleanUpInterval = config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL);
@@ -209,10 +209,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 		// --------------------------------------------------------------------
 
-		executorService = new ForkJoinPool();
-
-		ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(executorService);
-
 		// Config to enable https access to the web-ui
 		boolean enableSSL = config.getBoolean(WebOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(config);
 
@@ -226,11 +222,11 @@ public class WebRuntimeMonitor implements WebMonitor {
 		} else {
 			serverSSLContext = null;
 		}
-		metricFetcher = new MetricFetcher(actorSystem, retriever, context);
+		metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, executor, timeout);
 
 		String defaultSavepointDir = config.getString(CoreOptions.SAVEPOINT_DIRECTORY);
 
-		JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, context, defaultSavepointDir);
+		JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, executor, defaultSavepointDir);
 		RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler());
 		RuntimeMonitorHandler inProgressHandler = handler(cancelWithSavepoint.getInProgressHandler());
 
@@ -274,8 +270,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 		get(router,
 			new TaskManagerLogHandler(
 				retriever,
-				context,
-				jobManagerAddressPromise.future(),
+				executor,
+				jobManagerAddressFuture,
 				timeout,
 				TaskManagerLogHandler.FileMode.LOG,
 				config,
@@ -284,8 +280,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 		get(router,
 			new TaskManagerLogHandler(
 				retriever,
-				context,
-				jobManagerAddressPromise.future(),
+				executor,
+				jobManagerAddressFuture,
 				timeout,
 				TaskManagerLogHandler.FileMode.STDOUT,
 				config,
@@ -296,27 +292,27 @@ public class WebRuntimeMonitor implements WebMonitor {
 		router
 			// log and stdout
 			.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
-				new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile,
+				new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.logFile,
 					enableSSL))
 
 			.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
-				new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile,
+				new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.stdOutFile,
 					enableSSL));
 
 		get(router, new JobManagerMetricsHandler(metricFetcher));
 
 		// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
-		get(router, new JobCancellationHandler());
+		get(router, new JobCancellationHandler(timeout));
 		// DELETE is the preferred way of canceling a job (Rest-conform)
-		delete(router, new JobCancellationHandler());
+		delete(router, new JobCancellationHandler(timeout));
 
 		get(router, triggerHandler);
 		get(router, inProgressHandler);
 
 		// stop a job via GET (for proper integration with YARN this has to be performed via GET)
-		get(router, new JobStoppingHandler());
+		get(router, new JobStoppingHandler(timeout));
 		// DELETE is the preferred way of stopping a job (Rest-conform)
-		delete(router, new JobStoppingHandler());
+		delete(router, new JobStoppingHandler(timeout));
 
 		int maxCachedEntries = config.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
 		CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);
@@ -351,7 +347,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 		}
 
 		// this handler serves all the static contents
-		router.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir,
+		router.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, webRootDir,
 			enableSSL));
 
 		// add shutdown hook for deleting the directories and remaining temp files on shutdown
@@ -387,7 +383,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 	 * @return array of all JsonArchivists relevant for the history server
 	 */
 	public static JsonArchivist[] getJsonArchivists() {
-		JsonArchivist[] archivists = new JsonArchivist[]{
+		JsonArchivist[] archivists = {
 			new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(),
 
 			new JobPlanHandler.JobPlanJsonArchivist(),
@@ -418,7 +414,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 		LOG.info("Starting with JobManager {} on port {}", jobManagerAkkaUrl, getServerPort());
 
 		synchronized (startupShutdownLock) {
-			jobManagerAddressPromise.success(jobManagerAkkaUrl);
+			jobManagerAddressFuture.complete(jobManagerAkkaUrl);
 			leaderRetrievalService.start(retriever);
 
 			long delay = backPressureStatsTracker.getCleanUpInterval();
@@ -451,8 +447,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 			backPressureStatsTracker.shutDown();
 
-			executorService.shutdownNow();
-
 			cleanup();
 		}
 	}
@@ -522,7 +516,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 	// ------------------------------------------------------------------------
 
 	private RuntimeMonitorHandler handler(RequestHandler handler) {
-		return new RuntimeMonitorHandler(cfg, handler, retriever, jobManagerAddressPromise.future(), timeout,
+		return new RuntimeMonitorHandler(cfg, handler, retriever, jobManagerAddressFuture, timeout,
 			serverSSLContext !=  null);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index be6928e..15acb00 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -26,9 +26,10 @@ package org.apache.flink.runtime.webmonitor.files;
  * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
  *****************************************************************************/
 
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
@@ -70,13 +71,9 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.Locale;
+import java.util.Optional;
 import java.util.TimeZone;
-
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
@@ -118,9 +115,9 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 
 	private final JobManagerRetriever retriever;
 
-	private final Future<String> localJobManagerAddressFuture;
+	private final CompletableFuture<String> localJobManagerAddressFuture;
 
-	private final FiniteDuration timeout;
+	private final Time timeout;
 
 	/** The path in which the static documents are. */
 	private final File rootPath;
@@ -135,8 +132,8 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 
 	public StaticFileServerHandler(
 			JobManagerRetriever retriever,
-			Future<String> localJobManagerAddressPromise,
-			FiniteDuration timeout,
+			CompletableFuture<String> localJobManagerAddressPromise,
+			Time timeout,
 			File rootPath,
 			boolean httpsEnabled) throws IOException {
 
@@ -145,8 +142,8 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 
 	public StaticFileServerHandler(
 			JobManagerRetriever retriever,
-			Future<String> localJobManagerAddressFuture,
-			FiniteDuration timeout,
+			CompletableFuture<String> localJobManagerAddressFuture,
+			Time timeout,
 			File rootPath,
 			boolean httpsEnabled,
 			Logger logger) throws IOException {
@@ -165,9 +162,9 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 
 	@Override
 	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
-		if (localJobManagerAddressFuture.isCompleted()) {
+		if (localJobManagerAddressFuture.isDone()) {
 			if (localJobManagerAddress == null) {
-				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
+				localJobManagerAddress = localJobManagerAddressFuture.get();
 			}
 
 			final HttpRequest request = routed.request();
@@ -183,12 +180,12 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 				requestPath = "";
 			}
 
-			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
+			Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
 
-			if (jobManager.isDefined()) {
+			if (optJobManagerGateway.isPresent()) {
 				// Redirect to leader if necessary
 				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
-					localJobManagerAddress, jobManager.get());
+					localJobManagerAddress, optJobManagerGateway.get(), timeout);
 
 				if (redirectAddress != null) {
 					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index d6c17af..89108db 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.NotFoundException;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Base class for request handlers whose response depends on an ExecutionGraph
@@ -35,11 +38,11 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
 	private final ExecutionGraphHolder executionGraphHolder;
 
 	public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder) {
-		this.executionGraphHolder = executionGraphHolder;
+		this.executionGraphHolder = Preconditions.checkNotNull(executionGraphHolder);
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		String jidString = pathParams.get("jobid");
 		if (jidString == null) {
 			throw new RuntimeException("JobId parameter missing");
@@ -53,12 +56,17 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
 			throw new RuntimeException("Invalid JobID string '" + jidString + "': " + e.getMessage());
 		}
 
-		AccessExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid, jobManager);
-		if (eg == null) {
-			throw new NotFoundException("Could not find job with id " + jid);
+		final Optional<AccessExecutionGraph> optGraph;
+
+		try {
+			optGraph = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);
+		} catch (Exception e) {
+			throw new FlinkException("Could not retrieve ExecutionGraph for job with jobId " + jid + " from the JobManager.", e);
 		}
 
-		return handleRequest(eg, pathParams);
+		final AccessExecutionGraph graph = optGraph.orElseThrow(() -> new NotFoundException("Could not find job with jobId " + jid + '.'));
+
+		return handleRequest(graph, pathParams);
 	}
 
 	public abstract String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
index 2b4a45f..266ffb0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -38,8 +38,8 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
 	private static final Charset ENCODING = Charset.forName("UTF-8");
 
 	@Override
-	public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
-		String result = handleJsonRequest(pathParams, queryParams, jobManager);
+	public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
+		String result = handleJsonRequest(pathParams, queryParams, jobManagerGateway);
 		byte[] bytes = result.getBytes(ENCODING);
 
 		DefaultFullHttpResponse response = new DefaultFullHttpResponse(
@@ -57,7 +57,7 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
 	 *
 	 * @param pathParams The map of REST path parameters, decoded by the router.
 	 * @param queryParams The map of query parameters.
-	 * @param jobManager The JobManager actor.
+	 * @param jobManagerGateway The gateway used to communicate with the JobManager.
 	 *
 	 * @return The JSON string that is the HTTP response.
 	 *
@@ -69,6 +69,6 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
 	public abstract String handleJsonRequest(
 			Map<String, String> pathParams,
 			Map<String, String> queryParams,
-			ActorGateway jobManager) throws Exception;
+			JobManagerGateway jobManagerGateway) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index 816ef24..4ebc4e7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
@@ -27,10 +27,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.StringWriter;
 import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -46,9 +44,9 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
 
 	private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId;
 
-	private final FiniteDuration timeout;
+	private final Time timeout;
 
-	public ClusterOverviewHandler(FiniteDuration timeout) {
+	public ClusterOverviewHandler(Time timeout) {
 		this.timeout = checkNotNull(timeout);
 	}
 
@@ -58,12 +56,13 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		// we need no parameters, get all requests
 		try {
-			if (jobManager != null) {
-				Future<Object> future = jobManager.ask(RequestStatusOverview.getInstance(), timeout);
-				StatusOverview overview = (StatusOverview) Await.result(future, timeout);
+			if (jobManagerGateway != null) {
+				CompletableFuture<StatusOverview> overviewFuture = jobManagerGateway.requestStatusOverview(timeout);
+
+				StatusOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 				StringWriter writer = new StringWriter();
 				JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index 9d0b863..778a300 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -19,18 +19,16 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
-import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.StringWriter;
 import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static java.util.Objects.requireNonNull;
 
@@ -43,9 +41,9 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 
 	private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
 
-	private final FiniteDuration timeout;
+	private final Time timeout;
 
-	public CurrentJobIdsHandler(FiniteDuration timeout) {
+	public CurrentJobIdsHandler(Time timeout) {
 		this.timeout = requireNonNull(timeout);
 	}
 
@@ -55,12 +53,12 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		// we need no parameters, get all requests
 		try {
-			if (jobManager != null) {
-				Future<Object> future = jobManager.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
-				JobsWithIDsOverview overview = (JobsWithIDsOverview) Await.result(future, timeout);
+			if (jobManagerGateway != null) {
+				CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
+				JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 				StringWriter writer = new StringWriter();
 				JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index d0518c8..b324426 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -35,10 +35,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -51,13 +49,13 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 	private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running";
 	private static final String COMPLETED_JOBS_REST_PATH = "/joboverview/completed";
 
-	private final FiniteDuration timeout;
+	private final Time timeout;
 
 	private final boolean includeRunningJobs;
 	private final boolean includeFinishedJobs;
 
 	public CurrentJobsOverviewHandler(
-			FiniteDuration timeout,
+			Time timeout,
 			boolean includeRunningJobs,
 			boolean includeFinishedJobs) {
 
@@ -79,13 +77,11 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		try {
-			if (jobManager != null) {
-				Future<Object> future = jobManager.ask(
-						new RequestJobDetails(includeRunningJobs, includeFinishedJobs), timeout);
-
-				MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
+			if (jobManagerGateway != null) {
+				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);
+				MultipleJobsDetails result = jobDetailsFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 				final long now = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index 312c890..fe1d06b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -55,7 +55,7 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		return this.configString;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
index 9fbafb8..e27d125 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -30,13 +31,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import scala.Tuple2;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -49,35 +45,26 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class HandlerRedirectUtils {
 
-	private static final Logger LOG = LoggerFactory.getLogger(HandlerRedirectUtils.class);
-
-	/** Pattern to extract the host from an remote Akka URL. */
-	private static final Pattern LeaderAddressHostPattern = Pattern.compile("^.+@(.+):([0-9]+)/user/.+$");
-
 	public static String getRedirectAddress(
 			String localJobManagerAddress,
-			Tuple2<ActorGateway, Integer> leader) throws Exception {
+			JobManagerGateway jobManagerGateway,
+			Time timeout) throws Exception {
 
-		final String leaderAddress = leader._1().path();
-		final int webMonitorPort = leader._2();
+		final String leaderAddress = jobManagerGateway.getAddress();
 
 		final String jobManagerName = localJobManagerAddress.substring(localJobManagerAddress.lastIndexOf("/") + 1);
 
 		if (!localJobManagerAddress.equals(leaderAddress) &&
 			!leaderAddress.equals(AkkaUtils.getLocalAkkaURL(jobManagerName))) {
 			// We are not the leader and need to redirect
-			Matcher matcher = LeaderAddressHostPattern.matcher(leaderAddress);
-
-			if (matcher.matches()) {
-				String redirectAddress = String.format("%s:%d", matcher.group(1), webMonitorPort);
-				return redirectAddress;
-			}
-			else {
-				LOG.warn("Unexpected leader address pattern {}. Cannot extract host.", leaderAddress);
-			}
-		}
+			final String hostname = jobManagerGateway.getHostname();
 
-		return null;
+			final CompletableFuture<Integer> webMonitorPortFuture = jobManagerGateway.requestWebPort(timeout);
+			final int webMonitorPort = webMonitorPortFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+			return String.format("%s:%d", hostname, webMonitorPort);
+		} else {
+			return null;
+		}
 	}
 
 	public static HttpResponse getRedirectResponse(String redirectAddress, String path, boolean httpsEnabled) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index 4a21fec..db55169 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import java.util.Map;
 
@@ -42,7 +42,7 @@ public class JarAccessDeniedHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		return ERROR_MESSAGE;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index 2572a76..73771bd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -46,7 +46,7 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		final String file = pathParams.get("jarid");
 		try {
 			File[] list = jarDir.listFiles(new FilenameFilter() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index 4dd20b1..4f9b188 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
 
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -51,7 +51,7 @@ public class JarListHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		try {
 			StringWriter writer = new StringWriter();
 			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index 1b25e7f..b239160 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -45,7 +45,7 @@ public class JarPlanHandler extends JarActionHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		try {
 			JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
 			JobGraph graph = getJobGraphAndClassLoader(config).f0;

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 282fea8..12ffa4f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -22,11 +22,11 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -34,8 +34,6 @@ import java.io.File;
 import java.io.StringWriter;
 import java.util.Map;
 
-import scala.concurrent.duration.FiniteDuration;
-
 /**
  * This handler handles requests to run a jar, i.e. to submit the jar's job to the JobManager.
  */
@@ -43,13 +41,13 @@ public class JarRunHandler extends JarActionHandler {
 
 	static final String JAR_RUN_REST_PATH = "/jars/:jarid/run";
 
-	private final FiniteDuration timeout;
+	private final Time timeout;
 	private final Configuration clientConfig;
 
-	public JarRunHandler(File jarDirectory, FiniteDuration timeout, Configuration clientConfig) {
+	public JarRunHandler(File jarDirectory, Time timeout, Configuration clientConfig) {
 		super(jarDirectory);
-		this.timeout = timeout;
-		this.clientConfig = clientConfig;
+		this.timeout = Preconditions.checkNotNull(timeout);
+		this.clientConfig = Preconditions.checkNotNull(clientConfig);
 	}
 
 	@Override
@@ -58,17 +56,17 @@ public class JarRunHandler extends JarActionHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		try {
 			JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
 			Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(config);
 
 			try {
 				JobClient.submitJobDetached(
-					new AkkaJobManagerGateway(jobManager),
+					jobManagerGateway,
 					clientConfig,
 					graph.f0,
-					Time.milliseconds(timeout.toMillis()),
+					timeout,
 					graph.f1);
 			} catch (JobExecutionException e) {
 				throw new ProgramInvocationException("Failed to submit the job to the job manager", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 745a110..705c321 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import java.io.File;
 import java.util.Map;
@@ -44,9 +44,9 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
 
 	@Override
 	public String handleJsonRequest(
-				Map<String, String> pathParams,
-				Map<String, String> queryParams,
-				ActorGateway jobManager) throws Exception {
+			Map<String, String> pathParams,
+			Map<String, String> queryParams,
+			JobManagerGateway jobManagerGateway) throws Exception {
 
 		String tempFilePath = queryParams.get("filepath");
 		String filename = queryParams.get("filename");

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index d9de7d7..513dc08 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -19,8 +19,9 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
@@ -33,17 +34,23 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
 	private static final String JOB_CONCELLATION_REST_PATH = "/jobs/:jobid/cancel";
 	private static final String JOB_CONCELLATION_YARN_REST_PATH = "/jobs/:jobid/yarn-cancel";
 
+	private final Time timeout;
+
+	public JobCancellationHandler(Time timeout) {
+		this.timeout = Preconditions.checkNotNull(timeout);
+	}
+
 	@Override
 	public String[] getPaths() {
 		return new String[]{JOB_CONCELLATION_REST_PATH, JOB_CONCELLATION_YARN_REST_PATH};
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 		try {
-			JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
-			if (jobManager != null) {
-				jobManager.tell(new JobManagerMessages.CancelJob(jobid));
+			JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+			if (jobManagerGateway != null) {
+				jobManagerGateway.cancelJob(jobId, timeout);
 				return "{}";
 			}
 			else {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index 7dd4a52..9b474aa 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -19,16 +19,17 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.NotFoundException;
+import org.apache.flink.util.FlinkException;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -37,7 +38,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
 
-import akka.dispatch.OnComplete;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import javax.annotation.Nullable;
@@ -48,10 +48,9 @@ import java.nio.charset.Charset;
 import java.util.ArrayDeque;
 import java.util.HashMap;
 import java.util.Map;
-
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -92,16 +91,16 @@ public class JobCancellationWithSavepointHandlers {
 
 	public JobCancellationWithSavepointHandlers(
 			ExecutionGraphHolder currentGraphs,
-			ExecutionContext executionContext) {
-		this(currentGraphs, executionContext, null);
+			Executor executor) {
+		this(currentGraphs, executor, null);
 	}
 
 	public JobCancellationWithSavepointHandlers(
 			ExecutionGraphHolder currentGraphs,
-			ExecutionContext executionContext,
+			Executor executor,
 			@Nullable String defaultSavepointDirectory) {
 
-		this.triggerHandler = new TriggerHandler(currentGraphs, executionContext);
+		this.triggerHandler = new TriggerHandler(currentGraphs, executor);
 		this.inProgressHandler = new InProgressHandler();
 		this.defaultSavepointDirectory = defaultSavepointDirectory;
 	}
@@ -127,11 +126,11 @@ public class JobCancellationWithSavepointHandlers {
 		private final ExecutionGraphHolder currentGraphs;
 
 		/** Executor on which the completion callbacks of the cancellation futures are run. */
-		private final ExecutionContext executionContext;
+		private final Executor executor;
 
-		public TriggerHandler(ExecutionGraphHolder currentGraphs, ExecutionContext executionContext) {
+		public TriggerHandler(ExecutionGraphHolder currentGraphs, Executor executor) {
 			this.currentGraphs = checkNotNull(currentGraphs);
-			this.executionContext = checkNotNull(executionContext);
+			this.executor = checkNotNull(executor);
 		}
 
 		@Override
@@ -144,35 +143,40 @@ public class JobCancellationWithSavepointHandlers {
 		public FullHttpResponse handleRequest(
 				Map<String, String> pathParams,
 				Map<String, String> queryParams,
-				ActorGateway jobManager) throws Exception {
+				JobManagerGateway jobManagerGateway) throws Exception {
 
 			try {
-				if (jobManager != null) {
+				if (jobManagerGateway != null) {
 					JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+					final Optional<AccessExecutionGraph> optGraph;
 
-					AccessExecutionGraph graph = currentGraphs.getExecutionGraph(jobId, jobManager);
-					if (graph == null) {
-						throw new Exception("Cannot find ExecutionGraph for job.");
-					} else {
-						CheckpointCoordinator coord = graph.getCheckpointCoordinator();
-						if (coord == null) {
-							throw new Exception("Cannot find CheckpointCoordinator for job.");
-						}
+					try {
+						optGraph = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
+					} catch (Exception e) {
+						throw new FlinkException("Could not retrieve the execution with jobId " + jobId + " from the JobManager.", e);
+					}
 
-						String targetDirectory = pathParams.get("targetDirectory");
-						if (targetDirectory == null) {
-							if (defaultSavepointDirectory == null) {
-								throw new IllegalStateException("No savepoint directory configured. " +
-										"You can either specify a directory when triggering this savepoint or " +
-										"configure a cluster-wide default via key '" +
-										CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
-							} else {
-								targetDirectory = defaultSavepointDirectory;
-							}
-						}
+					final AccessExecutionGraph graph = optGraph.orElseThrow(
+						() -> new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'));
 
-						return handleNewRequest(jobManager, jobId, targetDirectory, coord.getCheckpointTimeout());
+					CheckpointCoordinator coord = graph.getCheckpointCoordinator();
+					if (coord == null) {
+						throw new Exception("Cannot find CheckpointCoordinator for job.");
 					}
+
+					String targetDirectory = pathParams.get("targetDirectory");
+					if (targetDirectory == null) {
+						if (defaultSavepointDirectory == null) {
+							throw new IllegalStateException("No savepoint directory configured. " +
+									"You can either specify a directory when triggering this savepoint or " +
+									"configure a cluster-wide default via key '" +
+									CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
+						} else {
+							targetDirectory = defaultSavepointDirectory;
+						}
+					}
+
+					return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
 				} else {
 					throw new Exception("No connection to the leading JobManager.");
 				}
@@ -182,7 +186,7 @@ public class JobCancellationWithSavepointHandlers {
 		}
 
 		@SuppressWarnings("unchecked")
-		private FullHttpResponse handleNewRequest(ActorGateway jobManager, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException {
+		private FullHttpResponse handleNewRequest(JobManagerGateway jobManagerGateway, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException {
 			// Check whether a request exists
 			final long requestId;
 			final boolean isNewRequest;
@@ -202,35 +206,21 @@ public class JobCancellationWithSavepointHandlers {
 
 				try {
 					// Trigger cancellation
-					Object msg = new CancelJobWithSavepoint(jobId, targetDirectory);
-					Future<Object> cancelFuture = jobManager
-							.ask(msg, FiniteDuration.apply(checkpointTimeout, "ms"));
-
-					cancelFuture.onComplete(new OnComplete<Object>() {
-						@Override
-						public void onComplete(Throwable failure, Object resp) throws Throwable {
-							synchronized (lock) {
-								try {
-									if (resp != null) {
-										if (resp.getClass() == CancellationSuccess.class) {
-											String path = ((CancellationSuccess) resp).savepointPath();
-											completed.put(requestId, path);
-										} else if (resp.getClass() == CancellationFailure.class) {
-											Throwable cause = ((CancellationFailure) resp).cause();
-											completed.put(requestId, cause);
-										} else {
-											Throwable cause = new IllegalStateException("Unexpected CancellationResponse of type " + resp.getClass());
-											completed.put(requestId, cause);
-										}
-									} else {
-										completed.put(requestId, failure);
-									}
-								} finally {
-									inProgress.remove(jobId);
+					CompletableFuture<String> cancelJobFuture = jobManagerGateway
+						.cancelJobWithSavepoint(jobId, targetDirectory, Time.milliseconds(checkpointTimeout));
+
+					cancelJobFuture.whenCompleteAsync(
+						(String path, Throwable throwable) -> {
+							try {
+								if (throwable != null) {
+									completed.put(requestId, throwable);
+								} else {
+									completed.put(requestId, path);
 								}
+							} finally {
+								inProgress.remove(jobId);
 							}
-						}
-					}, executionContext);
+						}, executor);
 
 					success = true;
 				} finally {
@@ -298,9 +288,9 @@ public class JobCancellationWithSavepointHandlers {
 
 		@Override
 		@SuppressWarnings("unchecked")
-		public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+		public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
 			try {
-				if (jobManager != null) {
+				if (jobManagerGateway != null) {
 					JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
 					long requestId = Long.parseLong(pathParams.get("requestId"));