You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/10/28 09:04:29 UTC

[2/2] flink git commit: [FLINK-4787] [runtime-web] Return generic HttpResponse in RequestHandler

[FLINK-4787] [runtime-web] Return generic HttpResponse in RequestHandler

- Let RequestHandler return a generic HttpResponse instead of a String. This
  enables handlers to return custom responses (different HTTP codes, etc.)
- Introduce AbstractJsonRequestHandler for default JSON responses


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fb60091
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fb60091
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fb60091

Branch: refs/heads/master
Commit: 2fb600916860acf2256464659ca60424bbf26857
Parents: e9b20ec
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Oct 11 10:08:14 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Oct 28 11:04:12 2016 +0200

----------------------------------------------------------------------
 .../webmonitor/RuntimeMonitorHandler.java       | 23 +++---
 .../AbstractExecutionGraphRequestHandler.java   |  4 +-
 .../handlers/AbstractJsonRequestHandler.java    | 73 ++++++++++++++++++++
 .../handlers/ClusterOverviewHandler.java        |  4 +-
 .../handlers/CurrentJobIdsHandler.java          |  5 +-
 .../handlers/CurrentJobsOverviewHandler.java    |  4 +-
 .../handlers/DashboardConfigHandler.java        |  4 +-
 .../handlers/JarAccessDeniedHandler.java        |  4 +-
 .../webmonitor/handlers/JarActionHandler.java   |  2 +-
 .../webmonitor/handlers/JarDeleteHandler.java   |  4 +-
 .../webmonitor/handlers/JarListHandler.java     |  4 +-
 .../webmonitor/handlers/JarPlanHandler.java     |  2 +-
 .../webmonitor/handlers/JarRunHandler.java      |  2 +-
 .../webmonitor/handlers/JarUploadHandler.java   |  4 +-
 .../handlers/JobCancellationHandler.java        |  4 +-
 .../handlers/JobManagerConfigHandler.java       |  4 +-
 .../webmonitor/handlers/JobStoppingHandler.java |  4 +-
 .../webmonitor/handlers/RequestHandler.java     | 16 +++--
 .../handlers/TaskManagersHandler.java           |  5 +-
 .../metrics/AbstractMetricsHandler.java         |  6 +-
 .../metrics/AbstractMetricsHandlerTest.java     |  6 +-
 21 files changed, 130 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 5008a8c..aba4e17 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -23,6 +23,7 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
@@ -61,7 +62,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 
 	public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address";
 	
-	private final RequestHandler handler;	
+	private final RequestHandler handler;
 
 	public RuntimeMonitorHandler(
 			RequestHandler handler,
@@ -75,7 +76,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 
 	@Override
 	protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) {
-		DefaultFullHttpResponse response;
+		FullHttpResponse response;
 
 		try {
 			// we only pass the first element in the list to the handlers.
@@ -93,14 +94,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 			queryParams.put(WEB_MONITOR_ADDRESS_KEY,
 				(httpsEnabled ? "https://" : "http://") + address.getHostName() + ":" + address.getPort());
 
-			String result = handler.handleRequest(pathParams, queryParams, jobManager);
-			byte[] bytes = result.getBytes(ENCODING);
-
-			response = new DefaultFullHttpResponse(
-					HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
-
-			response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
-			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
+			response = handler.handleRequest(pathParams, queryParams, jobManager);
 		}
 		catch (NotFoundException e) {
 			// this should result in a 404 error code (not found)
@@ -108,6 +102,8 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 					: Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
 			response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
 			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
+			response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
+			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
 			LOG.debug("Error while handling request", e);
 		}
 		catch (Exception e) {
@@ -115,11 +111,14 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 			response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
 					HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
 			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
+			response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
+			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
 			LOG.debug("Error while handling request", e);
 		}
 
-		response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
-		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+		response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
+
 		KeepAliveWrite.flush(ctx, routed.request(), response);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index ff28d4e..8cd70e9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -30,7 +30,7 @@ import java.util.Map;
  * Base class for request handlers whose response depends on an ExecutionGraph
  * that can be retrieved via "jobid" parameter.
  */
-public abstract class AbstractExecutionGraphRequestHandler implements RequestHandler {
+public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonRequestHandler {
 	
 	private final ExecutionGraphHolder executionGraphHolder;
 	
@@ -39,7 +39,7 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		String jidString = pathParams.get("jobid");
 		if (jidString == null) {
 			throw new RuntimeException("JobId parameter missing");

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
new file mode 100644
index 0000000..ae163cb
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.runtime.instance.ActorGateway;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+
+/**
+ * Base class for most request handlers. The handlers must produce a JSON response.
+ */
+public abstract class AbstractJsonRequestHandler implements RequestHandler {
+
+	private static final Charset ENCODING = Charset.forName("UTF-8");
+
+	@Override
+	public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+		String result = handleJsonRequest(pathParams, queryParams, jobManager);
+		byte[] bytes = result.getBytes(ENCODING);
+
+		DefaultFullHttpResponse response = new DefaultFullHttpResponse(
+				HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
+
+		response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
+		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+		return response;
+	}
+
+	/**
+	 * Core method that handles the request and generates the response. The method needs to
+	 * respond with a valid JSON string. Exceptions may be thrown and will be handled.
+	 *
+	 * @param pathParams The map of REST path parameters, decoded by the router.
+	 * @param queryParams The map of query parameters.
+	 * @param jobManager The JobManager actor.
+	 *
+	 * @return The JSON string that is the HTTP response.
+	 *
+	 * @throws Exception Handlers may forward exceptions. Exceptions of type
+	 *         {@link org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404
+	 *         response with the exception message, other exceptions will cause a HTTP 500 response
+	 *         with the exception stack trace.
+	 */
+	public abstract String handleJsonRequest(
+			Map<String, String> pathParams,
+			Map<String, String> queryParams,
+			ActorGateway jobManager) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index b7389c4..99ef3d9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -36,7 +36,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Responder that returns the status of the Flink cluster, such as how many
  * TaskManagers are currently connected, and how many jobs are running.
  */
-public class ClusterOverviewHandler implements RequestHandler {
+public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
 
 	private static final String version = EnvironmentInformation.getVersion();
 
@@ -49,7 +49,7 @@ public class ClusterOverviewHandler implements RequestHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		// we need no parameters, get all requests
 		try {
 			if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index 11f2a3b..b690c56 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
-
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -38,7 +37,7 @@ import static java.util.Objects.requireNonNull;
  * May serve the IDs of current jobs, or past jobs, depending on whether this handler is
  * given the JobManager or Archive Actor Reference.
  */
-public class CurrentJobIdsHandler implements RequestHandler {
+public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 
 	private final FiniteDuration timeout;
 	
@@ -47,7 +46,7 @@ public class CurrentJobIdsHandler implements RequestHandler {
 	}
 	
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		// we need no parameters, get all requests
 		try {
 			if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index 571f911..07064da 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -36,7 +36,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Request handler that returns a summary of the job status.
  */
-public class CurrentJobsOverviewHandler implements RequestHandler {
+public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 
 	private final FiniteDuration timeout;
 	
@@ -55,7 +55,7 @@ public class CurrentJobsOverviewHandler implements RequestHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			if (jobManager != null) {
 				Future<Object> future = jobManager.ask(

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index debb24c..6fe072b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -31,7 +31,7 @@ import java.util.TimeZone;
  * against this web server should behave. It defines for example the refresh interval,
  * and time zone of the server timestamps.
  */
-public class DashboardConfigHandler implements RequestHandler {
+public class DashboardConfigHandler extends AbstractJsonRequestHandler {
 	
 	private final String configString;
 	
@@ -67,7 +67,7 @@ public class DashboardConfigHandler implements RequestHandler {
 	}
 	
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		return this.configString;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index 67673e2..ba32d0d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -22,13 +22,13 @@ import org.apache.flink.runtime.instance.ActorGateway;
 
 import java.util.Map;
 
-public class JarAccessDeniedHandler implements RequestHandler {
+public class JarAccessDeniedHandler extends AbstractJsonRequestHandler {
 
 	private static final String ERROR_MESSAGE = "{\"error\": \"Web submission interface is not " +
 			"available for this cluster. To enable it, set the configuration key ' jobmanager.web.submit.enable.'\"}";
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		return ERROR_MESSAGE;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
index 9da54c1..1e23f1f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
@@ -47,7 +47,7 @@ import java.util.Map;
 /**
  * Abstract handler for fetching plan for a jar or running a jar.
  */
-public abstract class JarActionHandler implements RequestHandler {
+public abstract class JarActionHandler extends AbstractJsonRequestHandler {
 	
 	private final File jarDir;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index 6e6c520..ae959a5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -29,7 +29,7 @@ import java.util.Map;
 /**
  * Handles requests for deletion of jars.
  */
-public class JarDeleteHandler implements RequestHandler {
+public class JarDeleteHandler extends AbstractJsonRequestHandler {
 
 	private final File jarDir;
 
@@ -38,7 +38,7 @@ public class JarDeleteHandler implements RequestHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		final String file = pathParams.get("jarid");
 		try {
 			File[] list = jarDir.listFiles(new FilenameFilter() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index c263628..f3cdc30 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -31,7 +31,7 @@ import java.util.Map;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
 
-public class JarListHandler implements RequestHandler {
+public class JarListHandler extends AbstractJsonRequestHandler {
 
 	private final File jarDir;
 
@@ -40,7 +40,7 @@ public class JarListHandler implements RequestHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			StringWriter writer = new StringWriter();
 			JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index 7e0a810..3a95d6a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -37,7 +37,7 @@ public class JarPlanHandler extends JarActionHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			JobGraph graph = getJobGraphAndClassLoader(pathParams, queryParams).f0;
 			StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 18b0f15..8d3e57f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -48,7 +48,7 @@ public class JarRunHandler extends JarActionHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(pathParams, queryParams);
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 011e8f9..9a3b0e1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -27,7 +27,7 @@ import java.util.UUID;
 /**
  * Handles requests for uploading of jars.
  */
-public class JarUploadHandler implements RequestHandler {
+public class JarUploadHandler extends AbstractJsonRequestHandler {
 
 	private final File jarDir;
 
@@ -36,7 +36,7 @@ public class JarUploadHandler implements RequestHandler {
 	}
 
 	@Override
-	public String handleRequest(
+	public String handleJsonRequest(
 				Map<String, String> pathParams,
 				Map<String, String> queryParams,
 				ActorGateway jobManager) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index b17acdc..9f35719 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -28,10 +28,10 @@ import java.util.Map;
 /**
  * Request handler for the CANCEL request.
  */
-public class JobCancellationHandler implements RequestHandler {
+public class JobCancellationHandler extends AbstractJsonRequestHandler {
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
 			if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index 6d9f7e1..11ca931 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -28,7 +28,7 @@ import java.util.Map;
 /**
  * Returns the Job Manager's configuration.
  */
-public class JobManagerConfigHandler implements RequestHandler {
+public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
 
 	private final Configuration config;
 
@@ -37,7 +37,7 @@ public class JobManagerConfigHandler implements RequestHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
index 791790a..0f8c958 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
@@ -28,10 +28,10 @@ import java.util.Map;
 /**
  * Request handler for the STOP request.
  */
-public class JobStoppingHandler implements RequestHandler {
+public class JobStoppingHandler extends AbstractJsonRequestHandler {
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
 			if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index 0927b7e..c56cfc3 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -18,29 +18,35 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import io.netty.handler.codec.http.FullHttpResponse;
 import org.apache.flink.runtime.instance.ActorGateway;
 
 import java.util.Map;
 
 /**
- * Base interface for all request handlers. The handlers must produce a JSOn response.
+ * Base interface for all request handlers.
+ *
+ * <p>Most handlers will want to use the {@link AbstractJsonRequestHandler}
+ * as a starting point, which produces a valid HTTP response.
  */
 public interface RequestHandler {
 
 	/**
 	 * Core method that handles the request and generates the response. The method needs to
-	 * respond with a valid JSON string. Exceptions may be throws and will be handled.
+	 * respond with a full HTTP response, including content-type, content-length, etc.
+	 *
+	 * <p>Exceptions may be thrown and will be handled.
 	 * 
 	 * @param pathParams The map of REST path parameters, decoded by the router.
 	 * @param queryParams The map of query parameters.
 	 * @param jobManager The JobManager actor.
-	 * 
-	 * @return The JSON string that is the HTTP response.
+	 *
+	 * @return The full HTTP response.
 	 * 
 	 * @throws Exception Handlers may forward exceptions. Exceptions of type
 	 *         {@link org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404
 	 *         response with the exception message, other exceptions will cause a HTTP 500 response
 	 *         with the exception stack trace.
 	 */
-	String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception;
+	FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index b5e9088..c20d4fe 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RegisteredTaskManagers;
 import org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance;
 import org.apache.flink.util.StringUtils;
-
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -38,7 +37,7 @@ import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
 
-public class TaskManagersHandler implements RequestHandler {
+public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 
 	public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
 	
@@ -49,7 +48,7 @@ public class TaskManagersHandler implements RequestHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			if (jobManager != null) {
 				// whether one task manager's metrics are requested, or all task manager, we

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
index 8374523..80126c6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
@@ -19,8 +19,8 @@ package org.apache.flink.runtime.webmonitor.metrics;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
-import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -38,7 +38,7 @@ import java.util.Map;
  * The handler will then return a list containing the values of the requested metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
  */
-public abstract class AbstractMetricsHandler implements RequestHandler {
+public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler {
 	private final MetricFetcher fetcher;
 
 	public AbstractMetricsHandler(MetricFetcher fetcher) {
@@ -46,7 +46,7 @@ public abstract class AbstractMetricsHandler implements RequestHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		fetcher.update();
 		String requestedMetricsList = queryParams.get("get");
 		return requestedMetricsList != null

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
index 483dbf6..13a9067 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
@@ -48,7 +48,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		pathParams.put("vertexid", "taskid");
 
 		// get list of available metrics
-		String availableList = handler.handleRequest(pathParams, queryParams, null);
+		String availableList = handler.handleJsonRequest(pathParams, queryParams, null);
 
 		assertEquals("[" +
 				"{\"id\":\"8.opname.abc.metric5\"}," +
@@ -59,7 +59,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		// get value for a single metric
 		queryParams.put("get", "8.opname.abc.metric5");
 
-		String metricValue = handler.handleRequest(pathParams, queryParams, null);
+		String metricValue = handler.handleJsonRequest(pathParams, queryParams, null);
 
 		assertEquals("[" +
 				"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}" +
@@ -70,7 +70,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		// get values for multiple metrics
 		queryParams.put("get", "8.opname.abc.metric5,8.abc.metric4");
 
-		String metricValues = handler.handleRequest(pathParams, queryParams, null);
+		String metricValues = handler.handleJsonRequest(pathParams, queryParams, null);
 
 		assertEquals("[" +
 				"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}," +