You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/19 22:26:28 UTC

flink git commit: [FLINK-7527] [rest] Let AbstractRestHandler extend RedirectHandler

Repository: flink
Updated Branches:
  refs/heads/master a66315a5c -> 75e84e04f


[FLINK-7527] [rest] Let AbstractRestHandler extend RedirectHandler

By letting the AbstractRestHandler extend the RedirectHandler, we add redirection
capabilities to the AbstractRestHandler.

This closes #4597.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/75e84e04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/75e84e04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/75e84e04

Branch: refs/heads/master
Commit: 75e84e04f5a3e2766e331fd05ddb725fe9b00d99
Parents: a66315a
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Aug 17 15:04:19 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Sep 20 00:22:27 2017 +0200

----------------------------------------------------------------------
 .../runtime/webmonitor/RedirectHandler.java     | 142 ---------------
 .../webmonitor/RuntimeMonitorHandler.java       |   5 +-
 .../runtime/webmonitor/files/MimeTypes.java     | 121 -------------
 .../files/StaticFileServerHandler.java          |   3 +-
 .../handlers/HandlerRedirectUtils.java          | 129 --------------
 .../handlers/TaskManagerLogHandler.java         |   2 +-
 .../runtime/webmonitor/RedirectHandlerTest.java |   3 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |   4 +-
 .../runtime/webmonitor/files/MimeTypesTest.java |   2 +
 .../handlers/HandlerRedirectUtilsTest.java      |   1 +
 .../flink/runtime/rest/RestServerEndpoint.java  |   4 +-
 .../rest/handler/AbstractRestHandler.java       | 100 +++--------
 .../rest/handler/PipelineErrorHandler.java      |   7 +-
 .../runtime/rest/handler/RedirectHandler.java   | 173 +++++++++++++++++++
 .../runtime/rest/handler/RouterHandler.java     |   3 +-
 .../rest/handler/util/HandlerRedirectUtils.java | 128 ++++++++++++++
 .../runtime/rest/handler/util/HandlerUtils.java | 141 +++++++++++++++
 .../runtime/rest/handler/util/MimeTypes.java    | 121 +++++++++++++
 .../flink/runtime/rest/RestEndpointITCase.java  |  54 ++++--
 19 files changed, 651 insertions(+), 492 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RedirectHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RedirectHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RedirectHandler.java
deleted file mode 100644
index 589d1a5..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RedirectHandler.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.OptionalConsumer;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
- * REST endpoints.
- *
- * @param <T> type of the leader to retrieve
- */
-@ChannelHandler.Sharable
-public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
-
-	protected final Logger logger = LoggerFactory.getLogger(getClass());
-
-	private final CompletableFuture<String> localAddressFuture;
-
-	protected final GatewayRetriever<T> leaderRetriever;
-
-	protected final Time timeout;
-
-	/** Whether the web service has https enabled. */
-	protected final boolean httpsEnabled;
-
-	private String localAddress;
-
-	protected RedirectHandler(
-			@Nonnull CompletableFuture<String> localAddressFuture,
-			@Nonnull GatewayRetriever<T> leaderRetriever,
-			@Nonnull Time timeout,
-			boolean httpsEnabled) {
-		this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
-		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
-		this.timeout = Preconditions.checkNotNull(timeout);
-		this.httpsEnabled = httpsEnabled;
-		localAddress = null;
-	}
-
-	@Override
-	protected void channelRead0(
-		ChannelHandlerContext channelHandlerContext,
-		Routed routed) throws Exception {
-
-		if (localAddressFuture.isDone()) {
-			if (localAddress == null) {
-				try {
-					localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-				} catch (Exception e) {
-					logger.error("Could not obtain local address.", e);
-					KeepAliveWrite.flush(channelHandlerContext, routed.request(), HandlerRedirectUtils.getErrorResponse(e));
-				}
-			}
-
-			OptionalConsumer<T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
-
-			optLeaderConsumer.ifPresent(
-				(T gateway) -> {
-					OptionalConsumer<CompletableFuture<String>> optRedirectAddressConsumer = OptionalConsumer.of(
-						HandlerRedirectUtils.getRedirectAddress(
-							localAddress,
-							gateway,
-							timeout));
-
-					optRedirectAddressConsumer
-						.ifPresent(
-							(CompletableFuture<String> redirectAddressFuture) ->
-								redirectAddressFuture.whenComplete(
-									(String redirectAddress, Throwable throwable) -> {
-										HttpResponse response;
-
-										if (throwable != null) {
-											logger.error("Could not retrieve the redirect address.", throwable);
-											response = HandlerRedirectUtils.getErrorResponse(throwable);
-										} else {
-											response = HandlerRedirectUtils.getRedirectResponse(
-												redirectAddress,
-												routed.path(),
-												httpsEnabled);
-										}
-
-										KeepAliveWrite.flush(channelHandlerContext, routed.request(), response);
-									}
-								))
-						.ifNotPresent(
-							() -> {
-								try {
-									respondAsLeader(channelHandlerContext, routed, gateway);
-								} catch (Exception e) {
-									logger.error("Error while responding as leader.", e);
-									KeepAliveWrite.flush(
-										channelHandlerContext,
-										routed.request(),
-										HandlerRedirectUtils.getErrorResponse(e));
-								}
-							});
-				}
-			).ifNotPresent(
-				() -> KeepAliveWrite.flush(channelHandlerContext, routed.request(), HandlerRedirectUtils.getUnavailableResponse()));
-		} else {
-			KeepAliveWrite.flush(channelHandlerContext, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
-		}
-	}
-
-	protected abstract void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, T gateway) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 6e388e1..cdcd0fe 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.webmonitor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
+import org.apache.flink.runtime.rest.handler.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
@@ -45,7 +46,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils.ENCODING;
+import static org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils.ENCODING;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java
deleted file mode 100644
index 4834cbc..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.files;
-
-import java.util.HashMap;
-
-/**
- * Simple utility class that resolves file extensions to MIME types.
- *
- * <p>There are various solutions built into Java that depend on extra resource and configuration
- * files. They are designed to be composable and extensible, but also unfortunately tricky to control.
- * This is meant to be a simple solution that may eventually be subsumed by a better one.
- */
-public class MimeTypes {
-
-	/** The default mime type. */
-	private static final String DEFAULT_MIME_TYPE = "application/octet-stream";
-
-	/** The map with the constants. */
-	private static final HashMap<String, String> MIME_MAP = new HashMap<String, String>();
-
-	/**
-	 * Gets the MIME type for the file with the given extension. If the mime type is not recognized,
-	 * this method returns null.
-	 *
-	 * @param fileExtension The file extension.
-	 * @return The MIME type, or {@code null}, if the file extension is not recognized.
-	 */
-	public static String getMimeTypeForExtension(String fileExtension) {
-		return MIME_MAP.get(fileExtension.toLowerCase());
-	}
-
-	/**
-	 * Gets the MIME type for the file with the given name, by extension. This method tries to extract
-	 * the file extension and then use the {@link #getMimeTypeForExtension(String)} to determine the
-	 * MIME type. If the extension cannot be determined, or the extension is unrecognized, this method
-	 * return {@code null}.
-	 *
-	 * @param fileName The file name.
-	 * @return The MIME type, or {@code null}, if the file's extension is not recognized.
-	 */
-	public static String getMimeTypeForFileName(String fileName) {
-		int extensionPos = fileName.lastIndexOf('.');
-		if (extensionPos >= 1 && extensionPos < fileName.length() - 1) {
-			String extension = fileName.substring(extensionPos + 1);
-			return getMimeTypeForExtension(extension);
-		}
-		else {
-			return null;
-		}
-	}
-
-	/**
-	 * Gets the default MIME type, which is {@code "application/octet-stream"}.
-	 *
-	 * @return The default MIME type.
-	 */
-	public static String getDefaultMimeType() {
-		return DEFAULT_MIME_TYPE;
-	}
-
-	// ------------------------------------------------------------------------
-	//  prevent instantiation
-	// ------------------------------------------------------------------------
-
-	private MimeTypes() {}
-
-	// ------------------------------------------------------------------------
-	//  initialization
-	// ------------------------------------------------------------------------
-
-	static {
-		// text types
-		MIME_MAP.put("html", "text/html");
-		MIME_MAP.put("htm", "text/html");
-		MIME_MAP.put("css", "text/css");
-		MIME_MAP.put("txt", "text/plain");
-		MIME_MAP.put("log", "text/plain");
-		MIME_MAP.put("out", "text/plain");
-		MIME_MAP.put("err", "text/plain");
-		MIME_MAP.put("xml", "text/xml");
-		MIME_MAP.put("csv", "text/csv");
-
-		// application types
-		MIME_MAP.put("js", "application/javascript");
-		MIME_MAP.put("json", "application/json");
-
-		// image types
-		MIME_MAP.put("png", "image/png");
-		MIME_MAP.put("jpg", "image/jpeg");
-		MIME_MAP.put("jpeg", "image/jpeg");
-		MIME_MAP.put("gif", "image/gif");
-		MIME_MAP.put("svg", "image/svg+xml");
-		MIME_MAP.put("tiff", "image/tiff");
-		MIME_MAP.put("tff", "image/tiff");
-		MIME_MAP.put("bmp", "image/bmp");
-
-		// fonts
-		MIME_MAP.put("woff", "application/font-woff");
-		MIME_MAP.put("woff2", "application/font-woff2");
-		MIME_MAP.put("ttf", "font/ttf");
-		MIME_MAP.put("otf", "font/opentype");
-		MIME_MAP.put("eot", "font/application/vnd.ms-fontobject");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index 520aa53..e6e632e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -27,7 +27,8 @@ package org.apache.flink.runtime.webmonitor.files;
  *****************************************************************************/
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.webmonitor.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.util.MimeTypes;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
deleted file mode 100644
index 9bb93cc..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.files.MimeTypes;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-
-import javax.annotation.Nullable;
-
-import java.nio.charset.Charset;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Utilities to extract a redirect address.
- *
- * <p>This is necessary at the moment, because many execution graph structures are not serializable.
- * The proper solution here is to have these serializable and transparently work with the leading
- * job manager instead of redirecting.
- */
-public class HandlerRedirectUtils {
-
-	public static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
-
-	public static Optional<CompletableFuture<String>> getRedirectAddress(
-			String localJobManagerAddress,
-			RestfulGateway restfulGateway,
-			Time timeout) {
-
-		final String leaderAddress = restfulGateway.getAddress();
-
-		final String jobManagerName = localJobManagerAddress.substring(localJobManagerAddress.lastIndexOf("/") + 1);
-
-		if (!localJobManagerAddress.equals(leaderAddress) &&
-			!leaderAddress.equals(AkkaUtils.getLocalAkkaURL(jobManagerName))) {
-
-			return Optional.of(restfulGateway.requestRestAddress(timeout));
-
-		} else {
-			return Optional.empty();
-		}
-	}
-
-	public static HttpResponse getRedirectResponse(String redirectAddress, String path, boolean httpsEnabled) {
-		checkNotNull(redirectAddress, "Redirect address");
-		checkNotNull(path, "Path");
-
-		String protocol = httpsEnabled ? "https" : "http";
-		String newLocation = String.format("%s://%s%s", protocol, redirectAddress, path);
-
-		HttpResponse redirectResponse = new DefaultFullHttpResponse(
-				HttpVersion.HTTP_1_1, HttpResponseStatus.TEMPORARY_REDIRECT);
-		redirectResponse.headers().set(HttpHeaders.Names.LOCATION, newLocation);
-		redirectResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
-
-		return redirectResponse;
-	}
-
-	public static HttpResponse getUnavailableResponse() {
-		String result = "Service temporarily unavailable due to an ongoing leader election. Please refresh.";
-		byte[] bytes = result.getBytes(ConfigConstants.DEFAULT_CHARSET);
-
-		HttpResponse unavailableResponse = new DefaultFullHttpResponse(
-				HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE, Unpooled.wrappedBuffer(bytes));
-
-		unavailableResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, bytes.length);
-		unavailableResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, MimeTypes.getMimeTypeForExtension("txt"));
-
-		return unavailableResponse;
-	}
-
-	public static HttpResponse getResponse(HttpResponseStatus status, @Nullable String message) {
-		ByteBuf messageByteBuf = message == null ? Unpooled.buffer(0)
-			: Unpooled.wrappedBuffer(message.getBytes(ENCODING));
-		FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, messageByteBuf);
-		response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
-		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
-		return response;
-	}
-
-	public static HttpResponse getErrorResponse(Throwable throwable) {
-		return getErrorResponse(throwable, HttpResponseStatus.INTERNAL_SERVER_ERROR);
-	}
-
-	public static HttpResponse getErrorResponse(Throwable throwable, HttpResponseStatus status) {
-		byte[] bytes = ExceptionUtils.stringifyException(throwable).getBytes(ENCODING);
-		FullHttpResponse response = new DefaultFullHttpResponse(
-			HttpVersion.HTTP_1_1,
-			status,
-			Unpooled.wrappedBuffer(bytes));
-
-		response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
-		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
-		return response;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index b3238af..bb9b7f5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.RedirectHandler;
 import org.apache.flink.runtime.webmonitor.WebHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
index e434a1d..2bd6673 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.webmonitor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
+import org.apache.flink.runtime.rest.handler.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 59f8b9d..72498ee 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -31,10 +31,10 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.rest.handler.util.MimeTypes;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.webmonitor.files.MimeTypes;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
@@ -330,7 +330,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				HttpTestClient.SimpleHttpResponse response = client.getNextResponse();
 
 				assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE, response.getStatus());
-				assertEquals(MimeTypes.getMimeTypeForExtension("txt"), response.getType());
+				assertEquals(MimeTypes.getMimeTypeForExtension("json"), response.getType());
 				assertTrue(response.getContent().contains("refresh"));
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java
index 2594b11..0a8d9d8 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.files;
 
+import org.apache.flink.runtime.rest.handler.util.MimeTypes;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertNotNull;

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
index a8562b3..81553e4 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 735d3f8..88c7f78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -74,7 +74,7 @@ public abstract class RestServerEndpoint {
 	 * This method is called at the beginning of {@link #start()} to setup all handlers that the REST server endpoint
 	 * implementation requires.
 	 */
-	protected abstract Collection<AbstractRestHandler<?, ?, ?>> initializeHandlers();
+	protected abstract Collection<AbstractRestHandler<?, ?, ?, ?>> initializeHandlers();
 
 	/**
 	 * Starts this REST server endpoint.
@@ -185,7 +185,7 @@ public abstract class RestServerEndpoint {
 		}
 	}
 
-	private static <R extends RequestBody, P extends ResponseBody> void registerHandler(Router router, AbstractRestHandler<R, P, ?> handler) {
+	private static <R extends RequestBody, P extends ResponseBody> void registerHandler(Router router, AbstractRestHandler<?, R, P, ?> handler) {
 		switch (handler.getMessageHeaders().getHttpMethod()) {
 			case GET:
 				router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 864d74c..95483c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -18,31 +18,25 @@
 
 package org.apache.flink.runtime.rest.handler;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
 
 import com.fasterxml.jackson.core.JsonParseException;
@@ -53,14 +47,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
-import java.io.IOException;
-import java.io.StringWriter;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
 /**
  * Super class for netty-based handlers that work with {@link RequestBody}s and {@link ResponseBody}s.
  *
@@ -70,14 +58,20 @@ import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVer
  * @param <P> type of outgoing responses
  */
 @ChannelHandler.Sharable
-public abstract class AbstractRestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends SimpleChannelInboundHandler<Routed> {
+public abstract class AbstractRestHandler<T extends RestfulGateway, R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends RedirectHandler<T> {
 	protected final Logger log = LoggerFactory.getLogger(getClass());
 
 	private static final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper();
 
 	private final MessageHeaders<R, P, M> messageHeaders;
 
-	protected AbstractRestHandler(MessageHeaders<R, P, M> messageHeaders) {
+	protected AbstractRestHandler(
+			CompletableFuture<String> localAddressFuture,
+			GatewayRetriever<T> leaderRetriever,
+			Time timeout,
+			boolean httpsEnabled,
+			MessageHeaders<R, P, M> messageHeaders) {
+		super(localAddressFuture, leaderRetriever, timeout, httpsEnabled);
 		this.messageHeaders = messageHeaders;
 	}
 
@@ -86,7 +80,7 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
 	}
 
 	@Override
-	protected void channelRead0(final ChannelHandlerContext ctx, Routed routed) throws Exception {
+	protected void respondAsLeader(final ChannelHandlerContext ctx, Routed routed, T gateway) throws Exception {
 		if (log.isDebugEnabled()) {
 			log.debug("Received request " + routed.request().getUri() + '.');
 		}
@@ -98,7 +92,7 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
 				// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
 				// FullHttpRequests.
 				log.error("Implementation error: Received a request that wasn't a FullHttpRequest.");
-				sendErrorResponse(new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST, ctx, httpRequest);
+				HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST);
 				return;
 			}
 
@@ -110,7 +104,7 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
 					request = mapper.readValue("{}", messageHeaders.getRequestClass());
 				} catch (JsonParseException | JsonMappingException je) {
 					log.error("Implementation error: Get request bodies must have a no-argument constructor.", je);
-					sendErrorResponse(new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
+					HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
 					return;
 				}
 			} else {
@@ -119,7 +113,7 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
 					request = mapper.readValue(in, messageHeaders.getRequestClass());
 				} catch (JsonParseException | JsonMappingException je) {
 					log.error("Failed to read request.", je);
-					sendErrorResponse(new ErrorResponseBody(String.format("Request did not match expected format %s.", messageHeaders.getRequestClass().getSimpleName())), HttpResponseStatus.BAD_REQUEST, ctx, httpRequest);
+					HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody(String.format("Request did not match expected format %s.", messageHeaders.getRequestClass().getSimpleName())), HttpResponseStatus.BAD_REQUEST);
 					return;
 				}
 			}
@@ -127,7 +121,7 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
 			CompletableFuture<P> response;
 			try {
 				HandlerRequest<R, M> handlerRequest = new HandlerRequest<>(request, messageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams());
-				response = handleRequest(handlerRequest);
+				response = handleRequest(handlerRequest, gateway);
 			} catch (Exception e) {
 				response = FutureUtils.completedExceptionally(e);
 			}
@@ -136,18 +130,18 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
 				if (error != null) {
 					if (error instanceof RestHandlerException) {
 						RestHandlerException rhe = (RestHandlerException) error;
-						sendErrorResponse(new ErrorResponseBody(rhe.getErrorMessage()), rhe.getHttpResponseStatus(), ctx, httpRequest);
+						HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody(rhe.getErrorMessage()), rhe.getHttpResponseStatus());
 					} else {
 						log.error("Implementation error: Unhandled exception.", error);
-						sendErrorResponse(new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
+						HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
 					}
 				} else {
-					sendResponse(messageHeaders.getResponseStatusCode(), resp, ctx, httpRequest);
+					HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode());
 				}
 			});
 		} catch (Exception e) {
 			log.error("Request processing failed.", e);
-			sendErrorResponse(new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
+			HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
 		}
 	}
 
@@ -162,57 +156,9 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
 	 * {@link HttpResponseStatus#INTERNAL_SERVER_ERROR} will be returned.
 	 *
 	 * @param request request that should be handled
+	 * @param gateway leader gateway
 	 * @return future containing a handler response
 	 * @throws RestHandlerException if the handling failed
 	 */
-	protected abstract CompletableFuture<P> handleRequest(@Nonnull HandlerRequest<R, M> request) throws RestHandlerException;
-
-	private static <P extends ResponseBody> void sendResponse(HttpResponseStatus statusCode, P response, ChannelHandlerContext ctx, HttpRequest httpRequest) {
-		StringWriter sw = new StringWriter();
-		try {
-			mapper.writeValue(sw, response);
-		} catch (IOException ioe) {
-			sendErrorResponse(new ErrorResponseBody("Internal server error. Could not map response to JSON."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
-			return;
-		}
-		sendResponse(ctx, httpRequest, statusCode, sw.toString());
-	}
-
-	static void sendErrorResponse(ErrorResponseBody error, HttpResponseStatus statusCode, ChannelHandlerContext ctx, HttpRequest httpRequest) {
-
-		StringWriter sw = new StringWriter();
-		try {
-			mapper.writeValue(sw, error);
-		} catch (IOException e) {
-			// this should never happen
-			sendResponse(ctx, httpRequest, HttpResponseStatus.INTERNAL_SERVER_ERROR, "Internal server error. Could not map error response to JSON.");
-		}
-		sendResponse(ctx, httpRequest, statusCode, sw.toString());
-	}
-
-	private static void sendResponse(@Nonnull ChannelHandlerContext ctx, @Nonnull HttpRequest httpRequest, @Nonnull HttpResponseStatus statusCode, @Nonnull String message) {
-		HttpResponse response = new DefaultHttpResponse(HTTP_1_1, statusCode);
-
-		response.headers().set(CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE);
-
-		if (HttpHeaders.isKeepAlive(httpRequest)) {
-			response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
-		}
-
-		byte[] buf = message.getBytes(ConfigConstants.DEFAULT_CHARSET);
-		ByteBuf b = Unpooled.copiedBuffer(buf);
-		HttpHeaders.setContentLength(response, buf.length);
-
-		// write the initial line and the header.
-		ctx.write(response);
-
-		ctx.write(b);
-
-		ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
-
-		// close the connection, if no keep-alive is needed
-		if (!HttpHeaders.isKeepAlive(httpRequest)) {
-			lastContentFuture.addListener(ChannelFutureListener.CLOSE);
-		}
-	}
+	protected abstract CompletableFuture<P> handleRequest(@Nonnull HandlerRequest<R, M> request, @Nonnull T gateway) throws RestHandlerException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
index 14e643c..b43afdc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.handler;
 
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
@@ -45,7 +46,11 @@ public class PipelineErrorHandler extends SimpleChannelInboundHandler<HttpReques
 	protected void channelRead0(ChannelHandlerContext ctx, HttpRequest message) {
 		// we can't deal with this message. No one in the pipeline handled it. Log it.
 		logger.warn("Unknown message received: {}", message);
-		AbstractRestHandler.sendErrorResponse(new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST, ctx, message);
+		HandlerUtils.sendErrorResponse(
+			ctx,
+			message,
+			new ErrorResponseBody("Bad request received."),
+			HttpResponseStatus.BAD_REQUEST);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
new file mode 100644
index 0000000..9f63456
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.OptionalConsumer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
+ * REST endpoints.
+ *
+ * @param <T> type of the leader to retrieve
+ */
+@ChannelHandler.Sharable
+public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
+
+	protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+	/** Future providing the local address that is compared against the leader's address. */
+	private final CompletableFuture<String> localAddressFuture;
+
+	protected final GatewayRetriever<T> leaderRetriever;
+
+	protected final Time timeout;
+
+	/** Whether the web service has https enabled. */
+	protected final boolean httpsEnabled;
+
+	/** Local address; resolved from the future by the first request arriving after its completion. */
+	private String localAddress;
+
+	protected RedirectHandler(
+			@Nonnull CompletableFuture<String> localAddressFuture,
+			@Nonnull GatewayRetriever<T> leaderRetriever,
+			@Nonnull Time timeout,
+			boolean httpsEnabled) {
+		this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
+		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
+		this.timeout = Preconditions.checkNotNull(timeout);
+		this.httpsEnabled = httpsEnabled;
+		localAddress = null;
+	}
+
+	/**
+	 * Serves the request locally if this endpoint is the leader, redirects the client to the
+	 * leader otherwise, and answers with an error if the local address or the leader is unknown.
+	 *
+	 * @param channelHandlerContext context of the channel on which the request arrived
+	 * @param routed the routed request
+	 */
+	@Override
+	protected void channelRead0(
+		ChannelHandlerContext channelHandlerContext,
+		Routed routed) throws Exception {
+
+		try {
+			if (localAddressFuture.isDone()) {
+				if (localAddress == null) {
+					try {
+						localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+					} catch (Exception e) {
+						logger.error("Could not obtain local address.", e);
+
+						HandlerUtils.sendErrorResponse(
+							channelHandlerContext,
+							routed.request(),
+							new ErrorResponseBody("Fatal error. Could not obtain local address. Please try to refresh."),
+							HttpResponseStatus.INTERNAL_SERVER_ERROR);
+
+						// The request has been answered. Continuing with a null local address would
+						// fail in the redirect lookup and send a second response for the same request.
+						return;
+					}
+				}
+
+				OptionalConsumer<T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
+
+				optLeaderConsumer.ifPresent(
+					(T gateway) -> {
+						OptionalConsumer<CompletableFuture<String>> optRedirectAddressConsumer = OptionalConsumer.of(
+							HandlerRedirectUtils.getRedirectAddress(
+								localAddress,
+								gateway,
+								timeout));
+
+						optRedirectAddressConsumer
+							.ifPresent(
+								(CompletableFuture<String> redirectAddressFuture) ->
+									redirectAddressFuture.whenComplete(
+										(String redirectAddress, Throwable throwable) -> {
+											if (throwable != null) {
+												logger.error("Could not retrieve the redirect address.", throwable);
+
+												HandlerUtils.sendErrorResponse(
+													channelHandlerContext,
+													routed.request(),
+													new ErrorResponseBody("Could not retrieve the redirect address of the current leader. Please try to refresh."),
+													HttpResponseStatus.INTERNAL_SERVER_ERROR);
+											} else {
+												HttpResponse response = HandlerRedirectUtils.getRedirectResponse(
+													redirectAddress,
+													routed.path(),
+													httpsEnabled);
+
+												KeepAliveWrite.flush(channelHandlerContext, routed.request(), response);
+											}
+										}
+									))
+							.ifNotPresent(
+								() -> {
+									try {
+										respondAsLeader(channelHandlerContext, routed, gateway);
+									} catch (Exception e) {
+										logger.error("Error while responding as leader.", e);
+										HandlerUtils.sendErrorResponse(
+											channelHandlerContext,
+											routed.request(),
+											new ErrorResponseBody("Error while responding to the request."),
+											HttpResponseStatus.INTERNAL_SERVER_ERROR);
+									}
+								});
+					}
+				).ifNotPresent(
+					() ->
+						HandlerUtils.sendErrorResponse(
+							channelHandlerContext,
+							routed.request(),
+							new ErrorResponseBody("Service temporarily unavailable due to an ongoing leader election. Please refresh."),
+							HttpResponseStatus.SERVICE_UNAVAILABLE));
+			} else {
+				HandlerUtils.sendErrorResponse(
+					channelHandlerContext,
+					routed.request(),
+					new ErrorResponseBody("Service temporarily unavailable due to an ongoing leader election. Please refresh."),
+					HttpResponseStatus.SERVICE_UNAVAILABLE);
+			}
+		} catch (Throwable throwable) {
+			logger.warn("Error occurred while processing web request.", throwable);
+			HandlerUtils.sendErrorResponse(
+				channelHandlerContext,
+				routed.request(),
+				new ErrorResponseBody("Error occurred in RedirectHandler: " + throwable.getMessage() + '.'),
+				HttpResponseStatus.INTERNAL_SERVER_ERROR);
+		}
+	}
+
+	/**
+	 * Handles the request locally; invoked when no redirect to another endpoint is required.
+	 *
+	 * @param channelHandlerContext context of the channel on which the request arrived
+	 * @param routed the routed request
+	 * @param gateway gateway of the current leader
+	 * @throws Exception if the request cannot be handled; the client then receives an internal server error
+	 */
+	protected abstract void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, T gateway) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
index 72b779b..cfc456f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.handler;
 
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
@@ -42,6 +43,6 @@ public class RouterHandler extends Handler {
 
 	@Override
 	protected void respondNotFound(ChannelHandlerContext ctx, HttpRequest request) {
-		AbstractRestHandler.sendErrorResponse(new ErrorResponseBody("Not found."), HttpResponseStatus.NOT_FOUND, ctx, request);
+		HandlerUtils.sendErrorResponse(ctx, request, new ErrorResponseBody("Not found."), HttpResponseStatus.NOT_FOUND);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRedirectUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRedirectUtils.java
new file mode 100644
index 0000000..6f988a4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRedirectUtils.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.util;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+
+import javax.annotation.Nullable;
+
+import java.nio.charset.Charset;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utilities to extract a redirect address.
+ *
+ * <p>This is necessary at the moment, because many execution graph structures are not serializable.
+ * The proper solution here is to have these serializable and transparently work with the leading
+ * job manager instead of redirecting.
+ */
+public class HandlerRedirectUtils {
+
+	/** Charset used to encode the bodies of the plain-text responses built by this class. */
+	public static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
+
+	/**
+	 * Determines whether a request has to be redirected to the leader.
+	 *
+	 * @param localJobManagerAddress address of the local endpoint
+	 * @param restfulGateway gateway of the leader to compare against
+	 * @param timeout timeout for requesting the leader's REST address
+	 * @return future with the leader's REST address if the leader's address differs from the local
+	 *         address; empty if they match and no redirect is needed
+	 */
+	public static Optional<CompletableFuture<String>> getRedirectAddress(
+			String localJobManagerAddress,
+			RestfulGateway restfulGateway,
+			Time timeout) {
+
+		checkNotNull(localJobManagerAddress, "Local job manager address");
+		checkNotNull(restfulGateway, "Restful gateway");
+
+		final String leaderAddress = restfulGateway.getAddress();
+
+		final String jobManagerName = localJobManagerAddress.substring(localJobManagerAddress.lastIndexOf("/") + 1);
+
+		if (!localJobManagerAddress.equals(leaderAddress) &&
+			!leaderAddress.equals(AkkaUtils.getLocalAkkaURL(jobManagerName))) {
+
+			return Optional.of(restfulGateway.requestRestAddress(timeout));
+
+		} else {
+			return Optional.empty();
+		}
+	}
+
+	/**
+	 * Builds a temporary redirect response pointing to the given path on the given address.
+	 *
+	 * @param redirectAddress address to redirect to
+	 * @param path request path that is appended to the redirect address
+	 * @param httpsEnabled whether to use the https scheme instead of http
+	 * @return the redirect response with a content length of zero
+	 */
+	public static HttpResponse getRedirectResponse(String redirectAddress, String path, boolean httpsEnabled) {
+		checkNotNull(redirectAddress, "Redirect address");
+		checkNotNull(path, "Path");
+
+		String protocol = httpsEnabled ? "https" : "http";
+		String newLocation = String.format("%s://%s%s", protocol, redirectAddress, path);
+
+		HttpResponse redirectResponse = new DefaultFullHttpResponse(
+				HttpVersion.HTTP_1_1, HttpResponseStatus.TEMPORARY_REDIRECT);
+		redirectResponse.headers().set(HttpHeaders.Names.LOCATION, newLocation);
+		redirectResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
+
+		return redirectResponse;
+	}
+
+	/**
+	 * Builds a service unavailable response stating that a leader election is ongoing.
+	 *
+	 * @return the service unavailable response
+	 */
+	public static HttpResponse getUnavailableResponse() {
+		String result = "Service temporarily unavailable due to an ongoing leader election. Please refresh.";
+		byte[] bytes = result.getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+		HttpResponse unavailableResponse = new DefaultFullHttpResponse(
+				HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE, Unpooled.wrappedBuffer(bytes));
+
+		unavailableResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, bytes.length);
+		unavailableResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, MimeTypes.getMimeTypeForExtension("txt"));
+
+		return unavailableResponse;
+	}
+
+	/**
+	 * Builds a plain-text response with the given status.
+	 *
+	 * @param status status of the response
+	 * @param message body of the response; {@code null} yields an empty body
+	 * @return the response
+	 */
+	public static HttpResponse getResponse(HttpResponseStatus status, @Nullable String message) {
+		ByteBuf messageByteBuf = message == null ? Unpooled.buffer(0)
+			: Unpooled.wrappedBuffer(message.getBytes(ENCODING));
+		FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, messageByteBuf);
+		response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
+		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+		return response;
+	}
+
+	/**
+	 * Builds an internal server error response whose body is the stringified exception.
+	 *
+	 * @param throwable exception to report
+	 * @return the error response
+	 */
+	public static HttpResponse getErrorResponse(Throwable throwable) {
+		return getErrorResponse(throwable, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+	}
+
+	/**
+	 * Builds a plain-text response with the given status whose body is the stringified exception.
+	 *
+	 * @param throwable exception to report
+	 * @param status status of the response
+	 * @return the error response
+	 */
+	public static HttpResponse getErrorResponse(Throwable throwable, HttpResponseStatus status) {
+		byte[] bytes = ExceptionUtils.stringifyException(throwable).getBytes(ENCODING);
+		FullHttpResponse response = new DefaultFullHttpResponse(
+			HttpVersion.HTTP_1_1,
+			status,
+			Unpooled.wrappedBuffer(bytes));
+
+		response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
+		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+		return response;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
new file mode 100644
index 0000000..0d7483a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.util;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Utilities for the REST handlers.
+ */
+public class HandlerUtils {
+
+	private static final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper();
+
+	/**
+	 * Sends the given response and status code to the given channel.
+	 *
+	 * @param channelHandlerContext identifying the open channel
+	 * @param httpRequest originating http request
+	 * @param response which should be sent
+	 * @param statusCode of the message to send
+	 * @param <P> type of the response
+	 */
+	public static <P extends ResponseBody> void sendResponse(
+			ChannelHandlerContext channelHandlerContext,
+			HttpRequest httpRequest,
+			P response,
+			HttpResponseStatus statusCode) {
+		StringWriter sw = new StringWriter();
+		try {
+			mapper.writeValue(sw, response);
+		} catch (IOException ioe) {
+			sendErrorResponse(channelHandlerContext, httpRequest, new ErrorResponseBody("Internal server error. Could not map response to JSON."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
+			return;
+		}
+		sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);
+	}
+
+	/**
+	 * Sends the given error response and status code to the given channel.
+	 *
+	 * @param channelHandlerContext identifying the open channel
+	 * @param httpRequest originating http request
+	 * @param errorMessage which should be sent
+	 * @param statusCode of the message to send
+	 */
+	public static void sendErrorResponse(
+			ChannelHandlerContext channelHandlerContext,
+			HttpRequest httpRequest,
+			ErrorResponseBody errorMessage,
+			HttpResponseStatus statusCode) {
+
+		StringWriter sw = new StringWriter();
+		try {
+			mapper.writeValue(sw, errorMessage);
+		} catch (IOException e) {
+			// this should never happen
+			sendResponse(channelHandlerContext, httpRequest, "Internal server error. Could not map error response to JSON.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
+			// the fallback above already answered the request; do not send a second response
+			return;
+		}
+		sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);
+	}
+
+	/**
+	 * Sends the given response and status code to the given channel.
+	 *
+	 * @param channelHandlerContext identifying the open channel
+	 * @param httpRequest originating http request
+	 * @param message which should be sent
+	 * @param statusCode of the message to send
+	 */
+	public static void sendResponse(
+			@Nonnull ChannelHandlerContext channelHandlerContext,
+			@Nonnull HttpRequest httpRequest,
+			@Nonnull String message,
+			@Nonnull HttpResponseStatus statusCode) {
+		HttpResponse response = new DefaultHttpResponse(HTTP_1_1, statusCode);
+
+		response.headers().set(CONTENT_TYPE, "application/json");
+
+		if (HttpHeaders.isKeepAlive(httpRequest)) {
+			response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+		}
+
+		byte[] buf = message.getBytes(ConfigConstants.DEFAULT_CHARSET);
+		ByteBuf b = Unpooled.copiedBuffer(buf);
+		HttpHeaders.setContentLength(response, buf.length);
+
+		// write the initial line and the header.
+		channelHandlerContext.write(response);
+
+		channelHandlerContext.write(b);
+
+		ChannelFuture lastContentFuture = channelHandlerContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+
+		// close the connection, if no keep-alive is needed
+		if (!HttpHeaders.isKeepAlive(httpRequest)) {
+			lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MimeTypes.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MimeTypes.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MimeTypes.java
new file mode 100644
index 0000000..6d54bea
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MimeTypes.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.util;
+
+import java.util.HashMap;
+
+/**
+ * Simple utility class that resolves file extensions to MIME types.
+ *
+ * <p>There are various solutions built into Java that depend on extra resource and configuration
+ * files. They are designed to be composable and extensible, but also unfortunately tricky to control.
+ * This is meant to be a simple solution that may eventually be subsumed by a better one.
+ */
+public final class MimeTypes {
+
+	/** The default mime type. */
+	private static final String DEFAULT_MIME_TYPE = "application/octet-stream";
+
+	/** Maps lower-case file extensions (without the dot) to their MIME types. */
+	private static final HashMap<String, String> MIME_MAP = new HashMap<>();
+
+	/**
+	 * Gets the MIME type for the file with the given extension. The extension is matched
+	 * case-insensitively. If it is not recognized, this method returns {@code null}.
+	 *
+	 * @param fileExtension The file extension, without the leading dot.
+	 * @return The MIME type, or {@code null}, if the file extension is not recognized.
+	 */
+	public static String getMimeTypeForExtension(String fileExtension) {
+		// Locale.ROOT: lower-casing must not depend on the default locale (Turkish maps "I" to dotless "ı")
+		return MIME_MAP.get(fileExtension.toLowerCase(java.util.Locale.ROOT));
+	}
+
+	/**
+	 * Gets the MIME type for the file with the given name, by extension. This method tries to extract
+	 * the file extension and then use the {@link #getMimeTypeForExtension(String)} to determine the
+	 * MIME type. If the extension cannot be determined, or the extension is unrecognized, this method
+	 * returns {@code null}.
+	 *
+	 * @param fileName The file name.
+	 * @return The MIME type, or {@code null}, if the file's extension is not recognized.
+	 */
+	public static String getMimeTypeForFileName(String fileName) {
+		int extensionPos = fileName.lastIndexOf('.');
+		// a dot at index 0 or as the very last character does not delimit an extension
+		if (extensionPos >= 1 && extensionPos < fileName.length() - 1) {
+			return getMimeTypeForExtension(fileName.substring(extensionPos + 1));
+		}
+		return null;
+	}
+
+	/**
+	 * Gets the default MIME type, which is {@code "application/octet-stream"}.
+	 *
+	 * @return The default MIME type.
+	 */
+	public static String getDefaultMimeType() {
+		return DEFAULT_MIME_TYPE;
+	}
+
+	// ------------------------------------------------------------------------
+	//  prevent instantiation
+	// ------------------------------------------------------------------------
+
+	private MimeTypes() {}
+
+	// ------------------------------------------------------------------------
+	//  initialization
+	// ------------------------------------------------------------------------
+
+	static {
+		// text types
+		MIME_MAP.put("html", "text/html");
+		MIME_MAP.put("htm", "text/html");
+		MIME_MAP.put("css", "text/css");
+		MIME_MAP.put("txt", "text/plain");
+		MIME_MAP.put("log", "text/plain");
+		MIME_MAP.put("out", "text/plain");
+		MIME_MAP.put("err", "text/plain");
+		MIME_MAP.put("xml", "text/xml");
+		MIME_MAP.put("csv", "text/csv");
+
+		// application types
+		MIME_MAP.put("js", "application/javascript");
+		MIME_MAP.put("json", "application/json");
+
+		// image types
+		MIME_MAP.put("png", "image/png");
+		MIME_MAP.put("jpg", "image/jpeg");
+		MIME_MAP.put("jpeg", "image/jpeg");
+		MIME_MAP.put("gif", "image/gif");
+		MIME_MAP.put("svg", "image/svg+xml");
+		MIME_MAP.put("tiff", "image/tiff");
+		MIME_MAP.put("tif", "image/tiff");
+		MIME_MAP.put("tff", "image/tiff"); // kept for backwards compatibility; "tif" is the usual short form
+		MIME_MAP.put("bmp", "image/bmp");
+
+		// fonts
+		MIME_MAP.put("woff", "application/font-woff");
+		MIME_MAP.put("woff2", "application/font-woff2");
+		MIME_MAP.put("ttf", "font/ttf");
+		MIME_MAP.put("otf", "font/opentype");
+		MIME_MAP.put("eot", "application/vnd.ms-fontobject"); // registered media type for Embedded OpenType
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
index ab43f77..89d87f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
@@ -30,8 +30,11 @@ import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
@@ -43,11 +46,13 @@ import org.junit.Test;
 
 import javax.annotation.Nonnull;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * IT cases for {@link RestClient} and {@link RestServerEndpoint}.
@@ -60,13 +65,25 @@ public class RestEndpointITCase extends TestLogger {
 	private static final Time timeout = Time.seconds(10L);
 
 	@Test
-	public void testEndpoints() throws ConfigurationException, IOException, InterruptedException, ExecutionException {
+	public void testEndpoints() throws Exception {
 		Configuration config = new Configuration();
 
 		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
 		RestClientConfiguration clientConfig = RestClientConfiguration.fromConfiguration(config);
 
-		RestServerEndpoint serverEndpoint = new TestRestServerEndpoint(serverConfig);
+		final String address = "localhost";
+		RestfulGateway mockRestfulGateway = mock(RestfulGateway.class);
+		when(mockRestfulGateway.getAddress()).thenReturn(address);
+		GatewayRetriever<RestfulGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+		when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+
+		TestHandler testHandler = new TestHandler(
+			CompletableFuture.completedFuture(address),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT,
+			true);
+
+		RestServerEndpoint serverEndpoint = new TestRestServerEndpoint(serverConfig, testHandler);
 		RestClient clientEndpoint = new TestRestClient(clientConfig);
 
 		try {
@@ -111,26 +128,39 @@ public class RestEndpointITCase extends TestLogger {
 
 	private static class TestRestServerEndpoint extends RestServerEndpoint {
 
-		TestRestServerEndpoint(RestServerEndpointConfiguration configuration) {
+		private final TestHandler testHandler;
+
+		TestRestServerEndpoint(RestServerEndpointConfiguration configuration, TestHandler testHandler) {
 			super(configuration);
+
+			this.testHandler = Preconditions.checkNotNull(testHandler);
 		}
 
 		@Override
-		protected Collection<AbstractRestHandler<?, ?, ?>> initializeHandlers() {
-			return Collections.singleton(new TestHandler());
+		protected Collection<AbstractRestHandler<?, ?, ?, ?>> initializeHandlers() {
+			return Collections.singleton(testHandler);
 		}
 	}
 
-	private static class TestHandler extends AbstractRestHandler<TestRequest, TestResponse, TestParameters> {
+	private static class TestHandler extends AbstractRestHandler<RestfulGateway, TestRequest, TestResponse, TestParameters> {
 
 		public static final Object LOCK = new Object();
 
-		TestHandler() {
-			super(new TestHeaders());
+		TestHandler(
+			CompletableFuture<String> localAddressFuture,
+			GatewayRetriever<RestfulGateway> leaderRetriever,
+			Time timeout,
+			boolean httpsEnabled) {
+			super(
+				localAddressFuture,
+				leaderRetriever,
+				timeout,
+				httpsEnabled,
+				new TestHeaders());
 		}
 
 		@Override
-		protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request) throws RestHandlerException {
+		protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request, RestfulGateway gateway) throws RestHandlerException {
 			Assert.assertEquals(request.getPathParameter(JobIDPathParameter.class), PATH_JOB_ID);
 			Assert.assertEquals(request.getQueryParameter(JobIDQueryParameter.class).get(0), QUERY_JOB_ID);