You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/18 21:00:52 UTC

flink git commit: [FLINK-7459] Generalize Flink's redirection logic

Repository: flink
Updated Branches:
  refs/heads/master 6ad0d3519 -> 41dba8bb7


[FLINK-7459] Generalize Flink's redirection logic

Introduce RedirectHandler which can be extended to add redirection functionality to all
SimpleInboundChannelHandlers. This allows to share the same functionality across the
StaticFileServerHandler and the RuntimeMonitorHandlerBase which could now be removed.
In the future, the AbstractRestHandler will also extend the RedirectHandler.

This closes #4551.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/41dba8bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/41dba8bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/41dba8bb

Branch: refs/heads/master
Commit: 41dba8bb71fd615bcab219a48708fb2b9c4762be
Parents: 6ad0d35
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Aug 15 13:55:47 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Sep 18 15:16:41 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/util/OptionalConsumer.java |  55 +++++++
 .../runtime/webmonitor/RedirectHandler.java     | 142 +++++++++++++++++
 .../webmonitor/RuntimeMonitorHandler.java       |  11 +-
 .../webmonitor/RuntimeMonitorHandlerBase.java   | 134 ----------------
 .../flink/runtime/webmonitor/WebHandler.java    |  32 ++++
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  21 ++-
 .../files/StaticFileServerHandler.java          | 114 +++----------
 .../handlers/HandlerRedirectUtils.java          |  10 +-
 .../handlers/TaskManagerLogHandler.java         |   8 +-
 .../runtime/webmonitor/RedirectHandlerTest.java | 159 +++++++++++++++++++
 ...obCancellationWithSavepointHandlersTest.java |   2 +-
 .../webmonitor/testutils/HttpTestClient.java    |   2 +-
 12 files changed, 436 insertions(+), 254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/41dba8bb/flink-core/src/main/java/org/apache/flink/util/OptionalConsumer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/OptionalConsumer.java b/flink-core/src/main/java/org/apache/flink/util/OptionalConsumer.java
new file mode 100644
index 0000000..94eac2f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/OptionalConsumer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * Utility class which allows to run code depending on whether the
+ * optional has a value or is empty.
+ *
+ * <p>This code has been copied from: https://stackoverflow.com/a/29395324/4815083.
+ *
+ * @param <T> type of the optional
+ */
+public class OptionalConsumer<T> {
+	private Optional<T> optional;
+
+	private OptionalConsumer(Optional<T> optional) {
+		this.optional = optional;
+	}
+
+	public static <T> OptionalConsumer<T> of(Optional<T> optional) {
+		return new OptionalConsumer<>(optional);
+	}
+
+	public OptionalConsumer<T> ifPresent(Consumer<T> c) {
+		optional.ifPresent(c);
+		return this;
+	}
+
+	public OptionalConsumer<T> ifNotPresent(Runnable r) {
+		if (!optional.isPresent()) {
+			r.run();
+		}
+
+		return this;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/41dba8bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RedirectHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RedirectHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RedirectHandler.java
new file mode 100644
index 0000000..589d1a5
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RedirectHandler.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.OptionalConsumer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
+ * REST endpoints.
+ *
+ * @param <T> type of the leader to retrieve
+ */
+@ChannelHandler.Sharable
+public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
+
+	protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+	private final CompletableFuture<String> localAddressFuture;
+
+	protected final GatewayRetriever<T> leaderRetriever;
+
+	protected final Time timeout;
+
+	/** Whether the web service has https enabled. */
+	protected final boolean httpsEnabled;
+
+	private String localAddress;
+
+	protected RedirectHandler(
+			@Nonnull CompletableFuture<String> localAddressFuture,
+			@Nonnull GatewayRetriever<T> leaderRetriever,
+			@Nonnull Time timeout,
+			boolean httpsEnabled) {
+		this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
+		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
+		this.timeout = Preconditions.checkNotNull(timeout);
+		this.httpsEnabled = httpsEnabled;
+		localAddress = null;
+	}
+
+	@Override
+	protected void channelRead0(
+		ChannelHandlerContext channelHandlerContext,
+		Routed routed) throws Exception {
+
+		if (localAddressFuture.isDone()) {
+			if (localAddress == null) {
+				try {
+					localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+				} catch (Exception e) {
+					logger.error("Could not obtain local address.", e);
+					KeepAliveWrite.flush(channelHandlerContext, routed.request(), HandlerRedirectUtils.getErrorResponse(e));
+				}
+			}
+
+			OptionalConsumer<T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
+
+			optLeaderConsumer.ifPresent(
+				(T gateway) -> {
+					OptionalConsumer<CompletableFuture<String>> optRedirectAddressConsumer = OptionalConsumer.of(
+						HandlerRedirectUtils.getRedirectAddress(
+							localAddress,
+							gateway,
+							timeout));
+
+					optRedirectAddressConsumer
+						.ifPresent(
+							(CompletableFuture<String> redirectAddressFuture) ->
+								redirectAddressFuture.whenComplete(
+									(String redirectAddress, Throwable throwable) -> {
+										HttpResponse response;
+
+										if (throwable != null) {
+											logger.error("Could not retrieve the redirect address.", throwable);
+											response = HandlerRedirectUtils.getErrorResponse(throwable);
+										} else {
+											response = HandlerRedirectUtils.getRedirectResponse(
+												redirectAddress,
+												routed.path(),
+												httpsEnabled);
+										}
+
+										KeepAliveWrite.flush(channelHandlerContext, routed.request(), response);
+									}
+								))
+						.ifNotPresent(
+							() -> {
+								try {
+									respondAsLeader(channelHandlerContext, routed, gateway);
+								} catch (Exception e) {
+									logger.error("Error while responding as leader.", e);
+									KeepAliveWrite.flush(
+										channelHandlerContext,
+										routed.request(),
+										HandlerRedirectUtils.getErrorResponse(e));
+								}
+							});
+				}
+			).ifNotPresent(
+				() -> KeepAliveWrite.flush(channelHandlerContext, routed.request(), HandlerRedirectUtils.getUnavailableResponse()));
+		} else {
+			KeepAliveWrite.flush(channelHandlerContext, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
+		}
+	}
+
+	protected abstract void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, T gateway) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/41dba8bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index d3fc177..6e388e1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -30,6 +30,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
@@ -54,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * proper codes, like OK, NOT_FOUND, or SERVER_ERROR.
  */
 @ChannelHandler.Sharable
-public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
+public class RuntimeMonitorHandler extends RedirectHandler<JobManagerGateway> implements WebHandler {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RuntimeMonitorHandler.class);
 
@@ -72,12 +73,11 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 			Time timeout,
 			boolean httpsEnabled) {
 
-		super(retriever, localJobManagerAddressFuture, timeout, httpsEnabled);
+		super(localJobManagerAddressFuture, retriever, timeout, httpsEnabled);
 		this.handler = checkNotNull(handler);
 		this.allowOrigin = cfg.getAllowOrigin();
 	}
 
-	@Override
 	public String[] getPaths() {
 		return handler.getPaths();
 	}
@@ -103,13 +103,14 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 				(httpsEnabled ? "https://" : "http://") + address.getHostName() + ":" + address.getPort());
 
 			responseFuture = handler.handleRequest(pathParams, queryParams, jobManagerGateway);
+
 		} catch (Exception e) {
 			responseFuture = FutureUtils.completedExceptionally(e);
 		}
 
 		responseFuture.whenComplete(
-			(FullHttpResponse httpResponse, Throwable throwable) -> {
-				final FullHttpResponse finalResponse;
+			(HttpResponse httpResponse, Throwable throwable) -> {
+				final HttpResponse finalResponse;
 
 				if (throwable != null) {
 					LOG.debug("Error while handling request.", throwable);

http://git-wip-us.apache.org/repos/asf/flink/blob/41dba8bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
deleted file mode 100644
index 4f4facf..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
-import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The Netty channel handler that processes all HTTP requests.
- * This handler takes the path parameters and delegates the work to a {@link RequestHandler}.
- * This handler also deals with setting correct response MIME types and returning
- * proper codes, like OK, NOT_FOUND, or SERVER_ERROR.
- */
-@ChannelHandler.Sharable
-public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHandler<Routed> {
-
-	private final Logger logger = LoggerFactory.getLogger(getClass());
-
-	private final GatewayRetriever<JobManagerGateway> retriever;
-
-	protected final CompletableFuture<String> localJobManagerAddressFuture;
-
-	protected final Time timeout;
-
-	/** Whether the web service has https enabled. */
-	protected final boolean httpsEnabled;
-
-	protected String localJobManagerAddress;
-
-	public RuntimeMonitorHandlerBase(
-		GatewayRetriever<JobManagerGateway> retriever,
-		CompletableFuture<String> localJobManagerAddressFuture,
-		Time timeout,
-		boolean httpsEnabled) {
-
-		this.retriever = checkNotNull(retriever);
-		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
-		this.timeout = checkNotNull(timeout);
-		this.httpsEnabled = httpsEnabled;
-	}
-
-	/**
-	 * Returns an array of REST URL's under which this handler can be registered.
-	 *
-	 * @return array containing REST URL's under which this handler can be registered.
-	 */
-	public abstract String[] getPaths();
-
-	@Override
-	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
-		if (localJobManagerAddressFuture.isDone()) {
-			if (localJobManagerAddress == null) {
-				localJobManagerAddress = localJobManagerAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-			}
-
-			Optional<JobManagerGateway> optJobManagerGateway = retriever.getNow();
-
-			if (optJobManagerGateway.isPresent()) {
-				JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
-				Optional<CompletableFuture<String>> optRedirectAddress = HandlerRedirectUtils.getRedirectAddress(
-					localJobManagerAddress,
-					jobManagerGateway,
-					timeout);
-
-				if (optRedirectAddress.isPresent()) {
-					optRedirectAddress.get().whenComplete(
-						(String redirectAddress, Throwable throwable) -> {
-							HttpResponse response;
-
-							if (throwable != null) {
-								logger.error("Could not retrieve the redirect address.", throwable);
-								response = HandlerRedirectUtils.getErrorResponse(throwable);
-							} else {
-								try {
-									response = HandlerRedirectUtils.getRedirectResponse(
-										redirectAddress,
-										routed.path(),
-										httpsEnabled);
-								} catch (Exception e) {
-									logger.error("Could not create the redirect response.", e);
-									response = HandlerRedirectUtils.getErrorResponse(e);
-								}
-							}
-
-							KeepAliveWrite.flush(ctx, routed.request(), response);
-						});
-				} else {
-					respondAsLeader(ctx, routed, jobManagerGateway);
-				}
-			} else {
-				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
-			}
-		} else {
-			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
-		}
-	}
-
-	protected abstract void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/41dba8bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
new file mode 100644
index 0000000..5fbf2a3
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+/**
+ * Marker interface for web handlers which can describe their paths.
+ */
+public interface WebHandler {
+
+	/**
+	 * Paths to register the handler under.
+	 *
+	 * @return Array of paths under which the handler wants to be registered
+	 */
+	String[] getPaths();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/41dba8bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 0cdab9c..9dbbf2f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -77,6 +77,7 @@ import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 
 import org.slf4j.Logger;
@@ -293,7 +294,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 		router
 			// log and stdout
 			.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
-				new StaticFileServerHandler(
+				new StaticFileServerHandler<>(
 					retriever,
 					jobManagerAddressFuture,
 					timeout,
@@ -301,7 +302,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 					enableSSL))
 
 			.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
-				new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.stdOutFile,
+				new StaticFileServerHandler<>(retriever, jobManagerAddressFuture, timeout, logFiles.stdOutFile,
 					enableSSL));
 
 		get(router, new JobManagerMetricsHandler(executor, metricFetcher));
@@ -352,7 +353,11 @@ public class WebRuntimeMonitor implements WebMonitor {
 		}
 
 		// this handler serves all the static contents
-		router.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, webRootDir,
+		router.GET("/:*", new StaticFileServerHandler<>(
+			retriever,
+			jobManagerAddressFuture,
+			timeout,
+			webRootDir,
 			enableSSL));
 
 		// add shutdown hook for deleting the directories and remaining temp files on shutdown
@@ -483,14 +488,14 @@ public class WebRuntimeMonitor implements WebMonitor {
 	}
 
 	/** These methods are used in the route path setup. They register the given {@link RequestHandler} or
-	 * {@link RuntimeMonitorHandlerBase} with the given {@link Router} for the respective REST method.
+	 * {@link RuntimeMonitorHandler} with the given {@link Router} for the respective REST method.
 	 * The REST paths under which they are registered are defined by the handlers. **/
 
 	private void get(Router router, RequestHandler handler) {
 		get(router, handler(handler));
 	}
 
-	private void get(Router router, RuntimeMonitorHandlerBase handler) {
+	private static <T extends ChannelInboundHandler & WebHandler> void get(Router router, T handler) {
 		for (String path : handler.getPaths()) {
 			router.GET(path, handler);
 		}
@@ -500,17 +505,17 @@ public class WebRuntimeMonitor implements WebMonitor {
 		delete(router, handler(handler));
 	}
 
-	private void delete(Router router, RuntimeMonitorHandlerBase handler) {
+	private static <T extends ChannelInboundHandler & WebHandler> void delete(Router router, T handler) {
 		for (String path : handler.getPaths()) {
 			router.DELETE(path, handler);
 		}
 	}
 
-	private void post(Router router, RequestHandler handler) {
+	private void post(Router router, RequestHandler handler)  {
 		post(router, handler(handler));
 	}
 
-	private void post(Router router, RuntimeMonitorHandlerBase handler) {
+	private static <T extends ChannelInboundHandler & WebHandler> void post(Router router, T handler) {
 		for (String path : handler.getPaths()) {
 			router.POST(path, handler);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/41dba8bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index 1564064..520aa53 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -27,8 +27,8 @@ package org.apache.flink.runtime.webmonitor.files;
  *****************************************************************************/
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.webmonitor.RedirectHandler;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -37,7 +37,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
-import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
@@ -47,15 +46,11 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
 import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -71,7 +66,6 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.Locale;
-import java.util.Optional;
 import java.util.TimeZone;
 import java.util.concurrent.CompletableFuture;
 
@@ -97,10 +91,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * example.</p>
  */
 @ChannelHandler.Sharable
-public class StaticFileServerHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
-
-	/** Default logger, if none is specified. */
-	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
+public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectHandler<T> {
 
 	/** Timezone in which this server answers its "if-modified" requests. */
 	private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT");
@@ -113,47 +104,19 @@ public class StaticFileServerHandler<T extends RestfulGateway> extends SimpleCha
 
 	// ------------------------------------------------------------------------
 
-	private final GatewayRetriever<T> retriever;
-
-	private final CompletableFuture<String> localJobManagerAddressFuture;
-
-	private final Time timeout;
-
 	/** The path in which the static documents are. */
 	private final File rootPath;
 
-	/** Whether the web service has https enabled. */
-	private final boolean httpsEnabled;
-
-	/** The log for all error reporting. */
-	private final Logger logger;
-
-	private String localJobManagerAddress;
-
 	public StaticFileServerHandler(
 			GatewayRetriever<T> retriever,
-			CompletableFuture<String> localJobManagerAddressPromise,
+			CompletableFuture<String> localJobManagerAddressFuture,
 			Time timeout,
 			File rootPath,
 			boolean httpsEnabled) throws IOException {
 
-		this(retriever, localJobManagerAddressPromise, timeout, rootPath, httpsEnabled, DEFAULT_LOGGER);
-	}
-
-	public StaticFileServerHandler(
-			GatewayRetriever<T> retriever,
-			CompletableFuture<String> localJobManagerAddressFuture,
-			Time timeout,
-			File rootPath,
-			boolean httpsEnabled,
-			Logger logger) throws IOException {
+		super(localJobManagerAddressFuture, retriever, timeout, httpsEnabled);
 
-		this.retriever = checkNotNull(retriever);
-		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
-		this.timeout = checkNotNull(timeout);
 		this.rootPath = checkNotNull(rootPath).getCanonicalFile();
-		this.httpsEnabled = httpsEnabled;
-		this.logger = checkNotNull(logger);
 	}
 
 	// ------------------------------------------------------------------------
@@ -161,69 +124,28 @@ public class StaticFileServerHandler<T extends RestfulGateway> extends SimpleCha
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
-		if (localJobManagerAddressFuture.isDone()) {
-			if (localJobManagerAddress == null) {
-				localJobManagerAddress = localJobManagerAddressFuture.get();
-			}
-
-			final HttpRequest request = routed.request();
-			final String requestPath;
-
-			// make sure we request the "index.html" in case there is a directory request
-			if (routed.path().endsWith("/")) {
-				requestPath = routed.path() + "index.html";
-			}
-			// in case the files being accessed are logs or stdout files, find appropriate paths.
-			else if (routed.path().equals("/jobmanager/log") || routed.path().equals("/jobmanager/stdout")) {
-				requestPath = "";
-			} else {
-				requestPath = routed.path();
-			}
+	protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, T gateway) throws Exception {
+		final HttpRequest request = routed.request();
+		final String requestPath;
 
-			Optional<T> optLeader = retriever.getNow();
-
-			if (optLeader.isPresent()) {
-				// Redirect to leader if necessary
-				Optional<CompletableFuture<String>> optRedirectAddress = HandlerRedirectUtils.getRedirectAddress(
-					localJobManagerAddress,
-					optLeader.get(),
-					timeout);
-
-				if (optRedirectAddress.isPresent()) {
-					optRedirectAddress.get().whenComplete(
-						(String address, Throwable throwable) -> {
-							if (throwable != null) {
-								logger.error("Failed to obtain redirect address.", throwable);
-								sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
-							} else {
-								try {
-									HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(
-										address, requestPath, httpsEnabled);
-
-									KeepAliveWrite.flush(ctx, routed.request(), redirect);
-								} catch (Exception e) {
-									logger.error("Failed to send redirect response.", e);
-									sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
-								}
-							}
-						});
-				} else {
-					respondAsLeader(ctx, request, requestPath);
-				}
-			}
-			else {
-				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
-			}
+		// make sure we request the "index.html" in case there is a directory request
+		if (routed.path().endsWith("/")) {
+			requestPath = routed.path() + "index.html";
+		}
+		// in case the files being accessed are logs or stdout files, find appropriate paths.
+		else if (routed.path().equals("/jobmanager/log") || routed.path().equals("/jobmanager/stdout")) {
+			requestPath = "";
 		} else {
-			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
+			requestPath = routed.path();
 		}
+
+		respondToRequest(channelHandlerContext, request, requestPath);
 	}
 
 	/**
 	 * Response when running with leading JobManager.
 	 */
-	private void respondAsLeader(ChannelHandlerContext ctx, HttpRequest request, String requestPath)
+	private void respondToRequest(ChannelHandlerContext ctx, HttpRequest request, String requestPath)
 			throws IOException, ParseException, URISyntaxException {
 
 		// convert to absolute path

http://git-wip-us.apache.org/repos/asf/flink/blob/41dba8bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
index 642d2f4..9bb93cc 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
@@ -56,7 +56,7 @@ public class HandlerRedirectUtils {
 	public static Optional<CompletableFuture<String>> getRedirectAddress(
 			String localJobManagerAddress,
 			RestfulGateway restfulGateway,
-			Time timeout) throws Exception {
+			Time timeout) {
 
 		final String leaderAddress = restfulGateway.getAddress();
 
@@ -72,7 +72,7 @@ public class HandlerRedirectUtils {
 		}
 	}
 
-	public static HttpResponse getRedirectResponse(String redirectAddress, String path, boolean httpsEnabled) throws Exception {
+	public static HttpResponse getRedirectResponse(String redirectAddress, String path, boolean httpsEnabled) {
 		checkNotNull(redirectAddress, "Redirect address");
 		checkNotNull(path, "Path");
 
@@ -100,7 +100,7 @@ public class HandlerRedirectUtils {
 		return unavailableResponse;
 	}
 
-	public static FullHttpResponse getResponse(HttpResponseStatus status, @Nullable String message) {
+	public static HttpResponse getResponse(HttpResponseStatus status, @Nullable String message) {
 		ByteBuf messageByteBuf = message == null ? Unpooled.buffer(0)
 			: Unpooled.wrappedBuffer(message.getBytes(ENCODING));
 		FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, messageByteBuf);
@@ -110,11 +110,11 @@ public class HandlerRedirectUtils {
 		return response;
 	}
 
-	public static FullHttpResponse getErrorResponse(Throwable throwable) {
+	public static HttpResponse getErrorResponse(Throwable throwable) {
 		return getErrorResponse(throwable, HttpResponseStatus.INTERNAL_SERVER_ERROR);
 	}
 
-	public static FullHttpResponse getErrorResponse(Throwable throwable, HttpResponseStatus status) {
+	public static HttpResponse getErrorResponse(Throwable throwable, HttpResponseStatus status) {
 		byte[] bytes = ExceptionUtils.stringifyException(throwable).getBytes(ENCODING);
 		FullHttpResponse response = new DefaultFullHttpResponse(
 			HttpVersion.HTTP_1_1,

http://git-wip-us.apache.org/repos/asf/flink/blob/41dba8bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index b382b4c..dc9b49f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -36,7 +36,8 @@ import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase;
+import org.apache.flink.runtime.webmonitor.RedirectHandler;
+import org.apache.flink.runtime.webmonitor.WebHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -89,7 +90,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * example.</p>
  */
 @ChannelHandler.Sharable
-public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
+public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> implements WebHandler {
 	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerLogHandler.class);
 
 	private static final String TASKMANAGER_LOG_REST_PATH = "/taskmanagers/:taskmanagerid/log";
@@ -128,7 +129,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 		Configuration config,
 		boolean httpsEnabled,
 		BlobView blobView) {
-		super(retriever, localJobManagerAddressPromise, timeout, httpsEnabled);
+		super(localJobManagerAddressPromise, retriever, timeout, httpsEnabled);
 
 		this.executor = checkNotNull(executor);
 		this.config = config;
@@ -137,7 +138,6 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 		this.blobView = Preconditions.checkNotNull(blobView, "blobView");
 	}
 
-	@Override
 	public String[] getPaths() {
 		switch (fileMode) {
 			case LOG:

http://git-wip-us.apache.org/repos/asf/flink/blob/41dba8bb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
new file mode 100644
index 0000000..e434a1d
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RedirectHandler}.
+ */
+public class RedirectHandlerTest extends TestLogger {
+
+	private static final String RESPONSE_MESSAGE = "foobar";
+
+	/**
+	 * Tests the behaviour of the RedirectHandler under the following conditions.
+	 *
+	 * <p>1. No local address known --> service unavailable
+	 * 2. Local address knwon but no gateway resolved --> service unavailable
+	 * 3. Remote leader gateway --> redirection
+	 * 4. Local leader gateway
+	 * @throws Exception
+	 */
+	@Test
+	public void testRedirectHandler() throws Exception {
+		final String restPath = "/testing";
+		final String correctAddress = "foobar";
+		final String incorrectAddres = "barfoo";
+		final String redirectionAddress = "foobar:12345";
+		final String expectedRedirection = "http://" + redirectionAddress + restPath;
+
+		final Configuration configuration = new Configuration();
+		final Router router = new Router();
+		final Time timeout = Time.seconds(10L);
+		final CompletableFuture<String> localAddressFuture = new CompletableFuture<>();
+		final GatewayRetriever<RestfulGateway> gatewayRetriever = mock(GatewayRetriever.class);
+
+		final RestfulGateway redirectionGateway = mock(RestfulGateway.class);
+		when(redirectionGateway.getAddress()).thenReturn(incorrectAddres);
+		when(redirectionGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(redirectionAddress));
+
+		final RestfulGateway localGateway = mock(RestfulGateway.class);
+		when(localGateway.getAddress()).thenReturn(correctAddress);
+
+		when(gatewayRetriever.getNow()).thenReturn(Optional.empty(), Optional.of(redirectionGateway), Optional.of(localGateway));
+
+		final TestingHandler testingHandler = new TestingHandler(
+			localAddressFuture,
+			gatewayRetriever,
+			timeout,
+			false);
+
+		router.GET(restPath, testingHandler);
+		WebFrontendBootstrap bootstrap = new WebFrontendBootstrap(
+			router,
+			log,
+			null,
+			null,
+			"localhost",
+			0,
+			configuration);
+
+		try (HttpTestClient httpClient = new HttpTestClient("localhost", bootstrap.getServerPort())) {
+			// 1. without completed local address future --> Service unavailable
+			httpClient.sendGetRequest(restPath, FutureUtils.toFiniteDuration(timeout));
+
+			HttpTestClient.SimpleHttpResponse response = httpClient.getNextResponse(FutureUtils.toFiniteDuration(timeout));
+
+			Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE, response.getStatus());
+
+			// 2. with completed local address future but no leader gateway available --> Service unavailable
+			localAddressFuture.complete(correctAddress);
+
+			httpClient.sendGetRequest(restPath, FutureUtils.toFiniteDuration(timeout));
+
+			response = httpClient.getNextResponse(FutureUtils.toFiniteDuration(timeout));
+
+			Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE, response.getStatus());
+
+			// 3. with leader gateway which is not the one of this REST endpoints --> Redirection required
+			httpClient.sendGetRequest(restPath, FutureUtils.toFiniteDuration(timeout));
+
+			response = httpClient.getNextResponse(FutureUtils.toFiniteDuration(timeout));
+
+			Assert.assertEquals(HttpResponseStatus.TEMPORARY_REDIRECT, response.getStatus());
+			Assert.assertEquals(expectedRedirection, response.getLocation());
+
+			// 4. with local REST endpoint
+			httpClient.sendGetRequest(restPath, FutureUtils.toFiniteDuration(timeout));
+
+			response = httpClient.getNextResponse(FutureUtils.toFiniteDuration(timeout));
+
+			Assert.assertEquals(HttpResponseStatus.OK, response.getStatus());
+			Assert.assertEquals(RESPONSE_MESSAGE, response.getContent());
+
+		} finally {
+			bootstrap.shutdown();
+		}
+	}
+
+	private static class TestingHandler extends RedirectHandler<RestfulGateway> {
+
+		protected TestingHandler(
+				@Nonnull CompletableFuture<String> localAddressFuture,
+				@Nonnull GatewayRetriever<RestfulGateway> leaderRetriever,
+				@Nonnull Time timeout,
+				boolean httpsEnabled) {
+			super(localAddressFuture, leaderRetriever, timeout, httpsEnabled);
+		}
+
+		@Override
+		protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, RestfulGateway gateway) throws Exception {
+			HttpResponse response = HandlerRedirectUtils.getResponse(HttpResponseStatus.OK, RESPONSE_MESSAGE);
+			KeepAliveWrite.flush(channelHandlerContext.channel(), routed.request(), response);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/41dba8bb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
index 529d130..e34631e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
@@ -187,7 +187,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(successfulCancelWithSavepoint);
 
 		// Trigger
-		FullHttpResponse response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+		FullHttpResponse response = trigger.handleRequest(params, Collections.emptyMap(), jobManager).get();
 
 		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/41dba8bb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
index 94dd5f8..d9608fe 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
@@ -266,7 +266,7 @@ public class HttpTestClient implements AutoCloseable {
 		@Override
 		public String toString() {
 			return "HttpResponse(status=" + status + ", type='" + type + "'" + ", content='" +
-					content + "')";
+					content + ", location = " + location + "')";
 		}
 	}