You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/18 20:59:40 UTC

flink git commit: [FLINK-7458] Generalize GatewayRetriever for WebRuntimeMonitor

Repository: flink
Updated Branches:
  refs/heads/master 1269f75ee -> 6ad0d3519


[FLINK-7458] Generalize GatewayRetriever for WebRuntimeMonitor

Introduce a generalized GatewayRetriever replacing the JobManagerRetriever. The
GatewayRetriever fulfills the same purpose as the JobManagerRetriever with the
ability to retrieve the gateway for an arbitrary endpoint type.

This closes #4549.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ad0d351
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ad0d351
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ad0d351

Branch: refs/heads/master
Commit: 6ad0d3519585a825499245493f4bc77dac77d439
Parents: 1269f75
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Aug 15 12:00:58 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Sep 18 15:15:30 2017 +0200

----------------------------------------------------------------------
 .../webmonitor/RuntimeMonitorHandler.java       |  27 +---
 .../webmonitor/RuntimeMonitorHandlerBase.java   |  50 +++++--
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  13 +-
 .../files/StaticFileServerHandler.java          |  60 +++++---
 .../handlers/HandlerRedirectUtils.java          |  54 ++++++--
 .../handlers/TaskManagerLogHandler.java         |   4 +-
 .../webmonitor/metrics/MetricFetcher.java       |   8 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |   5 +-
 .../handlers/HandlerRedirectUtilsTest.java      |  15 +-
 .../handlers/TaskManagerLogHandlerTest.java     |  10 +-
 .../metrics/AbstractMetricsHandlerTest.java     |   8 +-
 .../metrics/JobManagerMetricsHandlerTest.java   |   6 +-
 .../metrics/JobMetricsHandlerTest.java          |   6 +-
 .../metrics/JobVertexMetricsHandlerTest.java    |   6 +-
 .../webmonitor/metrics/MetricFetcherTest.java   |   5 +-
 .../metrics/TaskManagerMetricsHandlerTest.java  |   6 +-
 .../runtime/akka/AkkaJobManagerGateway.java     |   6 +
 .../clusterframework/BootstrapTools.java        |   5 +-
 .../runtime/jobmaster/JobManagerGateway.java    |   4 +-
 .../runtime/webmonitor/RestfulGateway.java      |  42 ++++++
 .../runtime/webmonitor/WebMonitorUtils.java     |   7 +-
 .../webmonitor/retriever/GatewayRetriever.java  |  61 +++++++++
 .../retriever/JobManagerRetriever.java          | 123 -----------------
 .../retriever/LeaderGatewayRetriever.java       |  55 ++++++++
 .../webmonitor/retriever/LeaderRetriever.java   | 112 +++++++++++++++
 .../retriever/impl/AkkaJobManagerRetriever.java |  35 ++---
 .../retriever/impl/RpcGatewayRetriever.java     |  50 +++++++
 .../retriever/impl/RpcJobManagerRetriever.java  |  46 -------
 .../impl/AkkaJobManagerRetrieverTest.java       | 105 ++++++++++++++
 .../retriever/impl/RpcGatewayRetrieverTest.java | 136 +++++++++++++++++++
 30 files changed, 771 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 6305537..d3fc177 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -19,22 +19,18 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
 
@@ -43,12 +39,12 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.net.URLDecoder;
-import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils.ENCODING;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -62,8 +58,6 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RuntimeMonitorHandler.class);
 
-	private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
-
 	public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address";
 
 	private final RequestHandler handler;
@@ -73,7 +67,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 	public RuntimeMonitorHandler(
 			WebMonitorConfig cfg,
 			RequestHandler handler,
-			JobManagerRetriever retriever,
+			GatewayRetriever<JobManagerGateway> retriever,
 			CompletableFuture<String> localJobManagerAddressFuture,
 			Time timeout,
 			boolean httpsEnabled) {
@@ -124,18 +118,9 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 
 					if (optNotFound.isPresent()) {
 						// this should result in a 404 error code (not found)
-						Throwable e = optNotFound.get();
-						ByteBuf message = e.getMessage() == null ? Unpooled.buffer(0)
-							: Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
-						finalResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
-						finalResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
-						finalResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, finalResponse.content().readableBytes());
+						finalResponse = HandlerRedirectUtils.getResponse(HttpResponseStatus.NOT_FOUND, optNotFound.get().getMessage());
 					} else {
-						byte[] bytes = ExceptionUtils.stringifyException(throwable).getBytes(ENCODING);
-						finalResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
-							HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
-						finalResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
-						finalResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, finalResponse.content().readableBytes());
+						finalResponse = HandlerRedirectUtils.getErrorResponse(throwable);
 					}
 				} else {
 					finalResponse = httpResponse;

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
index 4cb55f1..4f4facf 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
@@ -31,6 +31,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -46,7 +49,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @ChannelHandler.Sharable
 public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHandler<Routed> {
 
-	private final JobManagerRetriever retriever;
+	private final Logger logger = LoggerFactory.getLogger(getClass());
+
+	private final GatewayRetriever<JobManagerGateway> retriever;
 
 	protected final CompletableFuture<String> localJobManagerAddressFuture;
 
@@ -58,7 +63,7 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
 	protected String localJobManagerAddress;
 
 	public RuntimeMonitorHandlerBase(
-		JobManagerRetriever retriever,
+		GatewayRetriever<JobManagerGateway> retriever,
 		CompletableFuture<String> localJobManagerAddressFuture,
 		Time timeout,
 		boolean httpsEnabled) {
@@ -83,19 +88,38 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
 				localJobManagerAddress = localJobManagerAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			}
 
-			Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
+			Optional<JobManagerGateway> optJobManagerGateway = retriever.getNow();
 
 			if (optJobManagerGateway.isPresent()) {
 				JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
-				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
-					localJobManagerAddress, jobManagerGateway, timeout);
-
-				if (redirectAddress != null) {
-					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path(),
-						httpsEnabled);
-					KeepAliveWrite.flush(ctx, routed.request(), redirect);
-				}
-				else {
+				Optional<CompletableFuture<String>> optRedirectAddress = HandlerRedirectUtils.getRedirectAddress(
+					localJobManagerAddress,
+					jobManagerGateway,
+					timeout);
+
+				if (optRedirectAddress.isPresent()) {
+					optRedirectAddress.get().whenComplete(
+						(String redirectAddress, Throwable throwable) -> {
+							HttpResponse response;
+
+							if (throwable != null) {
+								logger.error("Could not retrieve the redirect address.", throwable);
+								response = HandlerRedirectUtils.getErrorResponse(throwable);
+							} else {
+								try {
+									response = HandlerRedirectUtils.getRedirectResponse(
+										redirectAddress,
+										routed.path(),
+										httpsEnabled);
+								} catch (Exception e) {
+									logger.error("Could not create the redirect response.", e);
+									response = HandlerRedirectUtils.getErrorResponse(e);
+								}
+							}
+
+							KeepAliveWrite.flush(ctx, routed.request(), response);
+						});
+				} else {
 					respondAsLeader(ctx, routed, jobManagerGateway);
 				}
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index e74541e..0cdab9c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
@@ -70,7 +71,7 @@ import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.FileUtils;
@@ -117,7 +118,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 	private final LeaderRetrievalService leaderRetrievalService;
 
 	/** Service which retrieves the currently leading JobManager and opens a JobManagerGateway. */
-	private final JobManagerRetriever retriever;
+	private final LeaderGatewayRetriever<JobManagerGateway> retriever;
 
 	private final SSLContext serverSSLContext;
 
@@ -146,7 +147,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 			Configuration config,
 			LeaderRetrievalService leaderRetrievalService,
 			BlobView blobView,
-			JobManagerRetriever jobManagerRetriever,
+			LeaderGatewayRetriever<JobManagerGateway> jobManagerRetriever,
 			MetricQueryServiceRetriever queryServiceRetriever,
 			Time timeout,
 			Executor executor) throws IOException, InterruptedException {
@@ -292,7 +293,11 @@ public class WebRuntimeMonitor implements WebMonitor {
 		router
 			// log and stdout
 			.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
-				new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.logFile,
+				new StaticFileServerHandler(
+					retriever,
+					jobManagerAddressFuture,
+					timeout,
+					logFiles.logFile,
 					enableSSL))
 
 			.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index 15acb00..1564064 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -27,9 +27,9 @@ package org.apache.flink.runtime.webmonitor.files;
  *****************************************************************************/
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
@@ -97,7 +97,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * example.</p>
  */
 @ChannelHandler.Sharable
-public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> {
+public class StaticFileServerHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
 
 	/** Default logger, if none is specified. */
 	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
@@ -113,7 +113,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 
 	// ------------------------------------------------------------------------
 
-	private final JobManagerRetriever retriever;
+	private final GatewayRetriever<T> retriever;
 
 	private final CompletableFuture<String> localJobManagerAddressFuture;
 
@@ -131,7 +131,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 	private String localJobManagerAddress;
 
 	public StaticFileServerHandler(
-			JobManagerRetriever retriever,
+			GatewayRetriever<T> retriever,
 			CompletableFuture<String> localJobManagerAddressPromise,
 			Time timeout,
 			File rootPath,
@@ -141,7 +141,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 	}
 
 	public StaticFileServerHandler(
-			JobManagerRetriever retriever,
+			GatewayRetriever<T> retriever,
 			CompletableFuture<String> localJobManagerAddressFuture,
 			Time timeout,
 			File rootPath,
@@ -168,31 +168,47 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 			}
 
 			final HttpRequest request = routed.request();
-			String requestPath = routed.path();
+			final String requestPath;
 
 			// make sure we request the "index.html" in case there is a directory request
-			if (requestPath.endsWith("/")) {
-				requestPath = requestPath + "index.html";
+			if (routed.path().endsWith("/")) {
+				requestPath = routed.path() + "index.html";
 			}
-
 			// in case the files being accessed are logs or stdout files, find appropriate paths.
-			if (requestPath.equals("/jobmanager/log") || requestPath.equals("/jobmanager/stdout")) {
+			else if (routed.path().equals("/jobmanager/log") || routed.path().equals("/jobmanager/stdout")) {
 				requestPath = "";
+			} else {
+				requestPath = routed.path();
 			}
 
-			Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
+			Optional<T> optLeader = retriever.getNow();
 
-			if (optJobManagerGateway.isPresent()) {
+			if (optLeader.isPresent()) {
 				// Redirect to leader if necessary
-				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
-					localJobManagerAddress, optJobManagerGateway.get(), timeout);
-
-				if (redirectAddress != null) {
-					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(
-						redirectAddress, requestPath, httpsEnabled);
-					KeepAliveWrite.flush(ctx, routed.request(), redirect);
-				}
-				else {
+				Optional<CompletableFuture<String>> optRedirectAddress = HandlerRedirectUtils.getRedirectAddress(
+					localJobManagerAddress,
+					optLeader.get(),
+					timeout);
+
+				if (optRedirectAddress.isPresent()) {
+					optRedirectAddress.get().whenComplete(
+						(String address, Throwable throwable) -> {
+							if (throwable != null) {
+								logger.error("Failed to obtain redirect address.", throwable);
+								sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+							} else {
+								try {
+									HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(
+										address, requestPath, httpsEnabled);
+
+									KeepAliveWrite.flush(ctx, routed.request(), redirect);
+								} catch (Exception e) {
+									logger.error("Failed to send redirect response.", e);
+									sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+								}
+							}
+						});
+				} else {
 					respondAsLeader(ctx, request, requestPath);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
index e27d125..642d2f4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
@@ -21,18 +21,24 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
+import org.apache.flink.util.ExceptionUtils;
 
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
 
+import javax.annotation.Nullable;
+
+import java.nio.charset.Charset;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -45,25 +51,24 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class HandlerRedirectUtils {
 
-	public static String getRedirectAddress(
+	public static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
+
+	public static Optional<CompletableFuture<String>> getRedirectAddress(
 			String localJobManagerAddress,
-			JobManagerGateway jobManagerGateway,
+			RestfulGateway restfulGateway,
 			Time timeout) throws Exception {
 
-		final String leaderAddress = jobManagerGateway.getAddress();
+		final String leaderAddress = restfulGateway.getAddress();
 
 		final String jobManagerName = localJobManagerAddress.substring(localJobManagerAddress.lastIndexOf("/") + 1);
 
 		if (!localJobManagerAddress.equals(leaderAddress) &&
 			!leaderAddress.equals(AkkaUtils.getLocalAkkaURL(jobManagerName))) {
-			// We are not the leader and need to redirect
-			final String hostname = jobManagerGateway.getHostname();
 
-			final CompletableFuture<Integer> webMonitorPortFuture = jobManagerGateway.requestWebPort(timeout);
-			final int webMonitorPort = webMonitorPortFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-			return String.format("%s:%d", hostname, webMonitorPort);
+			return Optional.of(restfulGateway.requestRestAddress(timeout));
+
 		} else {
-			return null;
+			return Optional.empty();
 		}
 	}
 
@@ -94,4 +99,31 @@ public class HandlerRedirectUtils {
 
 		return unavailableResponse;
 	}
+
+	public static FullHttpResponse getResponse(HttpResponseStatus status, @Nullable String message) {
+		ByteBuf messageByteBuf = message == null ? Unpooled.buffer(0)
+			: Unpooled.wrappedBuffer(message.getBytes(ENCODING));
+		FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, messageByteBuf);
+		response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
+		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+		return response;
+	}
+
+	public static FullHttpResponse getErrorResponse(Throwable throwable) {
+		return getErrorResponse(throwable, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+	}
+
+	public static FullHttpResponse getErrorResponse(Throwable throwable, HttpResponseStatus status) {
+		byte[] bytes = ExceptionUtils.stringifyException(throwable).getBytes(ENCODING);
+		FullHttpResponse response = new DefaultFullHttpResponse(
+			HttpVersion.HTTP_1_1,
+			status,
+			Unpooled.wrappedBuffer(bytes));
+
+		response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
+		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+		return response;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index f175573..b382b4c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -37,7 +37,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
@@ -120,7 +120,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 	}
 
 	public TaskManagerLogHandler(
-		JobManagerRetriever retriever,
+		GatewayRetriever<JobManagerGateway> retriever,
 		Executor executor,
 		CompletableFuture<String> localJobManagerAddressPromise,
 		Time timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
index 3fe4d12..a5f4ca5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.Preconditions;
@@ -53,7 +53,7 @@ import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.Metr
 public class MetricFetcher {
 	private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
 
-	private final JobManagerRetriever retriever;
+	private final GatewayRetriever<JobManagerGateway> retriever;
 	private final MetricQueryServiceRetriever queryServiceRetriever;
 	private final Executor executor;
 	private final Time timeout;
@@ -64,7 +64,7 @@ public class MetricFetcher {
 	private long lastUpdateTime;
 
 	public MetricFetcher(
-			JobManagerRetriever retriever,
+			GatewayRetriever<JobManagerGateway> retriever,
 			MetricQueryServiceRetriever queryServiceRetriever,
 			Executor executor,
 			Time timeout) {
@@ -98,7 +98,7 @@ public class MetricFetcher {
 
 	private void fetchMetrics() {
 		try {
-			Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
+			Optional<JobManagerGateway> optJobManagerGateway = retriever.getNow();
 			if (optJobManagerGateway.isPresent()) {
 				final JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 5829d1c..10b5ced 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
@@ -509,11 +510,11 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 
 	private void waitForLeaderNotification(
 			String expectedJobManagerURL,
-			AkkaJobManagerRetriever retriever,
+			GatewayRetriever<JobManagerGateway> retriever,
 			Deadline deadline) throws Exception {
 
 		while (deadline.hasTimeLeft()) {
-			Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
+			Optional<JobManagerGateway> optJobManagerGateway = retriever.getNow();
 
 			if (optJobManagerGateway.isPresent() && Objects.equals(expectedJobManagerURL, optJobManagerGateway.get().getAddress())) {
 				return;

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
index ac8d934..a8562b3 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
@@ -19,12 +19,14 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Matchers.any;
@@ -47,26 +49,29 @@ public class HandlerRedirectUtilsTest extends TestLogger {
 		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
 		when(jobManagerGateway.getAddress()).thenReturn("akka://flink/user/foobar");
 
-		String redirectingAddress = HandlerRedirectUtils.getRedirectAddress(
+		Optional<CompletableFuture<String>> redirectingAddress = HandlerRedirectUtils.getRedirectAddress(
 			localJobManagerAddress,
 			jobManagerGateway,
 			Time.seconds(3L));
 
-		Assert.assertNull(redirectingAddress);
+		Assert.assertFalse(redirectingAddress.isPresent());
 	}
 
 	@Test
 	public void testGetRedirectAddressWithRemoteAkkaPath() throws Exception {
-		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
+		JobManagerGateway jobManagerGateway = mock(AkkaJobManagerGateway.class);
 		when(jobManagerGateway.getAddress()).thenReturn(remotePath);
 		when(jobManagerGateway.getHostname()).thenReturn(remoteHostname);
 		when(jobManagerGateway.requestWebPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(webPort));
+		when(jobManagerGateway.requestRestAddress(any(Time.class))).thenCallRealMethod();
 
-		String redirectingAddress = HandlerRedirectUtils.getRedirectAddress(
+		Optional<CompletableFuture<String>> optRedirectingAddress = HandlerRedirectUtils.getRedirectAddress(
 			localJobManagerAddress,
 			jobManagerGateway,
 			Time.seconds(3L));
 
-		Assert.assertEquals(remoteURL, redirectingAddress);
+		Assert.assertTrue(optRedirectingAddress.isPresent());
+
+		Assert.assertEquals(remoteURL, optRedirectingAddress.get().get());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
index cf59f05..c11fe6a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
@@ -63,7 +63,7 @@ public class TaskManagerLogHandlerTest {
 	@Test
 	public void testGetPaths() {
 		TaskManagerLogHandler handlerLog = new TaskManagerLogHandler(
-			mock(JobManagerRetriever.class),
+			mock(GatewayRetriever.class),
 			Executors.directExecutor(),
 			CompletableFuture.completedFuture("/jm/address"),
 			TestingUtils.TIMEOUT(),
@@ -76,7 +76,7 @@ public class TaskManagerLogHandlerTest {
 		Assert.assertEquals("/taskmanagers/:taskmanagerid/log", pathsLog[0]);
 
 		TaskManagerLogHandler handlerOut = new TaskManagerLogHandler(
-			mock(JobManagerRetriever.class),
+			mock(GatewayRetriever.class),
 			Executors.directExecutor(),
 			CompletableFuture.completedFuture("/jm/address"),
 			TestingUtils.TIMEOUT(),
@@ -113,8 +113,8 @@ public class TaskManagerLogHandlerTest {
 		when(jobManagerGateway.requestTaskManagerInstance(any(InstanceID.class), any(Time.class))).thenReturn(
 			CompletableFuture.completedFuture(Optional.of(taskManager)));
 
-		JobManagerRetriever retriever = mock(JobManagerRetriever.class);
-		when(retriever.getJobManagerGatewayNow())
+		GatewayRetriever<JobManagerGateway> retriever = mock(GatewayRetriever.class);
+		when(retriever.getNow())
 			.thenReturn(Optional.of(jobManagerGateway));
 
 		TaskManagerLogHandler handler = new TaskManagerLogHandler(

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
index 5296d33..0755888 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.metrics;
 
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.TestLogger;
 
@@ -43,7 +43,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 	@Test
 	public void testHandleRequest() throws Exception {
 		MetricFetcher fetcher = new MetricFetcher(
-			mock(JobManagerRetriever.class),
+			mock(GatewayRetriever.class),
 			mock(MetricQueryServiceRetriever.class),
 			Executors.directExecutor(),
 			TestingUtils.TIMEOUT());
@@ -96,7 +96,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 	@Test
 	public void testInvalidListDoesNotFail() {
 		MetricFetcher fetcher = new MetricFetcher(
-			mock(JobManagerRetriever.class),
+			mock(GatewayRetriever.class),
 			mock(MetricQueryServiceRetriever.class),
 			Executors.directExecutor(),
 			TestingUtils.TIMEOUT());
@@ -126,7 +126,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 	@Test
 	public void testInvalidGetDoesNotFail() {
 		MetricFetcher fetcher = new MetricFetcher(
-			mock(JobManagerRetriever.class),
+			mock(GatewayRetriever.class),
 			mock(MetricQueryServiceRetriever.class),
 			Executors.directExecutor(),
 			TestingUtils.TIMEOUT());

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
index b02949a..6d17b40 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.metrics;
 
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.TestLogger;
 
@@ -49,7 +49,7 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
 	@Test
 	public void getMapFor() {
 		MetricFetcher fetcher = new MetricFetcher(
-			mock(JobManagerRetriever.class),
+			mock(GatewayRetriever.class),
 			mock(MetricQueryServiceRetriever.class),
 			Executors.directExecutor(),
 			TestingUtils.TIMEOUT());
@@ -67,7 +67,7 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
 	@Test
 	public void getMapForNull() {
 		MetricFetcher fetcher = new MetricFetcher(
-			mock(JobManagerRetriever.class),
+			mock(GatewayRetriever.class),
 			mock(MetricQueryServiceRetriever.class),
 			Executors.directExecutor(),
 			TestingUtils.TIMEOUT());

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
index 569f772..b26ceab 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.metrics;
 
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.TestLogger;
 
@@ -50,7 +50,7 @@ public class JobMetricsHandlerTest extends TestLogger {
 	@Test
 	public void getMapFor() throws Exception {
 		MetricFetcher fetcher = new MetricFetcher(
-			mock(JobManagerRetriever.class),
+			mock(GatewayRetriever.class),
 			mock(MetricQueryServiceRetriever.class),
 			Executors.directExecutor(),
 			TestingUtils.TIMEOUT());
@@ -69,7 +69,7 @@ public class JobMetricsHandlerTest extends TestLogger {
 	@Test
 	public void getMapForNull() {
 		MetricFetcher fetcher = new MetricFetcher(
-			mock(JobManagerRetriever.class),
+			mock(GatewayRetriever.class),
 			mock(MetricQueryServiceRetriever.class),
 			Executors.directExecutor(),
 			TestingUtils.TIMEOUT());

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
index e6bbd2e..d637a4a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.metrics;
 
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.TestLogger;
 
@@ -51,7 +51,7 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
 	@Test
 	public void getMapFor() throws Exception {
 		MetricFetcher fetcher = new MetricFetcher(
-			mock(JobManagerRetriever.class),
+			mock(GatewayRetriever.class),
 			mock(MetricQueryServiceRetriever.class),
 			Executors.directExecutor(),
 			TestingUtils.TIMEOUT());
@@ -73,7 +73,7 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
 	@Test
 	public void getMapForNull() {
 		MetricFetcher fetcher = new MetricFetcher(
-			mock(JobManagerRetriever.class),
+			mock(GatewayRetriever.class),
 			mock(MetricQueryServiceRetriever.class),
 			Executors.directExecutor(),
 			TestingUtils.TIMEOUT());

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
index 4c91997..20c6373 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
@@ -96,8 +97,8 @@ public class MetricFetcherTest extends TestLogger {
 		when(jobManagerGateway.getAddress()).thenReturn("/jm/address");
 		when(jobManagerGateway.requestWebPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(0));
 
-		AkkaJobManagerRetriever retriever = mock(AkkaJobManagerRetriever.class);
-		when(retriever.getJobManagerGatewayNow())
+		GatewayRetriever<JobManagerGateway> retriever = mock(AkkaJobManagerRetriever.class);
+		when(retriever.getNow())
 			.thenReturn(Optional.of(jobManagerGateway));
 
 		// ========= setup QueryServices ================================================================================

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
index c4c1c7a..9c5549e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.metrics;
 
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.TestLogger;
 
@@ -50,7 +50,7 @@ public class TaskManagerMetricsHandlerTest extends TestLogger {
 	@Test
 	public void getMapFor() throws Exception {
 		MetricFetcher fetcher = new MetricFetcher(
-			mock(JobManagerRetriever.class),
+			mock(GatewayRetriever.class),
 			mock(MetricQueryServiceRetriever.class),
 			Executors.directExecutor(),
 			TestingUtils.TIMEOUT());
@@ -69,7 +69,7 @@ public class TaskManagerMetricsHandlerTest extends TestLogger {
 	@Test
 	public void getMapForNull() {
 		MetricFetcher fetcher = new MetricFetcher(
-			mock(JobManagerRetriever.class),
+			mock(GatewayRetriever.class),
 			mock(MetricQueryServiceRetriever.class),
 			Executors.directExecutor(),
 			TestingUtils.TIMEOUT());

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index bbc5889..b9a7a8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -267,4 +267,10 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 				.ask(RequestJobsWithIDsOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
 				.mapTo(ClassTag$.MODULE$.apply(JobsWithIDsOverview.class)));
 	}
+
+	@Override
+	public CompletableFuture<String> requestRestAddress(Time timeout) {
+		return requestWebPort(timeout).thenApply(
+			(Integer webPort) -> getHostname() + ':' + webPort);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index d24a3d0..bd3e304 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -33,9 +33,10 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.NetUtils;
 
@@ -186,7 +187,7 @@ public class BootstrapTools {
 	public static WebMonitor startWebMonitorIfConfigured(
 			Configuration config,
 			HighAvailabilityServices highAvailabilityServices,
-			JobManagerRetriever jobManagerRetriever,
+			LeaderGatewayRetriever<JobManagerGateway> jobManagerRetriever,
 			MetricQueryServiceRetriever queryServiceRetriever,
 			Time timeout,
 			Executor executor,

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
index a4d0d11..f2aaf17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
-import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 
 import javax.annotation.Nullable;
 
@@ -44,7 +44,7 @@ import java.util.concurrent.CompletableFuture;
  * <p>This interface constitutes the operations an external component can
  * trigger on the JobManager.
  */
-public interface JobManagerGateway extends RpcGateway {
+public interface JobManagerGateway extends RestfulGateway {
 
 	/**
 	 * Requests the BlobServer port.

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
new file mode 100644
index 0000000..a5d52e5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Gateway for restful endpoints.
+ *
+ * <p>Gateways which implement this interface run a REST endpoint which is reachable
+ * under the address returned by {@link #requestRestAddress(Time)}.
+ */
+public interface RestfulGateway extends RpcGateway {
+
+	/**
+	 * Requests the REST address of this {@link RpcEndpoint}.
+	 *
+	 * @param timeout for this operation
+	 * @return Future REST endpoint address
+	 */
+	CompletableFuture<String> requestRestAddress(Time timeout);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 9493696..57996bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -29,10 +29,11 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -130,7 +131,7 @@ public final class WebMonitorUtils {
 	public static WebMonitor startWebRuntimeMonitor(
 			Configuration config,
 			HighAvailabilityServices highAvailabilityServices,
-			JobManagerRetriever jobManagerRetriever,
+			LeaderGatewayRetriever<JobManagerGateway> jobManagerRetriever,
 			MetricQueryServiceRetriever queryServiceRetriever,
 			Time timeout,
 			Executor executor) {
@@ -143,7 +144,7 @@ public final class WebMonitorUtils {
 				Configuration.class,
 				LeaderRetrievalService.class,
 				BlobView.class,
-				JobManagerRetriever.class,
+				LeaderGatewayRetriever.class,
 				MetricQueryServiceRetriever.class,
 				Time.class,
 				Executor.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java
new file mode 100644
index 0000000..1771b72
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.runtime.rpc.RpcGateway;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Generic retriever interface for {@link RpcGateway}.
+ *
+ * @param <T> type of the object to retrieve
+ */
+public interface GatewayRetriever<T extends RpcGateway> {
+
+	/**
+	 * Get future of object to retrieve.
+	 *
+	 * @return Future object to retrieve
+	 */
+	CompletableFuture<T> getFuture();
+
+	/**
+	 * Returns the currently retrieved object if there is such an object. Otherwise
+	 * it returns an empty optional.
+	 *
+	 * @return Optional object to retrieve
+	 * @throws Exception if the future has been completed with an exception
+	 */
+	default Optional<T> getNow() throws Exception {
+		CompletableFuture<T> leaderFuture = getFuture();
+		if (leaderFuture != null) {
+			CompletableFuture<T> currentLeaderFuture = leaderFuture;
+
+			if (currentLeaderFuture.isDone()) {
+				return Optional.of(currentLeaderFuture.get());
+			} else {
+				return Optional.empty();
+			}
+		} else {
+			return Optional.empty();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java
deleted file mode 100644
index 2eade48..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.retriever;
-
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.util.FlinkException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Retrieves and stores the JobManagerGateway for the current leading JobManager.
- */
-public abstract class JobManagerRetriever implements LeaderRetrievalListener {
-
-	private final Logger log = LoggerFactory.getLogger(getClass());
-
-	// False if we have to create a new JobManagerGateway future when being notified
-	// about a new leader address
-	private final AtomicBoolean firstTimeUsage;
-
-	private volatile CompletableFuture<JobManagerGateway> jobManagerGatewayFuture;
-
-	public JobManagerRetriever() {
-		firstTimeUsage = new AtomicBoolean(true);
-		jobManagerGatewayFuture = new CompletableFuture<>();
-	}
-
-	/**
-	 * Returns the currently known leading job manager gateway and its web monitor port.
-	 */
-	public Optional<JobManagerGateway> getJobManagerGatewayNow() throws Exception {
-		if (jobManagerGatewayFuture != null) {
-			CompletableFuture<JobManagerGateway> jobManagerGatewayFuture = this.jobManagerGatewayFuture;
-
-			if (jobManagerGatewayFuture.isDone()) {
-				return Optional.of(jobManagerGatewayFuture.get());
-			} else {
-				return Optional.empty();
-			}
-		} else {
-			return Optional.empty();
-		}
-	}
-
-	/**
-	 * Returns the current JobManagerGateway future.
-	 */
-	public CompletableFuture<JobManagerGateway> getJobManagerGateway() throws Exception {
-		return jobManagerGatewayFuture;
-	}
-
-	@Override
-	public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
-		if (leaderAddress != null && !leaderAddress.equals("")) {
-			try {
-				final CompletableFuture<JobManagerGateway> newJobManagerGatewayFuture;
-
-				if (firstTimeUsage.compareAndSet(true, false)) {
-					newJobManagerGatewayFuture = jobManagerGatewayFuture;
-				} else {
-					newJobManagerGatewayFuture = new CompletableFuture<>();
-					jobManagerGatewayFuture = newJobManagerGatewayFuture;
-				}
-
-				log.info("New leader reachable under {}:{}.", leaderAddress, leaderSessionID);
-
-				createJobManagerGateway(leaderAddress, leaderSessionID).whenComplete(
-					(JobManagerGateway jobManagerGateway, Throwable throwable) -> {
-						if (throwable != null) {
-							newJobManagerGatewayFuture.completeExceptionally(new FlinkException("Could not retrieve" +
-								"the current job manager gateway.", throwable));
-						} else {
-							newJobManagerGatewayFuture.complete(jobManagerGateway);
-						}
-					}
-				);
-			}
-			catch (Exception e) {
-				handleError(e);
-			}
-		}
-	}
-
-	@Override
-	public void handleError(Exception exception) {
-		log.error("Received error from LeaderRetrievalService.", exception);
-
-		jobManagerGatewayFuture.completeExceptionally(exception);
-	}
-
-	/**
-	 * Create a JobManagerGateway for the given leader address and leader id.
-	 *
-	 * @param leaderAddress to connect against
-	 * @param leaderId the endpoint currently uses
-	 * @return Future containing the resolved JobManagerGateway
-	 * @throws Exception if the JobManagerGateway creation failed
-	 */
-	protected abstract CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java
new file mode 100644
index 0000000..4e59859
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.RpcGateway;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Retrieves and stores the leading {@link RpcGateway}.
+ *
+ * @param <T> type of the gateway to retrieve
+ */
+public abstract class LeaderGatewayRetriever<T extends RpcGateway> extends LeaderRetriever implements GatewayRetriever<T> {
+
+	private volatile CompletableFuture<T> gatewayFuture;
+
+	public LeaderGatewayRetriever() {
+		gatewayFuture = createGateway(getLeaderFuture());
+	}
+
+	@Override
+	public CompletableFuture<T> getFuture() {
+		return gatewayFuture;
+	}
+
+	@Override
+	public CompletableFuture<Tuple2<String, UUID>> createNewFuture() {
+		CompletableFuture<Tuple2<String, UUID>> newFuture = super.createNewFuture();
+
+		gatewayFuture = createGateway(newFuture);
+
+		return newFuture;
+	}
+
+	protected abstract CompletableFuture<T> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java
new file mode 100644
index 0000000..fbfb507
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Retrieves and stores the current leader address.
+ */
+public class LeaderRetriever implements LeaderRetrievalListener {
+	protected final Logger log = LoggerFactory.getLogger(getClass());
+
+	// False if we have to create a new leader future when being notified
+	// about a new leader address
+	private final AtomicBoolean firstTimeUsage;
+
+	protected volatile CompletableFuture<Tuple2<String, UUID>> leaderFuture;
+
+	public LeaderRetriever() {
+		firstTimeUsage = new AtomicBoolean(true);
+		leaderFuture = new CompletableFuture<>();
+	}
+
+	/**
+	 * Returns the current leader information if available. Otherwise it returns an
+	 * empty optional.
+	 *
+	 * @return The current leader information if available. Otherwise it returns an
+	 * empty optional.
+	 * @throws Exception if the leader future has been completed with an exception
+	 */
+	public Optional<Tuple2<String, UUID>> getLeaderNow() throws Exception {
+		CompletableFuture<Tuple2<String, UUID>> leaderFuture = this.leaderFuture;
+		if (leaderFuture != null) {
+			CompletableFuture<Tuple2<String, UUID>> currentLeaderFuture = leaderFuture;
+
+			if (currentLeaderFuture.isDone()) {
+				return Optional.of(currentLeaderFuture.get());
+			} else {
+				return Optional.empty();
+			}
+		} else {
+			return Optional.empty();
+		}
+	}
+
+	/**
+	 * Returns the current leader future, which is completed with the leader address and leader session id.
+	 */
+	public CompletableFuture<Tuple2<String, UUID>> getLeaderFuture() {
+		return leaderFuture;
+	}
+
+	@Override
+	public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
+		if (leaderAddress != null && !leaderAddress.equals("")) {
+			try {
+				final CompletableFuture<Tuple2<String, UUID>> newLeaderFuture;
+
+				if (firstTimeUsage.compareAndSet(true, false)) {
+					newLeaderFuture = leaderFuture;
+				} else {
+					newLeaderFuture = createNewFuture();
+					leaderFuture = newLeaderFuture;
+				}
+
+				log.info("New leader reachable under {}:{}.", leaderAddress, leaderSessionID);
+
+				newLeaderFuture.complete(Tuple2.of(leaderAddress, leaderSessionID));
+			}
+			catch (Exception e) {
+				handleError(e);
+			}
+		}
+	}
+
+	@Override
+	public void handleError(Exception exception) {
+		log.error("Received error from LeaderRetrievalService.", exception);
+
+		leaderFuture.completeExceptionally(exception);
+	}
+
+	protected CompletableFuture<Tuple2<String, UUID>> createNewFuture() {
+		return new CompletableFuture<>();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
index 027b42a..121387b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
@@ -19,13 +19,14 @@
 package org.apache.flink.runtime.webmonitor.retriever.impl;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.util.Preconditions;
 
 import akka.actor.ActorRef;
@@ -35,9 +36,9 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * {@link JobManagerRetriever} implementation for Akka based JobManagers.
+ * {@link LeaderGatewayRetriever} implementation for Akka based JobManagers.
  */
-public class AkkaJobManagerRetriever extends JobManagerRetriever {
+public class AkkaJobManagerRetriever extends LeaderGatewayRetriever<JobManagerGateway> {
 
 	private final ActorSystem actorSystem;
 	private final Time timeout;
@@ -51,19 +52,21 @@ public class AkkaJobManagerRetriever extends JobManagerRetriever {
 	}
 
 	@Override
-	protected CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception {
-		return FutureUtils.toJava(
-			AkkaUtils.getActorRefFuture(
-				leaderAddress,
-				actorSystem,
-				FutureUtils.toFiniteDuration(timeout)))
-			.thenApplyAsync(
-				(ActorRef jobManagerRef) -> {
-					ActorGateway leaderGateway = new AkkaActorGateway(
-						jobManagerRef, leaderId);
+	protected CompletableFuture<JobManagerGateway> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture) {
+		return leaderFuture.thenCompose(
+			(Tuple2<String, UUID> addressLeaderId) ->
+				FutureUtils.toJava(
+					AkkaUtils.getActorRefFuture(
+						addressLeaderId.f0,
+						actorSystem,
+						FutureUtils.toFiniteDuration(timeout)))
+					.thenApplyAsync(
+						(ActorRef jobManagerRef) -> {
+							ActorGateway leaderGateway = new AkkaActorGateway(
+								jobManagerRef, addressLeaderId.f1);
 
-					return new AkkaJobManagerGateway(leaderGateway);
-				},
-				actorSystem.dispatcher());
+							return new AkkaJobManagerGateway(leaderGateway);
+						}
+					));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetriever.java
new file mode 100644
index 0000000..86afc63
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetriever.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link LeaderGatewayRetriever} implementation using the {@link RpcService}.
+ *
+ * @param <T> type of the gateway to retrieve
+ */
+public class RpcGatewayRetriever<T extends RpcGateway> extends LeaderGatewayRetriever<T> {
+
+	private final RpcService rpcService;
+	private final Class<T> gatewayType;
+
+	public RpcGatewayRetriever(RpcService rpcService, Class<T> gatewayType) {
+		this.rpcService = Preconditions.checkNotNull(rpcService);
+		this.gatewayType = Preconditions.checkNotNull(gatewayType);
+	}
+
+	@Override
+	protected CompletableFuture<T> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture) {
+		return leaderFuture.thenCompose(
+			(Tuple2<String, UUID> addressLeaderTuple) -> rpcService.connect(addressLeaderTuple.f0, gatewayType));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
deleted file mode 100644
index e608aa0..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.retriever.impl;
-
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
-import org.apache.flink.util.Preconditions;
-
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * JobManagerRetriever implementation for Flip-6 JobManager.
- */
-public class RpcJobManagerRetriever extends JobManagerRetriever {
-
-	private final RpcService rpcService;
-
-	public RpcJobManagerRetriever(
-		RpcService rpcService) {
-
-		this.rpcService = Preconditions.checkNotNull(rpcService);
-	}
-
-	@Override
-	protected CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception {
-		return rpcService.connect(leaderAddress, JobManagerGateway.class);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
new file mode 100644
index 0000000..d02f3ef
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClientActorTest;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Test for the {@link AkkaJobManagerRetriever}.
+ */
+public class AkkaJobManagerRetrieverTest extends TestLogger {
+
+	private static final Time timeout = Time.seconds(10L);
+	private static ActorSystem actorSystem;
+
+	@BeforeClass
+	public static void setup() {
+		actorSystem = AkkaUtils.createDefaultActorSystem();
+	}
+
+	@AfterClass
+	public static void teardown() {
+		if (actorSystem != null) {
+			actorSystem.shutdown();
+			actorSystem.awaitTermination(FutureUtils.toFiniteDuration(timeout));
+
+			actorSystem = null;
+		}
+	}
+
+	/**
+	 * Tests that we can retrieve the current leading job manager.
+	 */
+	@Test
+	public void testAkkaJobManagerRetrieval() throws Exception {
+		AkkaJobManagerRetriever akkaJobManagerRetriever = new AkkaJobManagerRetriever(actorSystem, timeout);
+		TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService();
+
+		CompletableFuture<JobManagerGateway> gatewayFuture = akkaJobManagerRetriever.getFuture();
+		final UUID leaderSessionId = UUID.randomUUID();
+
+		ActorRef actorRef = null;
+
+		try {
+			actorRef = actorSystem.actorOf(
+				Props.create(JobClientActorTest.PlainActor.class, leaderSessionId));
+
+			final String address = actorRef.path().toString();
+
+			testingLeaderRetrievalService.start(akkaJobManagerRetriever);
+
+			// check that the gateway future has not been completed since there is no leader yet
+			assertFalse(gatewayFuture.isDone());
+
+			testingLeaderRetrievalService.notifyListener(address, leaderSessionId);
+
+			JobManagerGateway jobManagerGateway = gatewayFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+			assertEquals(address, jobManagerGateway.getAddress());
+		} finally {
+			testingLeaderRetrievalService.stop();
+
+			if (actorRef != null) {
+				TestingUtils.stopActorGracefully(actorRef);
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
new file mode 100644
index 0000000..1ca3918
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for the {@link RpcGatewayRetriever}.
+ */
+public class RpcGatewayRetrieverTest extends TestLogger {
+
+	private static final Time TIMEOUT = Time.seconds(10L);
+	private static TestingRpcService rpcService;
+
+	@BeforeClass
+	public static void setup() {
+		rpcService = new TestingRpcService();
+	}
+
+	@AfterClass
+	public static void teardown() throws InterruptedException, ExecutionException, TimeoutException {
+		if (rpcService != null) {
+			rpcService.stopService();
+			rpcService.getTerminationFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+			rpcService = null;
+		}
+	}
+
+	/**
+	 * Tests that the RpcGatewayRetriever can retrieve the specified gateway type from a leader retrieval service.
+	 */
+	@Test
+	public void testRpcGatewayRetrieval() throws Exception {
+		final String expectedValue = "foobar";
+		final String expectedValue2 = "barfoo";
+		final UUID leaderSessionId = UUID.randomUUID();
+
+		RpcGatewayRetriever<DummyGateway> gatewayRetriever = new RpcGatewayRetriever<>(rpcService, DummyGateway.class);
+		TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService();
+		DummyRpcEndpoint dummyRpcEndpoint = new DummyRpcEndpoint(rpcService, "dummyRpcEndpoint1", expectedValue);
+		DummyRpcEndpoint dummyRpcEndpoint2 = new DummyRpcEndpoint(rpcService, "dummyRpcEndpoint2", expectedValue2);
+		rpcService.registerGateway(dummyRpcEndpoint.getAddress(), dummyRpcEndpoint.getSelfGateway(DummyGateway.class));
+		rpcService.registerGateway(dummyRpcEndpoint2.getAddress(), dummyRpcEndpoint2.getSelfGateway(DummyGateway.class));
+
+		try {
+			dummyRpcEndpoint.start();
+			dummyRpcEndpoint2.start();
+
+			testingLeaderRetrievalService.start(gatewayRetriever);
+
+			final CompletableFuture<DummyGateway> gatewayFuture = gatewayRetriever.getFuture();
+
+			assertFalse(gatewayFuture.isDone());
+
+			testingLeaderRetrievalService.notifyListener(dummyRpcEndpoint.getAddress(), leaderSessionId);
+
+			final DummyGateway dummyGateway = gatewayFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+			assertEquals(dummyRpcEndpoint.getAddress(), dummyGateway.getAddress());
+			assertEquals(expectedValue, dummyGateway.foobar(TIMEOUT).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS));
+
+			// elect a new leader
+			testingLeaderRetrievalService.notifyListener(dummyRpcEndpoint2.getAddress(), leaderSessionId);
+
+			final CompletableFuture<DummyGateway> gatewayFuture2 = gatewayRetriever.getFuture();
+			final DummyGateway dummyGateway2 = gatewayFuture2.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+			assertEquals(dummyRpcEndpoint2.getAddress(), dummyGateway2.getAddress());
+			assertEquals(expectedValue2, dummyGateway2.foobar(TIMEOUT).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS));
+		} finally {
+			dummyRpcEndpoint.shutDown();
+			dummyRpcEndpoint2.shutDown();
+			dummyRpcEndpoint.getTerminationFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+			dummyRpcEndpoint2.getTerminationFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+		}
+	}
+
+	/**
+	 * Testing RpcGateway.
+	 */
+	public interface DummyGateway extends RpcGateway {
+		CompletableFuture<String> foobar(@RpcTimeout Time timeout);
+	}
+
+	static class DummyRpcEndpoint extends RpcEndpoint implements DummyGateway {
+
+		private final String value;
+
+		protected DummyRpcEndpoint(RpcService rpcService, String endpointId, String value) {
+			super(rpcService, endpointId);
+			this.value = value;
+		}
+
+		@Override
+		public CompletableFuture<String> foobar(Time timeout) {
+			return CompletableFuture.completedFuture(value);
+		}
+	}
+}