You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/10/28 09:04:28 UTC

[1/2] flink git commit: [FLINK-4787] [runtime-web] Add JobCancellationWithSavepointHandlers

Repository: flink
Updated Branches:
  refs/heads/master e9b20ec21 -> 3bc9cad04


[FLINK-4787] [runtime-web] Add JobCancellationWithSavepointHandlers

- Add handlers for triggering and monitoring job cancellation with
  savepoints.

This closes #2626.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3bc9cad0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3bc9cad0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3bc9cad0

Branch: refs/heads/master
Commit: 3bc9cad045b25d413f0b9f054fff12fac18a4f0e
Parents: 2fb6009
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Oct 11 10:09:20 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Oct 28 11:04:12 2016 +0200

----------------------------------------------------------------------
 docs/monitoring/rest_api.md                     |  70 +++-
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  12 +
 .../JobCancellationWithSavepointHandlers.java   | 420 +++++++++++++++++++
 ...obCancellationWithSavepointHandlersTest.java | 314 ++++++++++++++
 .../metrics/AbstractMetricsHandlerTest.java     |   8 +-
 .../checkpoint/CheckpointCoordinator.java       |   4 +
 .../executiongraph/AccessExecutionGraph.java    |   9 +
 .../executiongraph/ArchivedExecutionGraph.java  |   6 +
 .../runtime/executiongraph/ExecutionGraph.java  |   1 +
 9 files changed, 839 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/docs/monitoring/rest_api.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/rest_api.md b/docs/monitoring/rest_api.md
index e7fccc5..e84e2cc 100644
--- a/docs/monitoring/rest_api.md
+++ b/docs/monitoring/rest_api.md
@@ -31,7 +31,6 @@ The monitoring API is a REST-ful API that accepts HTTP GET requests and responds
 {:toc}
 
 
-
 ## Overview
 
 The monitoring API is backed by a web server that runs as part of the *JobManager*. By default, this server listens at post `8081`, which can be configured in `flink-conf.yaml` via `jobmanager.web.port`. Note that the monitoring API web server and the web dashboard web server are currently the same and thus run together at the same port. They respond to different HTTP URLs, though.
@@ -584,3 +583,72 @@ Sample Result:
   } ]
 }
 ~~~
+
+### Job Cancellation
+
+#### Cancel Job
+
+`DELETE` request to **`/jobs/:jobid/cancel`**.
+
+Triggers job cancellation, result on success is `{}`.
+
+#### Cancel Job with Savepoint
+
+Triggers a savepoint and cancels the job after the savepoint succeeds.
+
+`GET` request to **`/jobs/:jobid/cancel-with-savepoint/`** triggers a savepoint to the default savepoint directory and cancels the job.
+
+`GET` request to **`/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory`** triggers a savepoint to the given target directory and cancels the job.
+
+Since savepoints can take some time to complete this operation happens asynchronously. The result to this request is the location of the in-progress cancellation.
+
+Sample Trigger Result:
+
+~~~
+{
+  "status": "accepted",
+  "request-id": 1,
+  "location": "/jobs/:jobid/cancel-with-savepoint/in-progress/1"
+}
+~~~
+
+##### Monitoring Progress
+
+The progress of the cancellation has to be monitored by the user at
+
+~~~
+/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId
+~~~
+
+The request ID is returned by the trigger result.
+
+###### In-Progress
+
+~~~
+{
+  "status": "in-progress",
+  "request-id": 1
+}
+~~~
+
+###### Success
+
+~~~
+{
+  "status": "success",
+  "request-id": 1,
+  "savepoint-path": "<savepointPath>"
+}
+~~~
+
+The `savepointPath` points to the external path of the savepoint, which can be used to resume the savepoint.
+
+###### Failed
+
+~~~
+{
+  "status": "failed",
+  "request-id": 1,
+  "cause": "<error message>"
+}
+~~~

http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 3e2634f..e907124 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobCancellationWithSavepointHandlers;
 import org.apache.flink.runtime.webmonitor.handlers.JobCheckpointsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
@@ -238,6 +239,12 @@ public class WebRuntimeMonitor implements WebMonitor {
 		}
 		metricFetcher = new MetricFetcher(actorSystem, retriever, context);
 
+		String defaultSavepointDir = config.getString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, null);
+
+		JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, context, defaultSavepointDir);
+		RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler());
+		RuntimeMonitorHandler inProgressHandler = handler(cancelWithSavepoint.getInProgressHandler());
+
 		router = new Router()
 			// config how to interact with this web server
 			.GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval())))
@@ -307,6 +314,10 @@ public class WebRuntimeMonitor implements WebMonitor {
 			// DELETE is the preferred way of canceling a job (Rest-conform)
 			.DELETE("/jobs/:jobid/cancel", handler(new JobCancellationHandler()))
 
+			.GET("/jobs/:jobid/cancel-with-savepoint", triggerHandler)
+			.GET("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory", triggerHandler)
+			.GET(JobCancellationWithSavepointHandlers.IN_PROGRESS_URL, inProgressHandler)
+
 			// stop a job via GET (for proper integration with YARN this has to be performed via GET)
 			.GET("/jobs/:jobid/yarn-stop", handler(new JobStoppingHandler()))
 
@@ -489,6 +500,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
+
 	private RuntimeMonitorHandler handler(RequestHandler handler) {
 		return new RuntimeMonitorHandler(handler, retriever, jobManagerAddressPromise.future(), timeout,
 			serverSSLContext !=  null);

http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
new file mode 100644
index 0000000..492ce76
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import akka.dispatch.OnComplete;
+import com.fasterxml.jackson.core.JsonGenerator;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Request handler for {@link CancelJobWithSavepoint} messages.
+ */
+public class JobCancellationWithSavepointHandlers {
+
+	/** URL for in-progress cancellations. */
+	public static final String IN_PROGRESS_URL = "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId";
+
+	/** Encodings for String. */
+	private static final Charset ENCODING = Charset.forName("UTF-8");
+
+	/** Shared lock between Trigger and In-Progress handlers. */
+	private final Object lock = new Object();
+
+	/** In-Progress requests */
+	private final Map<JobID, Long> inProgress = new HashMap<>();
+
+	/** Succeeded/failed request. Either String or Throwable. */
+	private final Map<Long, Object> completed = new HashMap<>();
+
+	/** Atomic request counter */
+	private long requestCounter;
+
+	/** Handler for trigger requests. */
+	private final TriggerHandler triggerHandler;
+
+	/** Handler for in-progress requests. */
+	private final InProgressHandler inProgressHandler;
+
+	/** Default savepoint directory. */
+	private final String defaultSavepointDirectory;
+
+	public JobCancellationWithSavepointHandlers(
+			ExecutionGraphHolder currentGraphs,
+			ExecutionContext executionContext) {
+		this(currentGraphs, executionContext, null);
+	}
+
+	public JobCancellationWithSavepointHandlers(
+			ExecutionGraphHolder currentGraphs,
+			ExecutionContext executionContext,
+			@Nullable String defaultSavepointDirectory) {
+
+		this.triggerHandler = new TriggerHandler(currentGraphs, executionContext);
+		this.inProgressHandler = new InProgressHandler();
+		this.defaultSavepointDirectory = defaultSavepointDirectory;
+	}
+
+	public TriggerHandler getTriggerHandler() {
+		return triggerHandler;
+	}
+
+	public InProgressHandler getInProgressHandler() {
+		return inProgressHandler;
+	}
+
+	// ------------------------------------------------------------------------
+	// New requests
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Handler for triggering a {@link CancelJobWithSavepoint} message.
+	 */
+	class TriggerHandler implements RequestHandler {
+
+		/** Current execution graphs. */
+		private final ExecutionGraphHolder currentGraphs;
+
+		/** Execution context for futures. */
+		private final ExecutionContext executionContext;
+
+		public TriggerHandler(ExecutionGraphHolder currentGraphs, ExecutionContext executionContext) {
+			this.currentGraphs = checkNotNull(currentGraphs);
+			this.executionContext = checkNotNull(executionContext);
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public FullHttpResponse handleRequest(
+				Map<String, String> pathParams,
+				Map<String, String> queryParams,
+				ActorGateway jobManager) throws Exception {
+
+			try {
+				if (jobManager != null) {
+					JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+
+					AccessExecutionGraph graph = currentGraphs.getExecutionGraph(jobId, jobManager);
+					if (graph == null) {
+						throw new Exception("Cannot find ExecutionGraph for job.");
+					} else {
+						CheckpointCoordinator coord = graph.getCheckpointCoordinator();
+						if (coord == null) {
+							throw new Exception("Cannot find CheckpointCoordinator for job.");
+						}
+
+						String targetDirectory = pathParams.get("targetDirectory");
+						if (targetDirectory == null) {
+							if (defaultSavepointDirectory == null) {
+								throw new IllegalStateException("No savepoint directory configured. " +
+										"You can either specify a directory when triggering this savepoint or " +
+										"configure a cluster-wide default via key '" +
+										ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.");
+							} else {
+								targetDirectory = defaultSavepointDirectory;
+							}
+						}
+
+						return handleNewRequest(jobManager, jobId, targetDirectory, coord.getCheckpointTimeout());
+					}
+				} else {
+					throw new Exception("No connection to the leading JobManager.");
+				}
+			} catch (Exception e) {
+				throw new Exception("Failed to cancel the job: " + e.getMessage(), e);
+			}
+		}
+
+		@SuppressWarnings("unchecked")
+		private FullHttpResponse handleNewRequest(ActorGateway jobManager, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException {
+			// Check whether a request exists
+			final long requestId;
+			final boolean isNewRequest;
+			synchronized (lock) {
+				if (inProgress.containsKey(jobId)) {
+					requestId = inProgress.get(jobId);
+					isNewRequest = false;
+				} else {
+					requestId = ++requestCounter;
+					inProgress.put(jobId, requestId);
+					isNewRequest = true;
+				}
+			}
+
+			if (isNewRequest) {
+				boolean success = false;
+
+				try {
+					// Trigger cancellation
+					Object msg = new CancelJobWithSavepoint(jobId, targetDirectory);
+					Future<Object> cancelFuture = jobManager
+							.ask(msg, FiniteDuration.apply(checkpointTimeout, "ms"));
+
+					cancelFuture.onComplete(new OnComplete<Object>() {
+						@Override
+						public void onComplete(Throwable failure, Object resp) throws Throwable {
+							synchronized (lock) {
+								try {
+									if (resp != null) {
+										if (resp.getClass() == CancellationSuccess.class) {
+											String path = ((CancellationSuccess) resp).savepointPath();
+											completed.put(requestId, path);
+										} else if (resp.getClass() == CancellationFailure.class) {
+											Throwable cause = ((CancellationFailure) resp).cause();
+											completed.put(requestId, cause);
+										} else {
+											Throwable cause = new IllegalStateException("Unexpected CancellationResponse of type " + resp.getClass());
+											completed.put(requestId, cause);
+										}
+									} else {
+										completed.put(requestId, failure);
+									}
+								} finally {
+									inProgress.remove(jobId);
+								}
+							}
+						}
+					}, executionContext);
+
+					success = true;
+				} finally {
+					synchronized (lock) {
+						if (!success) {
+							inProgress.remove(jobId);
+						}
+					}
+				}
+			}
+
+			// In-progress location
+			String location = IN_PROGRESS_URL
+					.replace(":jobid", jobId.toString())
+					.replace(":requestId", Long.toString(requestId));
+
+			// Accepted response
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+			gen.writeStartObject();
+			gen.writeStringField("status", "accepted");
+			gen.writeNumberField("request-id", requestId);
+			gen.writeStringField("location", location);
+			gen.writeEndObject();
+			gen.close();
+
+			String json = writer.toString();
+			byte[] bytes = json.getBytes(ENCODING);
+
+			DefaultFullHttpResponse response = new DefaultFullHttpResponse(
+					HttpVersion.HTTP_1_1,
+					HttpResponseStatus.ACCEPTED,
+					Unpooled.wrappedBuffer(bytes));
+
+			response.headers().set(HttpHeaders.Names.LOCATION, location);
+
+			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
+			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+			FullHttpResponse accepted = response;
+
+			return accepted;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// In-progress requests
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Handler for in-progress cancel with savepoint operations.
+	 */
+	class InProgressHandler implements RequestHandler {
+
+		/** The number of recent checkpoints whose IDs are remembered. */
+		private static final int NUM_GHOST_REQUEST_IDS = 16;
+
+		/** Remember some recently completed */
+		private final ArrayDeque<Tuple2<Long, Object>> recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS);
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+			try {
+				if (jobManager != null) {
+					JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+					long requestId = Long.parseLong(pathParams.get("requestId"));
+
+					synchronized (lock) {
+						Object result = completed.remove(requestId);
+
+						if (result != null) {
+							// Add to recent history
+							recentlyCompleted.add(new Tuple2<>(requestId, result));
+							if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
+								recentlyCompleted.remove();
+							}
+
+							if (result.getClass() == String.class) {
+								String savepointPath = (String) result;
+								return createSuccessResponse(requestId, savepointPath);
+							} else {
+								Throwable cause = (Throwable) result;
+								return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
+							}
+						} else {
+							// Check in-progress
+							Long inProgressRequestId = inProgress.get(jobId);
+							if (inProgressRequestId != null) {
+								// Sanity check
+								if (inProgressRequestId == requestId) {
+									return createInProgressResponse(requestId);
+								} else {
+									String msg= "Request ID does not belong to JobID";
+									return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
+								}
+							}
+
+							// Check recent history
+							for (Tuple2<Long, Object> recent : recentlyCompleted) {
+								if (recent.f0 == requestId) {
+									if (recent.f1.getClass() == String.class) {
+										String savepointPath = (String) recent.f1;
+										return createSuccessResponse(requestId, savepointPath);
+									} else {
+										Throwable cause = (Throwable) recent.f1;
+										return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
+									}
+								}
+							}
+
+							return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID");
+						}
+					}
+				} else {
+					throw new Exception("No connection to the leading JobManager.");
+				}
+			} catch (Exception e) {
+				throw new Exception("Failed to cancel the job: " + e.getMessage(), e);
+			}
+		}
+
+		private FullHttpResponse createSuccessResponse(long requestId, String savepointPath) throws IOException {
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+			gen.writeStartObject();
+
+			gen.writeStringField("status", "success");
+			gen.writeNumberField("request-id", requestId);
+			gen.writeStringField("savepoint-path", savepointPath);
+
+			gen.writeEndObject();
+			gen.close();
+
+			String json = writer.toString();
+			byte[] bytes = json.getBytes(ENCODING);
+
+			DefaultFullHttpResponse response = new DefaultFullHttpResponse(
+					HttpVersion.HTTP_1_1,
+					HttpResponseStatus.CREATED,
+					Unpooled.wrappedBuffer(bytes));
+
+			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
+			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+			return response;
+		}
+
+		private FullHttpResponse createInProgressResponse(long requestId) throws IOException {
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+			gen.writeStartObject();
+
+			gen.writeStringField("status", "in-progress");
+			gen.writeNumberField("request-id", requestId);
+
+			gen.writeEndObject();
+			gen.close();
+
+			String json = writer.toString();
+			byte[] bytes = json.getBytes(ENCODING);
+
+			DefaultFullHttpResponse response = new DefaultFullHttpResponse(
+					HttpVersion.HTTP_1_1,
+					HttpResponseStatus.ACCEPTED,
+					Unpooled.wrappedBuffer(bytes));
+
+			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
+			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+			return response;
+		}
+
+		private FullHttpResponse createFailureResponse(HttpResponseStatus code, long requestId, String errMsg) throws IOException {
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+			gen.writeStartObject();
+
+			gen.writeStringField("status", "failed");
+			gen.writeNumberField("request-id", requestId);
+			gen.writeStringField("cause", errMsg);
+
+			gen.writeEndObject();
+			gen.close();
+
+			String json = writer.toString();
+			byte[] bytes = json.getBytes(ENCODING);
+
+			DefaultFullHttpResponse response = new DefaultFullHttpResponse(
+					HttpVersion.HTTP_1_1,
+					code,
+					Unpooled.wrappedBuffer(bytes));
+
+			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
+			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+			return response;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
new file mode 100644
index 0000000..cebb14e
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import akka.dispatch.ExecutionContexts$;
+import akka.dispatch.Futures;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.impl.Promise;
+
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class JobCancellationWithSavepointHandlersTest {
+
+	private static final ExecutionContext EC = ExecutionContexts$.MODULE$.fromExecutor(Executors.directExecutor());
+
+	/**
+	 * Tests that the cancellation ask timeout respects the checkpoint timeout.
+	 * Otherwise, AskTimeoutExceptions are bound to happen for large state.
+	 */
+	@Test
+	public void testAskTimeoutEqualsCheckpointTimeout() throws Exception {
+		long timeout = 128288238L;
+		JobID jobId = new JobID();
+		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+		ExecutionGraph graph = mock(ExecutionGraph.class);
+		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+		when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph);
+		when(graph.getCheckpointCoordinator()).thenReturn(coord);
+		when(coord.getCheckpointTimeout()).thenReturn(timeout);
+
+		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC);
+		JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler();
+
+		Map<String, String> params = new HashMap<>();
+		params.put("jobid", jobId.toString());
+		params.put("targetDirectory", "placeholder");
+
+		ActorGateway jobManager = mock(ActorGateway.class);
+
+		Future<Object> future = Futures.successful((Object) new CancellationSuccess(jobId, null));
+		when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(future);
+
+		handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+
+		verify(jobManager).ask(any(CancelJobWithSavepoint.class), eq(FiniteDuration.apply(timeout, "ms")));
+	}
+
+	/**
+	 * Tests that the savepoint directory configuration is respected.
+	 */
+	@Test
+	public void testSavepointDirectoryConfiguration() throws Exception {
+		long timeout = 128288238L;
+		JobID jobId = new JobID();
+		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+		ExecutionGraph graph = mock(ExecutionGraph.class);
+		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+		when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph);
+		when(graph.getCheckpointCoordinator()).thenReturn(coord);
+		when(coord.getCheckpointTimeout()).thenReturn(timeout);
+
+		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC, "the-default-directory");
+		JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler();
+
+		Map<String, String> params = new HashMap<>();
+		params.put("jobid", jobId.toString());
+
+		ActorGateway jobManager = mock(ActorGateway.class);
+
+		Future<Object> future = Futures.successful((Object) new CancellationSuccess(jobId, null));
+		when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(future);
+
+		// 1. Use targetDirectory path param
+		params.put("targetDirectory", "custom-directory");
+		handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+
+		verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), eq(FiniteDuration.apply(timeout, "ms")));
+
+		// 2. Use default
+		params.remove("targetDirectory");
+
+		handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+
+		verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "the-default-directory")), eq(FiniteDuration.apply(timeout, "ms")));
+
+		// 3. Throw Exception
+		handlers = new JobCancellationWithSavepointHandlers(holder, EC, null);
+		handler = handlers.getTriggerHandler();
+
+		try {
+			handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+			fail("Did not throw expected test Exception");
+		} catch (Exception e) {
+			IllegalStateException cause = (IllegalStateException) e.getCause();
+			assertEquals(true, cause.getMessage().contains(ConfigConstants.SAVEPOINT_DIRECTORY_KEY));
+		}
+	}
+
+	/**
+	 * Tests triggering a new request and monitoring it.
+	 */
+	@Test
+	public void testTriggerNewRequest() throws Exception {
+		JobID jobId = new JobID();
+		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+		ExecutionGraph graph = mock(ExecutionGraph.class);
+		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+		when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph);
+		when(graph.getCheckpointCoordinator()).thenReturn(coord);
+
+		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC);
+		JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler();
+		JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler();
+
+		Map<String, String> params = new HashMap<>();
+		params.put("jobid", jobId.toString());
+		params.put("targetDirectory", "custom-directory");
+
+		ActorGateway jobManager = mock(ActorGateway.class);
+
+		// Successful
+		Promise<Object> promise = new Promise.DefaultPromise<>();
+		when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(promise);
+
+		// Trigger
+		FullHttpResponse response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+
+		verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), any(FiniteDuration.class));
+
+		String location = String.format("/jobs/%s/cancel-with-savepoint/in-progress/1", jobId);
+
+		assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+		assertEquals("application/json", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+		assertEquals(location, response.headers().get(HttpHeaders.Names.LOCATION));
+
+		String json = response.content().toString(Charset.forName("UTF-8"));
+		JsonNode root = new ObjectMapper().readTree(json);
+
+		assertEquals("accepted", root.get("status").getValueAsText());
+		assertEquals("1", root.get("request-id").getValueAsText());
+		assertEquals(location, root.get("location").getValueAsText());
+
+		// Trigger again
+		response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+		assertEquals("application/json", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+		assertEquals(location, response.headers().get(HttpHeaders.Names.LOCATION));
+
+		json = response.content().toString(Charset.forName("UTF-8"));
+		root = new ObjectMapper().readTree(json);
+
+		assertEquals("accepted", root.get("status").getValueAsText());
+		assertEquals("1", root.get("request-id").getValueAsText());
+		assertEquals(location, root.get("location").getValueAsText());
+
+		// Only single actual request
+		verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), any(FiniteDuration.class));
+
+		// Query progress
+		params.put("requestId", "1");
+
+		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+		assertEquals("application/json", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+		json = response.content().toString(Charset.forName("UTF-8"));
+		root = new ObjectMapper().readTree(json);
+
+		assertEquals("in-progress", root.get("status").getValueAsText());
+		assertEquals("1", root.get("request-id").getValueAsText());
+
+		// Complete
+		promise.success(new CancellationSuccess(jobId, "_path-savepoint_"));
+
+		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+
+		assertEquals(HttpResponseStatus.CREATED, response.getStatus());
+		assertEquals("application/json", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+		json = response.content().toString(Charset.forName("UTF-8"));
+
+		root = new ObjectMapper().readTree(json);
+
+		assertEquals("success", root.get("status").getValueAsText());
+		assertEquals("1", root.get("request-id").getValueAsText());
+		assertEquals("_path-savepoint_", root.get("savepoint-path").getValueAsText());
+
+		// Query again, keep recent history
+
+		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+
+		assertEquals(HttpResponseStatus.CREATED, response.getStatus());
+		assertEquals("application/json", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+		json = response.content().toString(Charset.forName("UTF-8"));
+
+		root = new ObjectMapper().readTree(json);
+
+		assertEquals("success", root.get("status").getValueAsText());
+		assertEquals("1", root.get("request-id").getValueAsText());
+		assertEquals("_path-savepoint_", root.get("savepoint-path").getValueAsText());
+
+		// Query for unknown request
+		params.put("requestId", "9929");
+
+		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		assertEquals(HttpResponseStatus.BAD_REQUEST, response.getStatus());
+		assertEquals("application/json", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+		json = response.content().toString(Charset.forName("UTF-8"));
+
+		root = new ObjectMapper().readTree(json);
+
+		assertEquals("failed", root.get("status").getValueAsText());
+		assertEquals("9929", root.get("request-id").getValueAsText());
+		assertEquals("Unknown job/request ID", root.get("cause").getValueAsText());
+	}
+
+	/**
+	 * Tests response when a request fails.
+	 */
+	@Test
+	public void testFailedCancellation() throws Exception {
+		JobID jobId = new JobID();
+		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+		ExecutionGraph graph = mock(ExecutionGraph.class);
+		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+		when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph);
+		when(graph.getCheckpointCoordinator()).thenReturn(coord);
+
+		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC);
+		JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler();
+		JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler();
+
+		Map<String, String> params = new HashMap<>();
+		params.put("jobid", jobId.toString());
+		params.put("targetDirectory", "custom-directory");
+
+		ActorGateway jobManager = mock(ActorGateway.class);
+
+		// Successful
+		Future<Object> future = Futures.failed(new Exception("Test Exception"));
+		when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(future);
+
+		// Trigger
+		trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), any(FiniteDuration.class));
+
+		// Query progress
+		params.put("requestId", "1");
+
+		FullHttpResponse response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus());
+		assertEquals("application/json", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+		String json = response.content().toString(Charset.forName("UTF-8"));
+		JsonNode root = new ObjectMapper().readTree(json);
+
+		assertEquals("failed", root.get("status").getValueAsText());
+		assertEquals("1", root.get("request-id").getValueAsText());
+		assertEquals("Test Exception", root.get("cause").getValueAsText());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
index 13a9067..fe7ceef 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
@@ -100,7 +100,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		pathParams.put("jobid", "nonexistent");
 
 		try {
-			assertEquals("", handler.handleRequest(pathParams, queryParams, null));
+			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null));
 		} catch (Exception e) {
 			fail();
 		}
@@ -126,7 +126,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		queryParams.put("get", "");
 
 		try {
-			assertEquals("", handler.handleRequest(pathParams, queryParams, null));
+			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null));
 		} catch (Exception e) {
 			fail(e.getMessage());
 		}
@@ -136,7 +136,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		queryParams.put("get", "subindex.opname.abc.metric5");
 
 		try {
-			assertEquals("", handler.handleRequest(pathParams, queryParams, null));
+			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null));
 		} catch (Exception e) {
 			fail(e.getMessage());
 		}
@@ -146,7 +146,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		queryParams.put("get", "subindex.opname.abc.nonexistant");
 
 		try {
-			assertEquals("", handler.handleRequest(pathParams, queryParams, null));
+			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null));
 		} catch (Exception e) {
 			fail(e.getMessage());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 588ba84..698c2f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -835,6 +835,10 @@ public class CheckpointCoordinator {
 		return checkpointIdCounter;
 	}
 
+	public long getCheckpointTimeout() {
+		return checkpointTimeout;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Periodic scheduling of checkpoints
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 0ff6ace..0fd97da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -108,6 +109,14 @@ public interface AccessExecutionGraph {
 	long getStatusTimestamp(JobStatus status);
 
 	/**
+	 * Returns the {@link CheckpointCoordinator} for this execution graph.
+	 *
+	 * @return CheckpointCoordinator for this execution graph or <code>null</code>
+	 * if none is available.
+	 */
+	CheckpointCoordinator getCheckpointCoordinator();
+
+	/**
 	 * Returns the {@link CheckpointStatsTracker} for this execution graph.
 	 *
 	 * @return CheckpointStatsTracker for thie execution graph

http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 493825a..d8c58c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -198,6 +199,11 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 	}
 
 	@Override
+	public CheckpointCoordinator getCheckpointCoordinator() {
+		return null;
+	}
+
+	@Override
 	public CheckpointStatsTracker getCheckpointStatsTracker() {
 		return tracker;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 101bdba..0a79cf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -416,6 +416,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
+	@Override
 	public CheckpointCoordinator getCheckpointCoordinator() {
 		return checkpointCoordinator;
 	}


[2/2] flink git commit: [FLINK-4787] [runtime-web] Return generic HttpResponse in RequestHandler

Posted by uc...@apache.org.
[FLINK-4787] [runtime-web] Return generic HttpResponse in RequestHandler

- Let RequestHandler return a generic HttpResponse instead of a String. This
  enables handlers to return custom reponses (differnt http codes, etc.)
- Introduce AbstractJsonRequestHandler for default JSON responses


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fb60091
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fb60091
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fb60091

Branch: refs/heads/master
Commit: 2fb600916860acf2256464659ca60424bbf26857
Parents: e9b20ec
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Oct 11 10:08:14 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Oct 28 11:04:12 2016 +0200

----------------------------------------------------------------------
 .../webmonitor/RuntimeMonitorHandler.java       | 23 +++---
 .../AbstractExecutionGraphRequestHandler.java   |  4 +-
 .../handlers/AbstractJsonRequestHandler.java    | 73 ++++++++++++++++++++
 .../handlers/ClusterOverviewHandler.java        |  4 +-
 .../handlers/CurrentJobIdsHandler.java          |  5 +-
 .../handlers/CurrentJobsOverviewHandler.java    |  4 +-
 .../handlers/DashboardConfigHandler.java        |  4 +-
 .../handlers/JarAccessDeniedHandler.java        |  4 +-
 .../webmonitor/handlers/JarActionHandler.java   |  2 +-
 .../webmonitor/handlers/JarDeleteHandler.java   |  4 +-
 .../webmonitor/handlers/JarListHandler.java     |  4 +-
 .../webmonitor/handlers/JarPlanHandler.java     |  2 +-
 .../webmonitor/handlers/JarRunHandler.java      |  2 +-
 .../webmonitor/handlers/JarUploadHandler.java   |  4 +-
 .../handlers/JobCancellationHandler.java        |  4 +-
 .../handlers/JobManagerConfigHandler.java       |  4 +-
 .../webmonitor/handlers/JobStoppingHandler.java |  4 +-
 .../webmonitor/handlers/RequestHandler.java     | 16 +++--
 .../handlers/TaskManagersHandler.java           |  5 +-
 .../metrics/AbstractMetricsHandler.java         |  6 +-
 .../metrics/AbstractMetricsHandlerTest.java     |  6 +-
 21 files changed, 130 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 5008a8c..aba4e17 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -23,6 +23,7 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
@@ -61,7 +62,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 
 	public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address";
 	
-	private final RequestHandler handler;	
+	private final RequestHandler handler;
 
 	public RuntimeMonitorHandler(
 			RequestHandler handler,
@@ -75,7 +76,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 
 	@Override
 	protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) {
-		DefaultFullHttpResponse response;
+		FullHttpResponse response;
 
 		try {
 			// we only pass the first element in the list to the handlers.
@@ -93,14 +94,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 			queryParams.put(WEB_MONITOR_ADDRESS_KEY,
 				(httpsEnabled ? "https://" : "http://") + address.getHostName() + ":" + address.getPort());
 
-			String result = handler.handleRequest(pathParams, queryParams, jobManager);
-			byte[] bytes = result.getBytes(ENCODING);
-
-			response = new DefaultFullHttpResponse(
-					HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
-
-			response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
-			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
+			response = handler.handleRequest(pathParams, queryParams, jobManager);
 		}
 		catch (NotFoundException e) {
 			// this should result in a 404 error code (not found)
@@ -108,6 +102,8 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 					: Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
 			response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
 			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
+			response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
+			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
 			LOG.debug("Error while handling request", e);
 		}
 		catch (Exception e) {
@@ -115,11 +111,14 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 			response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
 					HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
 			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
+			response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
+			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
 			LOG.debug("Error while handling request", e);
 		}
 
-		response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
-		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+		response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
+
 		KeepAliveWrite.flush(ctx, routed.request(), response);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index ff28d4e..8cd70e9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -30,7 +30,7 @@ import java.util.Map;
  * Base class for request handlers whose response depends on an ExecutionGraph
  * that can be retrieved via "jobid" parameter.
  */
-public abstract class AbstractExecutionGraphRequestHandler implements RequestHandler {
+public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonRequestHandler {
 	
 	private final ExecutionGraphHolder executionGraphHolder;
 	
@@ -39,7 +39,7 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		String jidString = pathParams.get("jobid");
 		if (jidString == null) {
 			throw new RuntimeException("JobId parameter missing");

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
new file mode 100644
index 0000000..ae163cb
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.runtime.instance.ActorGateway;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+
+/**
+ * Base class for most request handlers. The handlers must produce a JSON response.
+ */
+public abstract class AbstractJsonRequestHandler implements RequestHandler {
+
+	private static final Charset ENCODING = Charset.forName("UTF-8");
+
+	@Override
+	public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+		String result = handleJsonRequest(pathParams, queryParams, jobManager);
+		byte[] bytes = result.getBytes(ENCODING);
+
+		DefaultFullHttpResponse response = new DefaultFullHttpResponse(
+				HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
+
+		response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
+		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+		return response;
+	}
+
+	/**
+	 * Core method that handles the request and generates the response. The method needs to
+	 * respond with a valid JSON string. Exceptions may be thrown and will be handled.
+	 *
+	 * @param pathParams The map of REST path parameters, decoded by the router.
+	 * @param queryParams The map of query parameters.
+	 * @param jobManager The JobManager actor.
+	 *
+	 * @return The JSON string that is the HTTP response.
+	 *
+	 * @throws Exception Handlers may forward exceptions. Exceptions of type
+	 *         {@link org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404
+	 *         response with the exception message, other exceptions will cause a HTTP 500 response
+	 *         with the exception stack trace.
+	 */
+	public abstract String handleJsonRequest(
+			Map<String, String> pathParams,
+			Map<String, String> queryParams,
+			ActorGateway jobManager) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index b7389c4..99ef3d9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -36,7 +36,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Responder that returns the status of the Flink cluster, such as how many
  * TaskManagers are currently connected, and how many jobs are running.
  */
-public class ClusterOverviewHandler implements RequestHandler {
+public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
 
 	private static final String version = EnvironmentInformation.getVersion();
 
@@ -49,7 +49,7 @@ public class ClusterOverviewHandler implements RequestHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		// we need no parameters, get all requests
 		try {
 			if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index 11f2a3b..b690c56 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
-
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -38,7 +37,7 @@ import static java.util.Objects.requireNonNull;
  * May serve the IDs of current jobs, or past jobs, depending on whether this handler is
  * given the JobManager or Archive Actor Reference.
  */
-public class CurrentJobIdsHandler implements RequestHandler {
+public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 
 	private final FiniteDuration timeout;
 	
@@ -47,7 +46,7 @@ public class CurrentJobIdsHandler implements RequestHandler {
 	}
 	
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		// we need no parameters, get all requests
 		try {
 			if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index 571f911..07064da 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -36,7 +36,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Request handler that returns a summary of the job status.
  */
-public class CurrentJobsOverviewHandler implements RequestHandler {
+public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 
 	private final FiniteDuration timeout;
 	
@@ -55,7 +55,7 @@ public class CurrentJobsOverviewHandler implements RequestHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			if (jobManager != null) {
 				Future<Object> future = jobManager.ask(

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index debb24c..6fe072b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -31,7 +31,7 @@ import java.util.TimeZone;
  * against this web server should behave. It defines for example the refresh interval,
  * and time zone of the server timestamps.
  */
-public class DashboardConfigHandler implements RequestHandler {
+public class DashboardConfigHandler extends AbstractJsonRequestHandler {
 	
 	private final String configString;
 	
@@ -67,7 +67,7 @@ public class DashboardConfigHandler implements RequestHandler {
 	}
 	
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		return this.configString;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index 67673e2..ba32d0d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -22,13 +22,13 @@ import org.apache.flink.runtime.instance.ActorGateway;
 
 import java.util.Map;
 
-public class JarAccessDeniedHandler implements RequestHandler {
+public class JarAccessDeniedHandler extends AbstractJsonRequestHandler {
 
 	private static final String ERROR_MESSAGE = "{\"error\": \"Web submission interface is not " +
 			"available for this cluster. To enable it, set the configuration key ' jobmanager.web.submit.enable.'\"}";
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		return ERROR_MESSAGE;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
index 9da54c1..1e23f1f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
@@ -47,7 +47,7 @@ import java.util.Map;
 /**
  * Abstract handler for fetching plan for a jar or running a jar.
  */
-public abstract class JarActionHandler implements RequestHandler {
+public abstract class JarActionHandler extends AbstractJsonRequestHandler {
 	
 	private final File jarDir;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index 6e6c520..ae959a5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -29,7 +29,7 @@ import java.util.Map;
 /**
  * Handles requests for deletion of jars.
  */
-public class JarDeleteHandler implements RequestHandler {
+public class JarDeleteHandler extends AbstractJsonRequestHandler {
 
 	private final File jarDir;
 
@@ -38,7 +38,7 @@ public class JarDeleteHandler implements RequestHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		final String file = pathParams.get("jarid");
 		try {
 			File[] list = jarDir.listFiles(new FilenameFilter() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index c263628..f3cdc30 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -31,7 +31,7 @@ import java.util.Map;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
 
-public class JarListHandler implements RequestHandler {
+public class JarListHandler extends AbstractJsonRequestHandler {
 
 	private final File jarDir;
 
@@ -40,7 +40,7 @@ public class JarListHandler implements RequestHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			StringWriter writer = new StringWriter();
 			JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index 7e0a810..3a95d6a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -37,7 +37,7 @@ public class JarPlanHandler extends JarActionHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			JobGraph graph = getJobGraphAndClassLoader(pathParams, queryParams).f0;
 			StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 18b0f15..8d3e57f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -48,7 +48,7 @@ public class JarRunHandler extends JarActionHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(pathParams, queryParams);
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 011e8f9..9a3b0e1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -27,7 +27,7 @@ import java.util.UUID;
 /**
  * Handles requests for uploading of jars.
  */
-public class JarUploadHandler implements RequestHandler {
+public class JarUploadHandler extends AbstractJsonRequestHandler {
 
 	private final File jarDir;
 
@@ -36,7 +36,7 @@ public class JarUploadHandler implements RequestHandler {
 	}
 
 	@Override
-	public String handleRequest(
+	public String handleJsonRequest(
 				Map<String, String> pathParams,
 				Map<String, String> queryParams,
 				ActorGateway jobManager) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index b17acdc..9f35719 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -28,10 +28,10 @@ import java.util.Map;
 /**
  * Request handler for the CANCEL request.
  */
-public class JobCancellationHandler implements RequestHandler {
+public class JobCancellationHandler extends AbstractJsonRequestHandler {
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
 			if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index 6d9f7e1..11ca931 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -28,7 +28,7 @@ import java.util.Map;
 /**
  * Returns the Job Manager's configuration.
  */
-public class JobManagerConfigHandler implements RequestHandler {
+public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
 
 	private final Configuration config;
 
@@ -37,7 +37,7 @@ public class JobManagerConfigHandler implements RequestHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
index 791790a..0f8c958 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
@@ -28,10 +28,10 @@ import java.util.Map;
 /**
  * Request handler for the STOP request.
  */
-public class JobStoppingHandler implements RequestHandler {
+public class JobStoppingHandler extends AbstractJsonRequestHandler {
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
 			if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index 0927b7e..c56cfc3 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -18,29 +18,35 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import io.netty.handler.codec.http.FullHttpResponse;
 import org.apache.flink.runtime.instance.ActorGateway;
 
 import java.util.Map;
 
 /**
- * Base interface for all request handlers. The handlers must produce a JSOn response.
+ * Base interface for all request handlers.
+ *
+ * <p>Most handlers will want to use the {@link AbstractJsonRequestHandler}
+ * as a starting point, which produces a valid HTTP response.
  */
 public interface RequestHandler {
 
 	/**
 	 * Core method that handles the request and generates the response. The method needs to
-	 * respond with a valid JSON string. Exceptions may be throws and will be handled.
+	 * respond with a full http response, including content-type, content-length, etc.
+	 *
+	 * <p>Exceptions may be throws and will be handled.
 	 * 
 	 * @param pathParams The map of REST path parameters, decoded by the router.
 	 * @param queryParams The map of query parameters.
 	 * @param jobManager The JobManager actor.
-	 * 
-	 * @return The JSON string that is the HTTP response.
+	 *
+	 * @return The full http response.
 	 * 
 	 * @throws Exception Handlers may forward exceptions. Exceptions of type
 	 *         {@link org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404
 	 *         response with the exception message, other exceptions will cause a HTTP 500 response
 	 *         with the exception stack trace.
 	 */
-	String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception;
+	FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index b5e9088..c20d4fe 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RegisteredTaskManagers;
 import org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance;
 import org.apache.flink.util.StringUtils;
-
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -38,7 +37,7 @@ import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
 
-public class TaskManagersHandler implements RequestHandler {
+public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 
 	public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
 	
@@ -49,7 +48,7 @@ public class TaskManagersHandler implements RequestHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			if (jobManager != null) {
 				// whether one task manager's metrics are requested, or all task manager, we

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
index 8374523..80126c6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
@@ -19,8 +19,8 @@ package org.apache.flink.runtime.webmonitor.metrics;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
-import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -38,7 +38,7 @@ import java.util.Map;
  * The handler will then return a list containing the values of the requested metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
  */
-public abstract class AbstractMetricsHandler implements RequestHandler {
+public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler {
 	private final MetricFetcher fetcher;
 
 	public AbstractMetricsHandler(MetricFetcher fetcher) {
@@ -46,7 +46,7 @@ public abstract class AbstractMetricsHandler implements RequestHandler {
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		fetcher.update();
 		String requestedMetricsList = queryParams.get("get");
 		return requestedMetricsList != null

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
index 483dbf6..13a9067 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
@@ -48,7 +48,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		pathParams.put("vertexid", "taskid");
 
 		// get list of available metrics
-		String availableList = handler.handleRequest(pathParams, queryParams, null);
+		String availableList = handler.handleJsonRequest(pathParams, queryParams, null);
 
 		assertEquals("[" +
 				"{\"id\":\"8.opname.abc.metric5\"}," +
@@ -59,7 +59,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		// get value for a single metric
 		queryParams.put("get", "8.opname.abc.metric5");
 
-		String metricValue = handler.handleRequest(pathParams, queryParams, null);
+		String metricValue = handler.handleJsonRequest(pathParams, queryParams, null);
 
 		assertEquals("[" +
 				"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}" +
@@ -70,7 +70,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		// get values for multiple metrics
 		queryParams.put("get", "8.opname.abc.metric5,8.abc.metric4");
 
-		String metricValues = handler.handleRequest(pathParams, queryParams, null);
+		String metricValues = handler.handleJsonRequest(pathParams, queryParams, null);
 
 		assertEquals("[" +
 				"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}," +