You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/10/28 09:04:29 UTC
[2/2] flink git commit: [FLINK-4787] [runtime-web] Return generic
HttpResponse in RequestHandler
[FLINK-4787] [runtime-web] Return generic HttpResponse in RequestHandler
- Let RequestHandler return a generic HttpResponse instead of a String. This
enables handlers to return custom responses (different HTTP codes, etc.)
- Introduce AbstractJsonRequestHandler for default JSON responses
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fb60091
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fb60091
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fb60091
Branch: refs/heads/master
Commit: 2fb600916860acf2256464659ca60424bbf26857
Parents: e9b20ec
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Oct 11 10:08:14 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Oct 28 11:04:12 2016 +0200
----------------------------------------------------------------------
.../webmonitor/RuntimeMonitorHandler.java | 23 +++---
.../AbstractExecutionGraphRequestHandler.java | 4 +-
.../handlers/AbstractJsonRequestHandler.java | 73 ++++++++++++++++++++
.../handlers/ClusterOverviewHandler.java | 4 +-
.../handlers/CurrentJobIdsHandler.java | 5 +-
.../handlers/CurrentJobsOverviewHandler.java | 4 +-
.../handlers/DashboardConfigHandler.java | 4 +-
.../handlers/JarAccessDeniedHandler.java | 4 +-
.../webmonitor/handlers/JarActionHandler.java | 2 +-
.../webmonitor/handlers/JarDeleteHandler.java | 4 +-
.../webmonitor/handlers/JarListHandler.java | 4 +-
.../webmonitor/handlers/JarPlanHandler.java | 2 +-
.../webmonitor/handlers/JarRunHandler.java | 2 +-
.../webmonitor/handlers/JarUploadHandler.java | 4 +-
.../handlers/JobCancellationHandler.java | 4 +-
.../handlers/JobManagerConfigHandler.java | 4 +-
.../webmonitor/handlers/JobStoppingHandler.java | 4 +-
.../webmonitor/handlers/RequestHandler.java | 16 +++--
.../handlers/TaskManagersHandler.java | 5 +-
.../metrics/AbstractMetricsHandler.java | 6 +-
.../metrics/AbstractMetricsHandlerTest.java | 6 +-
21 files changed, 130 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 5008a8c..aba4e17 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -23,6 +23,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
@@ -61,7 +62,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address";
- private final RequestHandler handler;
+ private final RequestHandler handler;
public RuntimeMonitorHandler(
RequestHandler handler,
@@ -75,7 +76,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
@Override
protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) {
- DefaultFullHttpResponse response;
+ FullHttpResponse response;
try {
// we only pass the first element in the list to the handlers.
@@ -93,14 +94,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
queryParams.put(WEB_MONITOR_ADDRESS_KEY,
(httpsEnabled ? "https://" : "http://") + address.getHostName() + ":" + address.getPort());
- String result = handler.handleRequest(pathParams, queryParams, jobManager);
- byte[] bytes = result.getBytes(ENCODING);
-
- response = new DefaultFullHttpResponse(
- HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
-
- response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
- response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
+ response = handler.handleRequest(pathParams, queryParams, jobManager);
}
catch (NotFoundException e) {
// this should result in a 404 error code (not found)
@@ -108,6 +102,8 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
: Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
+ response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
+ response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
LOG.debug("Error while handling request", e);
}
catch (Exception e) {
@@ -115,11 +111,14 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
+ response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
+ response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
LOG.debug("Error while handling request", e);
}
- response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
- response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+ response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
+
KeepAliveWrite.flush(ctx, routed.request(), response);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index ff28d4e..8cd70e9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -30,7 +30,7 @@ import java.util.Map;
* Base class for request handlers whose response depends on an ExecutionGraph
* that can be retrieved via "jobid" parameter.
*/
-public abstract class AbstractExecutionGraphRequestHandler implements RequestHandler {
+public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonRequestHandler {
private final ExecutionGraphHolder executionGraphHolder;
@@ -39,7 +39,7 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
}
@Override
- public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
String jidString = pathParams.get("jobid");
if (jidString == null) {
throw new RuntimeException("JobId parameter missing");
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
new file mode 100644
index 0000000..ae163cb
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.runtime.instance.ActorGateway;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+
+/**
+ * Base class for most request handlers. The handlers must produce a JSON response.
+ */
+public abstract class AbstractJsonRequestHandler implements RequestHandler {
+
+ private static final Charset ENCODING = Charset.forName("UTF-8");
+
+ @Override
+ public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ String result = handleJsonRequest(pathParams, queryParams, jobManager);
+ byte[] bytes = result.getBytes(ENCODING);
+
+ DefaultFullHttpResponse response = new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
+
+ response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
+ response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+ return response;
+ }
+
+ /**
+ * Core method that handles the request and generates the response. The method needs to
+ * respond with a valid JSON string. Exceptions may be thrown and will be handled.
+ *
+ * @param pathParams The map of REST path parameters, decoded by the router.
+ * @param queryParams The map of query parameters.
+ * @param jobManager The JobManager actor.
+ *
+ * @return The JSON string that is the HTTP response.
+ *
+ * @throws Exception Handlers may forward exceptions. Exceptions of type
+ * {@link org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404
+ * response with the exception message, other exceptions will cause a HTTP 500 response
+ * with the exception stack trace.
+ */
+ public abstract String handleJsonRequest(
+ Map<String, String> pathParams,
+ Map<String, String> queryParams,
+ ActorGateway jobManager) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index b7389c4..99ef3d9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -36,7 +36,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* Responder that returns the status of the Flink cluster, such as how many
* TaskManagers are currently connected, and how many jobs are running.
*/
-public class ClusterOverviewHandler implements RequestHandler {
+public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
private static final String version = EnvironmentInformation.getVersion();
@@ -49,7 +49,7 @@ public class ClusterOverviewHandler implements RequestHandler {
}
@Override
- public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
// we need no parameters, get all requests
try {
if (jobManager != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index 11f2a3b..b690c56 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
-
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -38,7 +37,7 @@ import static java.util.Objects.requireNonNull;
* May serve the IDs of current jobs, or past jobs, depending on whether this handler is
* given the JobManager or Archive Actor Reference.
*/
-public class CurrentJobIdsHandler implements RequestHandler {
+public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
private final FiniteDuration timeout;
@@ -47,7 +46,7 @@ public class CurrentJobIdsHandler implements RequestHandler {
}
@Override
- public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
// we need no parameters, get all requests
try {
if (jobManager != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index 571f911..07064da 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -36,7 +36,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Request handler that returns a summary of the job status.
*/
-public class CurrentJobsOverviewHandler implements RequestHandler {
+public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
private final FiniteDuration timeout;
@@ -55,7 +55,7 @@ public class CurrentJobsOverviewHandler implements RequestHandler {
}
@Override
- public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
try {
if (jobManager != null) {
Future<Object> future = jobManager.ask(
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index debb24c..6fe072b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -31,7 +31,7 @@ import java.util.TimeZone;
* against this web server should behave. It defines for example the refresh interval,
* and time zone of the server timestamps.
*/
-public class DashboardConfigHandler implements RequestHandler {
+public class DashboardConfigHandler extends AbstractJsonRequestHandler {
private final String configString;
@@ -67,7 +67,7 @@ public class DashboardConfigHandler implements RequestHandler {
}
@Override
- public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
return this.configString;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index 67673e2..ba32d0d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -22,13 +22,13 @@ import org.apache.flink.runtime.instance.ActorGateway;
import java.util.Map;
-public class JarAccessDeniedHandler implements RequestHandler {
+public class JarAccessDeniedHandler extends AbstractJsonRequestHandler {
private static final String ERROR_MESSAGE = "{\"error\": \"Web submission interface is not " +
"available for this cluster. To enable it, set the configuration key ' jobmanager.web.submit.enable.'\"}";
@Override
- public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
return ERROR_MESSAGE;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
index 9da54c1..1e23f1f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
@@ -47,7 +47,7 @@ import java.util.Map;
/**
* Abstract handler for fetching plan for a jar or running a jar.
*/
-public abstract class JarActionHandler implements RequestHandler {
+public abstract class JarActionHandler extends AbstractJsonRequestHandler {
private final File jarDir;
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index 6e6c520..ae959a5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -29,7 +29,7 @@ import java.util.Map;
/**
* Handles requests for deletion of jars.
*/
-public class JarDeleteHandler implements RequestHandler {
+public class JarDeleteHandler extends AbstractJsonRequestHandler {
private final File jarDir;
@@ -38,7 +38,7 @@ public class JarDeleteHandler implements RequestHandler {
}
@Override
- public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
final String file = pathParams.get("jarid");
try {
File[] list = jarDir.listFiles(new FilenameFilter() {
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index c263628..f3cdc30 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -31,7 +31,7 @@ import java.util.Map;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
-public class JarListHandler implements RequestHandler {
+public class JarListHandler extends AbstractJsonRequestHandler {
private final File jarDir;
@@ -40,7 +40,7 @@ public class JarListHandler implements RequestHandler {
}
@Override
- public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
try {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index 7e0a810..3a95d6a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -37,7 +37,7 @@ public class JarPlanHandler extends JarActionHandler {
}
@Override
- public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
try {
JobGraph graph = getJobGraphAndClassLoader(pathParams, queryParams).f0;
StringWriter writer = new StringWriter();
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 18b0f15..8d3e57f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -48,7 +48,7 @@ public class JarRunHandler extends JarActionHandler {
}
@Override
- public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
try {
Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(pathParams, queryParams);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 011e8f9..9a3b0e1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -27,7 +27,7 @@ import java.util.UUID;
/**
* Handles requests for uploading of jars.
*/
-public class JarUploadHandler implements RequestHandler {
+public class JarUploadHandler extends AbstractJsonRequestHandler {
private final File jarDir;
@@ -36,7 +36,7 @@ public class JarUploadHandler implements RequestHandler {
}
@Override
- public String handleRequest(
+ public String handleJsonRequest(
Map<String, String> pathParams,
Map<String, String> queryParams,
ActorGateway jobManager) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index b17acdc..9f35719 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -28,10 +28,10 @@ import java.util.Map;
/**
* Request handler for the CANCEL request.
*/
-public class JobCancellationHandler implements RequestHandler {
+public class JobCancellationHandler extends AbstractJsonRequestHandler {
@Override
- public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
try {
JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
if (jobManager != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index 6d9f7e1..11ca931 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -28,7 +28,7 @@ import java.util.Map;
/**
* Returns the Job Manager's configuration.
*/
-public class JobManagerConfigHandler implements RequestHandler {
+public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
private final Configuration config;
@@ -37,7 +37,7 @@ public class JobManagerConfigHandler implements RequestHandler {
}
@Override
- public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
index 791790a..0f8c958 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
@@ -28,10 +28,10 @@ import java.util.Map;
/**
* Request handler for the STOP request.
*/
-public class JobStoppingHandler implements RequestHandler {
+public class JobStoppingHandler extends AbstractJsonRequestHandler {
@Override
- public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
try {
JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
if (jobManager != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index 0927b7e..c56cfc3 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -18,29 +18,35 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import io.netty.handler.codec.http.FullHttpResponse;
import org.apache.flink.runtime.instance.ActorGateway;
import java.util.Map;
/**
- * Base interface for all request handlers. The handlers must produce a JSOn response.
+ * Base interface for all request handlers.
+ *
+ * <p>Most handlers will want to use the {@link AbstractJsonRequestHandler}
+ * as a starting point, which produces a valid HTTP response.
*/
public interface RequestHandler {
/**
* Core method that handles the request and generates the response. The method needs to
- * respond with a valid JSON string. Exceptions may be throws and will be handled.
+ * respond with a full HTTP response, including content-type, content-length, etc.
+ *
+ * <p>Exceptions may be thrown and will be handled.
*
* @param pathParams The map of REST path parameters, decoded by the router.
* @param queryParams The map of query parameters.
* @param jobManager The JobManager actor.
- *
- * @return The JSON string that is the HTTP response.
+ *
+ * @return The full HTTP response.
*
* @throws Exception Handlers may forward exceptions. Exceptions of type
* {@link org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404
* response with the exception message, other exceptions will cause a HTTP 500 response
* with the exception stack trace.
*/
- String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception;
+ FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index b5e9088..c20d4fe 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.RegisteredTaskManagers;
import org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance;
import org.apache.flink.util.StringUtils;
-
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -38,7 +37,7 @@ import java.util.Map;
import static java.util.Objects.requireNonNull;
-public class TaskManagersHandler implements RequestHandler {
+public class TaskManagersHandler extends AbstractJsonRequestHandler {
public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
@@ -49,7 +48,7 @@ public class TaskManagersHandler implements RequestHandler {
}
@Override
- public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
try {
if (jobManager != null) {
// whether one task manager's metrics are requested, or all task manager, we
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
index 8374523..80126c6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
@@ -19,8 +19,8 @@ package org.apache.flink.runtime.webmonitor.metrics;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
-import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -38,7 +38,7 @@ import java.util.Map;
* The handler will then return a list containing the values of the requested metrics.
* {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
*/
-public abstract class AbstractMetricsHandler implements RequestHandler {
+public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler {
private final MetricFetcher fetcher;
public AbstractMetricsHandler(MetricFetcher fetcher) {
@@ -46,7 +46,7 @@ public abstract class AbstractMetricsHandler implements RequestHandler {
}
@Override
- public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
fetcher.update();
String requestedMetricsList = queryParams.get("get");
return requestedMetricsList != null
http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
index 483dbf6..13a9067 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
@@ -48,7 +48,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
pathParams.put("vertexid", "taskid");
// get list of available metrics
- String availableList = handler.handleRequest(pathParams, queryParams, null);
+ String availableList = handler.handleJsonRequest(pathParams, queryParams, null);
assertEquals("[" +
"{\"id\":\"8.opname.abc.metric5\"}," +
@@ -59,7 +59,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
// get value for a single metric
queryParams.put("get", "8.opname.abc.metric5");
- String metricValue = handler.handleRequest(pathParams, queryParams, null);
+ String metricValue = handler.handleJsonRequest(pathParams, queryParams, null);
assertEquals("[" +
"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}" +
@@ -70,7 +70,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
// get values for multiple metrics
queryParams.put("get", "8.opname.abc.metric5,8.abc.metric4");
- String metricValues = handler.handleRequest(pathParams, queryParams, null);
+ String metricValues = handler.handleJsonRequest(pathParams, queryParams, null);
assertEquals("[" +
"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}," +