You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/19 22:26:28 UTC
flink git commit: [FLINK-7527] [rest] Let AbstractRestHandler extend RedirectHandler
Repository: flink
Updated Branches:
refs/heads/master a66315a5c -> 75e84e04f
[FLINK-7527] [rest] Let AbstractRestHandler extend RedirectHandler
By letting the AbstractRestHandler extend the RedirectHandler, we add redirection
capabilities to the AbstractRestHandler.
This closes #4597.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/75e84e04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/75e84e04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/75e84e04
Branch: refs/heads/master
Commit: 75e84e04f5a3e2766e331fd05ddb725fe9b00d99
Parents: a66315a
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Aug 17 15:04:19 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Sep 20 00:22:27 2017 +0200
----------------------------------------------------------------------
.../runtime/webmonitor/RedirectHandler.java | 142 ---------------
.../webmonitor/RuntimeMonitorHandler.java | 5 +-
.../runtime/webmonitor/files/MimeTypes.java | 121 -------------
.../files/StaticFileServerHandler.java | 3 +-
.../handlers/HandlerRedirectUtils.java | 129 --------------
.../handlers/TaskManagerLogHandler.java | 2 +-
.../runtime/webmonitor/RedirectHandlerTest.java | 3 +-
.../webmonitor/WebRuntimeMonitorITCase.java | 4 +-
.../runtime/webmonitor/files/MimeTypesTest.java | 2 +
.../handlers/HandlerRedirectUtilsTest.java | 1 +
.../flink/runtime/rest/RestServerEndpoint.java | 4 +-
.../rest/handler/AbstractRestHandler.java | 100 +++--------
.../rest/handler/PipelineErrorHandler.java | 7 +-
.../runtime/rest/handler/RedirectHandler.java | 173 +++++++++++++++++++
.../runtime/rest/handler/RouterHandler.java | 3 +-
.../rest/handler/util/HandlerRedirectUtils.java | 128 ++++++++++++++
.../runtime/rest/handler/util/HandlerUtils.java | 141 +++++++++++++++
.../runtime/rest/handler/util/MimeTypes.java | 121 +++++++++++++
.../flink/runtime/rest/RestEndpointITCase.java | 54 ++++--
19 files changed, 651 insertions(+), 492 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RedirectHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RedirectHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RedirectHandler.java
deleted file mode 100644
index 589d1a5..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RedirectHandler.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.OptionalConsumer;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
- * REST endpoints.
- *
- * @param <T> type of the leader to retrieve
- */
-@ChannelHandler.Sharable
-public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
-
- protected final Logger logger = LoggerFactory.getLogger(getClass());
-
- private final CompletableFuture<String> localAddressFuture;
-
- protected final GatewayRetriever<T> leaderRetriever;
-
- protected final Time timeout;
-
- /** Whether the web service has https enabled. */
- protected final boolean httpsEnabled;
-
- private String localAddress;
-
- protected RedirectHandler(
- @Nonnull CompletableFuture<String> localAddressFuture,
- @Nonnull GatewayRetriever<T> leaderRetriever,
- @Nonnull Time timeout,
- boolean httpsEnabled) {
- this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
- this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
- this.timeout = Preconditions.checkNotNull(timeout);
- this.httpsEnabled = httpsEnabled;
- localAddress = null;
- }
-
- @Override
- protected void channelRead0(
- ChannelHandlerContext channelHandlerContext,
- Routed routed) throws Exception {
-
- if (localAddressFuture.isDone()) {
- if (localAddress == null) {
- try {
- localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- } catch (Exception e) {
- logger.error("Could not obtain local address.", e);
- KeepAliveWrite.flush(channelHandlerContext, routed.request(), HandlerRedirectUtils.getErrorResponse(e));
- }
- }
-
- OptionalConsumer<T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
-
- optLeaderConsumer.ifPresent(
- (T gateway) -> {
- OptionalConsumer<CompletableFuture<String>> optRedirectAddressConsumer = OptionalConsumer.of(
- HandlerRedirectUtils.getRedirectAddress(
- localAddress,
- gateway,
- timeout));
-
- optRedirectAddressConsumer
- .ifPresent(
- (CompletableFuture<String> redirectAddressFuture) ->
- redirectAddressFuture.whenComplete(
- (String redirectAddress, Throwable throwable) -> {
- HttpResponse response;
-
- if (throwable != null) {
- logger.error("Could not retrieve the redirect address.", throwable);
- response = HandlerRedirectUtils.getErrorResponse(throwable);
- } else {
- response = HandlerRedirectUtils.getRedirectResponse(
- redirectAddress,
- routed.path(),
- httpsEnabled);
- }
-
- KeepAliveWrite.flush(channelHandlerContext, routed.request(), response);
- }
- ))
- .ifNotPresent(
- () -> {
- try {
- respondAsLeader(channelHandlerContext, routed, gateway);
- } catch (Exception e) {
- logger.error("Error while responding as leader.", e);
- KeepAliveWrite.flush(
- channelHandlerContext,
- routed.request(),
- HandlerRedirectUtils.getErrorResponse(e));
- }
- });
- }
- ).ifNotPresent(
- () -> KeepAliveWrite.flush(channelHandlerContext, routed.request(), HandlerRedirectUtils.getUnavailableResponse()));
- } else {
- KeepAliveWrite.flush(channelHandlerContext, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
- }
- }
-
- protected abstract void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, T gateway) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 6e388e1..cdcd0fe 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
+import org.apache.flink.runtime.rest.handler.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
@@ -45,7 +46,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import static org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils.ENCODING;
+import static org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils.ENCODING;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java
deleted file mode 100644
index 4834cbc..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.files;
-
-import java.util.HashMap;
-
-/**
- * Simple utility class that resolves file extensions to MIME types.
- *
- * <p>There are various solutions built into Java that depend on extra resource and configuration
- * files. They are designed to be composable and extensible, but also unfortunately tricky to control.
- * This is meant to be a simple solution that may eventually be subsumed by a better one.
- */
-public class MimeTypes {
-
- /** The default mime type. */
- private static final String DEFAULT_MIME_TYPE = "application/octet-stream";
-
- /** The map with the constants. */
- private static final HashMap<String, String> MIME_MAP = new HashMap<String, String>();
-
- /**
- * Gets the MIME type for the file with the given extension. If the mime type is not recognized,
- * this method returns null.
- *
- * @param fileExtension The file extension.
- * @return The MIME type, or {@code null}, if the file extension is not recognized.
- */
- public static String getMimeTypeForExtension(String fileExtension) {
- return MIME_MAP.get(fileExtension.toLowerCase());
- }
-
- /**
- * Gets the MIME type for the file with the given name, by extension. This method tries to extract
- * the file extension and then use the {@link #getMimeTypeForExtension(String)} to determine the
- * MIME type. If the extension cannot be determined, or the extension is unrecognized, this method
- * return {@code null}.
- *
- * @param fileName The file name.
- * @return The MIME type, or {@code null}, if the file's extension is not recognized.
- */
- public static String getMimeTypeForFileName(String fileName) {
- int extensionPos = fileName.lastIndexOf('.');
- if (extensionPos >= 1 && extensionPos < fileName.length() - 1) {
- String extension = fileName.substring(extensionPos + 1);
- return getMimeTypeForExtension(extension);
- }
- else {
- return null;
- }
- }
-
- /**
- * Gets the default MIME type, which is {@code "application/octet-stream"}.
- *
- * @return The default MIME type.
- */
- public static String getDefaultMimeType() {
- return DEFAULT_MIME_TYPE;
- }
-
- // ------------------------------------------------------------------------
- // prevent instantiation
- // ------------------------------------------------------------------------
-
- private MimeTypes() {}
-
- // ------------------------------------------------------------------------
- // initialization
- // ------------------------------------------------------------------------
-
- static {
- // text types
- MIME_MAP.put("html", "text/html");
- MIME_MAP.put("htm", "text/html");
- MIME_MAP.put("css", "text/css");
- MIME_MAP.put("txt", "text/plain");
- MIME_MAP.put("log", "text/plain");
- MIME_MAP.put("out", "text/plain");
- MIME_MAP.put("err", "text/plain");
- MIME_MAP.put("xml", "text/xml");
- MIME_MAP.put("csv", "text/csv");
-
- // application types
- MIME_MAP.put("js", "application/javascript");
- MIME_MAP.put("json", "application/json");
-
- // image types
- MIME_MAP.put("png", "image/png");
- MIME_MAP.put("jpg", "image/jpeg");
- MIME_MAP.put("jpeg", "image/jpeg");
- MIME_MAP.put("gif", "image/gif");
- MIME_MAP.put("svg", "image/svg+xml");
- MIME_MAP.put("tiff", "image/tiff");
- MIME_MAP.put("tff", "image/tiff");
- MIME_MAP.put("bmp", "image/bmp");
-
- // fonts
- MIME_MAP.put("woff", "application/font-woff");
- MIME_MAP.put("woff2", "application/font-woff2");
- MIME_MAP.put("ttf", "font/ttf");
- MIME_MAP.put("otf", "font/opentype");
- MIME_MAP.put("eot", "font/application/vnd.ms-fontobject");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index 520aa53..e6e632e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -27,7 +27,8 @@ package org.apache.flink.runtime.webmonitor.files;
*****************************************************************************/
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.webmonitor.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.util.MimeTypes;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
deleted file mode 100644
index 9bb93cc..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.files.MimeTypes;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-
-import javax.annotation.Nullable;
-
-import java.nio.charset.Charset;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Utilities to extract a redirect address.
- *
- * <p>This is necessary at the moment, because many execution graph structures are not serializable.
- * The proper solution here is to have these serializable and transparently work with the leading
- * job manager instead of redirecting.
- */
-public class HandlerRedirectUtils {
-
- public static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
-
- public static Optional<CompletableFuture<String>> getRedirectAddress(
- String localJobManagerAddress,
- RestfulGateway restfulGateway,
- Time timeout) {
-
- final String leaderAddress = restfulGateway.getAddress();
-
- final String jobManagerName = localJobManagerAddress.substring(localJobManagerAddress.lastIndexOf("/") + 1);
-
- if (!localJobManagerAddress.equals(leaderAddress) &&
- !leaderAddress.equals(AkkaUtils.getLocalAkkaURL(jobManagerName))) {
-
- return Optional.of(restfulGateway.requestRestAddress(timeout));
-
- } else {
- return Optional.empty();
- }
- }
-
- public static HttpResponse getRedirectResponse(String redirectAddress, String path, boolean httpsEnabled) {
- checkNotNull(redirectAddress, "Redirect address");
- checkNotNull(path, "Path");
-
- String protocol = httpsEnabled ? "https" : "http";
- String newLocation = String.format("%s://%s%s", protocol, redirectAddress, path);
-
- HttpResponse redirectResponse = new DefaultFullHttpResponse(
- HttpVersion.HTTP_1_1, HttpResponseStatus.TEMPORARY_REDIRECT);
- redirectResponse.headers().set(HttpHeaders.Names.LOCATION, newLocation);
- redirectResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
-
- return redirectResponse;
- }
-
- public static HttpResponse getUnavailableResponse() {
- String result = "Service temporarily unavailable due to an ongoing leader election. Please refresh.";
- byte[] bytes = result.getBytes(ConfigConstants.DEFAULT_CHARSET);
-
- HttpResponse unavailableResponse = new DefaultFullHttpResponse(
- HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE, Unpooled.wrappedBuffer(bytes));
-
- unavailableResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, bytes.length);
- unavailableResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, MimeTypes.getMimeTypeForExtension("txt"));
-
- return unavailableResponse;
- }
-
- public static HttpResponse getResponse(HttpResponseStatus status, @Nullable String message) {
- ByteBuf messageByteBuf = message == null ? Unpooled.buffer(0)
- : Unpooled.wrappedBuffer(message.getBytes(ENCODING));
- FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, messageByteBuf);
- response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
- response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
- return response;
- }
-
- public static HttpResponse getErrorResponse(Throwable throwable) {
- return getErrorResponse(throwable, HttpResponseStatus.INTERNAL_SERVER_ERROR);
- }
-
- public static HttpResponse getErrorResponse(Throwable throwable, HttpResponseStatus status) {
- byte[] bytes = ExceptionUtils.stringifyException(throwable).getBytes(ENCODING);
- FullHttpResponse response = new DefaultFullHttpResponse(
- HttpVersion.HTTP_1_1,
- status,
- Unpooled.wrappedBuffer(bytes));
-
- response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
- response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
- return response;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index b3238af..bb9b7f5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.RedirectHandler;
import org.apache.flink.runtime.webmonitor.WebHandler;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
index e434a1d..2bd6673 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
+import org.apache.flink.runtime.rest.handler.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 59f8b9d..72498ee 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -31,10 +31,10 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.rest.handler.util.MimeTypes;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.webmonitor.files.MimeTypes;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
@@ -330,7 +330,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
HttpTestClient.SimpleHttpResponse response = client.getNextResponse();
assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE, response.getStatus());
- assertEquals(MimeTypes.getMimeTypeForExtension("txt"), response.getType());
+ assertEquals(MimeTypes.getMimeTypeForExtension("json"), response.getType());
assertTrue(response.getContent().contains("refresh"));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java
index 2594b11..0a8d9d8 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/files/MimeTypesTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.webmonitor.files;
+import org.apache.flink.runtime.rest.handler.util.MimeTypes;
+
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
index a8562b3..81553e4 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 735d3f8..88c7f78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -74,7 +74,7 @@ public abstract class RestServerEndpoint {
* This method is called at the beginning of {@link #start()} to setup all handlers that the REST server endpoint
* implementation requires.
*/
- protected abstract Collection<AbstractRestHandler<?, ?, ?>> initializeHandlers();
+ protected abstract Collection<AbstractRestHandler<?, ?, ?, ?>> initializeHandlers();
/**
* Starts this REST server endpoint.
@@ -185,7 +185,7 @@ public abstract class RestServerEndpoint {
}
}
- private static <R extends RequestBody, P extends ResponseBody> void registerHandler(Router router, AbstractRestHandler<R, P, ?> handler) {
+ private static <R extends RequestBody, P extends ResponseBody> void registerHandler(Router router, AbstractRestHandler<?, R, P, ?> handler) {
switch (handler.getMessageHeaders().getHttpMethod()) {
case GET:
router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 864d74c..95483c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -18,31 +18,25 @@
package org.apache.flink.runtime.rest.handler;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
import com.fasterxml.jackson.core.JsonParseException;
@@ -53,14 +47,8 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
-import java.io.IOException;
-import java.io.StringWriter;
import java.util.concurrent.CompletableFuture;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
/**
* Super class for netty-based handlers that work with {@link RequestBody}s and {@link ResponseBody}s.
*
@@ -70,14 +58,20 @@ import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVer
* @param <P> type of outgoing responses
*/
@ChannelHandler.Sharable
-public abstract class AbstractRestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends SimpleChannelInboundHandler<Routed> {
+public abstract class AbstractRestHandler<T extends RestfulGateway, R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends RedirectHandler<T> {
protected final Logger log = LoggerFactory.getLogger(getClass());
private static final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper();
private final MessageHeaders<R, P, M> messageHeaders;
- protected AbstractRestHandler(MessageHeaders<R, P, M> messageHeaders) {
+ protected AbstractRestHandler(
+ CompletableFuture<String> localAddressFuture,
+ GatewayRetriever<T> leaderRetriever,
+ Time timeout,
+ boolean httpsEnabled,
+ MessageHeaders<R, P, M> messageHeaders) {
+ super(localAddressFuture, leaderRetriever, timeout, httpsEnabled);
this.messageHeaders = messageHeaders;
}
@@ -86,7 +80,7 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
}
@Override
- protected void channelRead0(final ChannelHandlerContext ctx, Routed routed) throws Exception {
+ protected void respondAsLeader(final ChannelHandlerContext ctx, Routed routed, T gateway) throws Exception {
if (log.isDebugEnabled()) {
log.debug("Received request " + routed.request().getUri() + '.');
}
@@ -98,7 +92,7 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
// FullHttpRequests.
log.error("Implementation error: Received a request that wasn't a FullHttpRequest.");
- sendErrorResponse(new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST, ctx, httpRequest);
+ HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST);
return;
}
@@ -110,7 +104,7 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
request = mapper.readValue("{}", messageHeaders.getRequestClass());
} catch (JsonParseException | JsonMappingException je) {
log.error("Implementation error: Get request bodies must have a no-argument constructor.", je);
- sendErrorResponse(new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
+ HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
return;
}
} else {
@@ -119,7 +113,7 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
request = mapper.readValue(in, messageHeaders.getRequestClass());
} catch (JsonParseException | JsonMappingException je) {
log.error("Failed to read request.", je);
- sendErrorResponse(new ErrorResponseBody(String.format("Request did not match expected format %s.", messageHeaders.getRequestClass().getSimpleName())), HttpResponseStatus.BAD_REQUEST, ctx, httpRequest);
+ HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody(String.format("Request did not match expected format %s.", messageHeaders.getRequestClass().getSimpleName())), HttpResponseStatus.BAD_REQUEST);
return;
}
}
@@ -127,7 +121,7 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
CompletableFuture<P> response;
try {
HandlerRequest<R, M> handlerRequest = new HandlerRequest<>(request, messageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams());
- response = handleRequest(handlerRequest);
+ response = handleRequest(handlerRequest, gateway);
} catch (Exception e) {
response = FutureUtils.completedExceptionally(e);
}
@@ -136,18 +130,18 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
if (error != null) {
if (error instanceof RestHandlerException) {
RestHandlerException rhe = (RestHandlerException) error;
- sendErrorResponse(new ErrorResponseBody(rhe.getErrorMessage()), rhe.getHttpResponseStatus(), ctx, httpRequest);
+ HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody(rhe.getErrorMessage()), rhe.getHttpResponseStatus());
} else {
log.error("Implementation error: Unhandled exception.", error);
- sendErrorResponse(new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
+ HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
} else {
- sendResponse(messageHeaders.getResponseStatusCode(), resp, ctx, httpRequest);
+ HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode());
}
});
} catch (Exception e) {
log.error("Request processing failed.", e);
- sendErrorResponse(new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
+ HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
@@ -162,57 +156,9 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
* {@link HttpResponseStatus#INTERNAL_SERVER_ERROR} will be returned.
*
* @param request request that should be handled
+ * @param gateway leader gateway
* @return future containing a handler response
* @throws RestHandlerException if the handling failed
*/
- protected abstract CompletableFuture<P> handleRequest(@Nonnull HandlerRequest<R, M> request) throws RestHandlerException;
-
- private static <P extends ResponseBody> void sendResponse(HttpResponseStatus statusCode, P response, ChannelHandlerContext ctx, HttpRequest httpRequest) {
- StringWriter sw = new StringWriter();
- try {
- mapper.writeValue(sw, response);
- } catch (IOException ioe) {
- sendErrorResponse(new ErrorResponseBody("Internal server error. Could not map response to JSON."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
- return;
- }
- sendResponse(ctx, httpRequest, statusCode, sw.toString());
- }
-
- static void sendErrorResponse(ErrorResponseBody error, HttpResponseStatus statusCode, ChannelHandlerContext ctx, HttpRequest httpRequest) {
-
- StringWriter sw = new StringWriter();
- try {
- mapper.writeValue(sw, error);
- } catch (IOException e) {
- // this should never happen
- sendResponse(ctx, httpRequest, HttpResponseStatus.INTERNAL_SERVER_ERROR, "Internal server error. Could not map error response to JSON.");
- }
- sendResponse(ctx, httpRequest, statusCode, sw.toString());
- }
-
- private static void sendResponse(@Nonnull ChannelHandlerContext ctx, @Nonnull HttpRequest httpRequest, @Nonnull HttpResponseStatus statusCode, @Nonnull String message) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, statusCode);
-
- response.headers().set(CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE);
-
- if (HttpHeaders.isKeepAlive(httpRequest)) {
- response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
- }
-
- byte[] buf = message.getBytes(ConfigConstants.DEFAULT_CHARSET);
- ByteBuf b = Unpooled.copiedBuffer(buf);
- HttpHeaders.setContentLength(response, buf.length);
-
- // write the initial line and the header.
- ctx.write(response);
-
- ctx.write(b);
-
- ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
-
- // close the connection, if no keep-alive is needed
- if (!HttpHeaders.isKeepAlive(httpRequest)) {
- lastContentFuture.addListener(ChannelFutureListener.CLOSE);
- }
- }
+ protected abstract CompletableFuture<P> handleRequest(@Nonnull HandlerRequest<R, M> request, @Nonnull T gateway) throws RestHandlerException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
index 14e643c..b43afdc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rest.handler;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
@@ -45,7 +46,11 @@ public class PipelineErrorHandler extends SimpleChannelInboundHandler<HttpReques
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest message) {
// we can't deal with this message. No one in the pipeline handled it. Log it.
logger.warn("Unknown message received: {}", message);
- AbstractRestHandler.sendErrorResponse(new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST, ctx, message);
+ HandlerUtils.sendErrorResponse(
+ ctx,
+ message,
+ new ErrorResponseBody("Bad request received."),
+ HttpResponseStatus.BAD_REQUEST);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
new file mode 100644
index 0000000..9f63456
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.OptionalConsumer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
+ * REST endpoints.
+ *
+ * <p>For every routed request the handler asks the {@link GatewayRetriever} for the currently
+ * known leader gateway. If {@link HandlerRedirectUtils#getRedirectAddress} yields a redirect
+ * address, the client is redirected to it; otherwise {@link #respondAsLeader} is invoked.
+ * While the local address or the leader gateway is not available, the request is answered
+ * with {@link HttpResponseStatus#SERVICE_UNAVAILABLE}.
+ *
+ * @param <T> type of the leader to retrieve
+ */
+@ChannelHandler.Sharable
+public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
+
+    protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /** Future providing the local address which is compared against the leader's address. */
+    private final CompletableFuture<String> localAddressFuture;
+
+    /** Retriever for the gateway of the current leader. */
+    protected final GatewayRetriever<T> leaderRetriever;
+
+    /** Timeout used when obtaining the local address and when requesting the redirect address. */
+    protected final Time timeout;
+
+    /** Whether the web service has https enabled. */
+    protected final boolean httpsEnabled;
+
+    /**
+     * Local address, cached once {@link #localAddressFuture} has completed; {@code null} before that.
+     *
+     * <p>NOTE(review): written from {@link #channelRead0} without synchronization although the
+     * handler is {@code @Sharable}; every writer stores the same value, but confirm this is acceptable.
+     */
+    private String localAddress;
+
+    /**
+     * Creates the handler.
+     *
+     * @param localAddressFuture future providing the local address; must not be null
+     * @param leaderRetriever retriever for the leader gateway; must not be null
+     * @param timeout timeout for the asynchronous operations of this handler; must not be null
+     * @param httpsEnabled whether redirect locations use https instead of http
+     */
+    protected RedirectHandler(
+            @Nonnull CompletableFuture<String> localAddressFuture,
+            @Nonnull GatewayRetriever<T> leaderRetriever,
+            @Nonnull Time timeout,
+            boolean httpsEnabled) {
+        this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
+        this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
+        this.timeout = Preconditions.checkNotNull(timeout);
+        this.httpsEnabled = httpsEnabled;
+        localAddress = null;
+    }
+
+    /**
+     * Answers the request with exactly one of: a redirect to the leader, the result of
+     * {@link #respondAsLeader}, or an error response.
+     *
+     * @param channelHandlerContext context of the channel the request arrived on
+     * @param routed the routed request
+     */
+    @Override
+    protected void channelRead0(
+            ChannelHandlerContext channelHandlerContext,
+            Routed routed) throws Exception {
+
+        try {
+            if (!localAddressFuture.isDone()) {
+                // The local address is not known yet, so no redirect decision can be made.
+                HandlerUtils.sendErrorResponse(
+                    channelHandlerContext,
+                    routed.request(),
+                    new ErrorResponseBody("Service temporarily unavailable due to an ongoing leader election. Please refresh."),
+                    HttpResponseStatus.SERVICE_UNAVAILABLE);
+                return;
+            }
+
+            if (localAddress == null) {
+                try {
+                    localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                } catch (Exception e) {
+                    logger.error("Could not obtain local address.", e);
+
+                    HandlerUtils.sendErrorResponse(
+                        channelHandlerContext,
+                        routed.request(),
+                        new ErrorResponseBody("Fatal error. Could not obtain local address. Please try to refresh."),
+                        HttpResponseStatus.INTERNAL_SERVER_ERROR);
+
+                    // The request has been answered. Continuing with a null local address would
+                    // fail in the redirect lookup and write a second response for the same request.
+                    return;
+                }
+            }
+
+            OptionalConsumer<T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
+
+            optLeaderConsumer.ifPresent(
+                (T gateway) -> {
+                    OptionalConsumer<CompletableFuture<String>> optRedirectAddressConsumer = OptionalConsumer.of(
+                        HandlerRedirectUtils.getRedirectAddress(
+                            localAddress,
+                            gateway,
+                            timeout));
+
+                    optRedirectAddressConsumer
+                        .ifPresent(
+                            // A redirect address is requested: answer once that future completes.
+                            (CompletableFuture<String> redirectAddressFuture) ->
+                                redirectAddressFuture.whenComplete(
+                                    (String redirectAddress, Throwable throwable) -> {
+                                        if (throwable != null) {
+                                            logger.error("Could not retrieve the redirect address.", throwable);
+
+                                            HandlerUtils.sendErrorResponse(
+                                                channelHandlerContext,
+                                                routed.request(),
+                                                new ErrorResponseBody("Could not retrieve the redirect address of the current leader. Please try to refresh."),
+                                                HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                                        } else {
+                                            HttpResponse response = HandlerRedirectUtils.getRedirectResponse(
+                                                redirectAddress,
+                                                routed.path(),
+                                                httpsEnabled);
+
+                                            KeepAliveWrite.flush(channelHandlerContext, routed.request(), response);
+                                        }
+                                    }
+                                ))
+                        .ifNotPresent(
+                            // No redirect necessary: let the subclass answer the request.
+                            () -> {
+                                try {
+                                    respondAsLeader(channelHandlerContext, routed, gateway);
+                                } catch (Exception e) {
+                                    logger.error("Error while responding as leader.", e);
+                                    HandlerUtils.sendErrorResponse(
+                                        channelHandlerContext,
+                                        routed.request(),
+                                        new ErrorResponseBody("Error while responding to the request."),
+                                        HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                                }
+                            });
+                }
+            ).ifNotPresent(
+                // No leader gateway is currently known.
+                () ->
+                    HandlerUtils.sendErrorResponse(
+                        channelHandlerContext,
+                        routed.request(),
+                        new ErrorResponseBody("Service temporarily unavailable due to an ongoing leader election. Please refresh."),
+                        HttpResponseStatus.SERVICE_UNAVAILABLE));
+        } catch (Throwable throwable) {
+            logger.warn("Error occurred while processing web request.", throwable);
+            HandlerUtils.sendErrorResponse(
+                channelHandlerContext,
+                routed.request(),
+                new ErrorResponseBody("Error occurred in RedirectHandler: " + throwable.getMessage() + '.'),
+                HttpResponseStatus.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    /**
+     * Handles the request locally. Called by {@link #channelRead0} when no redirect address
+     * was returned for the retrieved gateway.
+     *
+     * @param channelHandlerContext context of the channel the request arrived on
+     * @param routed the routed request
+     * @param gateway the retrieved leader gateway
+     * @throws Exception if handling fails; the caller logs it and answers with an internal server error
+     */
+    protected abstract void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, T gateway) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
index 72b779b..cfc456f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rest.handler;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
@@ -42,6 +43,6 @@ public class RouterHandler extends Handler {
@Override
protected void respondNotFound(ChannelHandlerContext ctx, HttpRequest request) {
- AbstractRestHandler.sendErrorResponse(new ErrorResponseBody("Not found."), HttpResponseStatus.NOT_FOUND, ctx, request);
+ HandlerUtils.sendErrorResponse(ctx, request, new ErrorResponseBody("Not found."), HttpResponseStatus.NOT_FOUND);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRedirectUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRedirectUtils.java
new file mode 100644
index 0000000..6f988a4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRedirectUtils.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.util;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+
+import javax.annotation.Nullable;
+
+import java.nio.charset.Charset;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Helpers for deciding whether a request must be redirected to the leader and for building
+ * the corresponding HTTP responses.
+ *
+ * <p>Redirection is needed for now because many execution graph structures are not
+ * serializable. The proper solution is to make them serializable and to work transparently
+ * with the leading job manager instead of redirecting.
+ */
+public class HandlerRedirectUtils {
+
+    public static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
+
+    /**
+     * Determines whether requests have to be redirected to the given gateway.
+     *
+     * @param localJobManagerAddress address of the local endpoint
+     * @param restfulGateway gateway whose address is compared against the local one
+     * @param timeout timeout for requesting the REST address from the gateway
+     * @return the future returned by {@code requestRestAddress} if the gateway's address differs
+     *         from the local address and from its local Akka URL form; otherwise an empty optional
+     */
+    public static Optional<CompletableFuture<String>> getRedirectAddress(
+            String localJobManagerAddress,
+            RestfulGateway restfulGateway,
+            Time timeout) {
+
+        final String gatewayAddress = restfulGateway.getAddress();
+
+        // Everything after the last '/' of the local address.
+        final int nameStart = localJobManagerAddress.lastIndexOf("/") + 1;
+        final String localName = localJobManagerAddress.substring(nameStart);
+
+        if (localJobManagerAddress.equals(gatewayAddress)
+                || gatewayAddress.equals(AkkaUtils.getLocalAkkaURL(localName))) {
+            return Optional.empty();
+        }
+
+        return Optional.of(restfulGateway.requestRestAddress(timeout));
+    }
+
+    /**
+     * Builds an empty-bodied temporary-redirect response pointing at the given address and path.
+     *
+     * @param redirectAddress address to redirect to; must not be null
+     * @param path request path appended to the address; must not be null
+     * @param httpsEnabled whether the location uses {@code https} instead of {@code http}
+     * @return the redirect response
+     */
+    public static HttpResponse getRedirectResponse(String redirectAddress, String path, boolean httpsEnabled) {
+        checkNotNull(redirectAddress, "Redirect address");
+        checkNotNull(path, "Path");
+
+        final String scheme;
+        if (httpsEnabled) {
+            scheme = "https";
+        } else {
+            scheme = "http";
+        }
+        final String location = scheme + "://" + redirectAddress + path;
+
+        HttpResponse redirect = new DefaultFullHttpResponse(
+            HttpVersion.HTTP_1_1, HttpResponseStatus.TEMPORARY_REDIRECT);
+        redirect.headers().set(HttpHeaders.Names.LOCATION, location);
+        redirect.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
+
+        return redirect;
+    }
+
+    /**
+     * Builds the service-unavailable response sent while a leader election is ongoing.
+     *
+     * @return response with a fixed explanatory message as body
+     */
+    public static HttpResponse getUnavailableResponse() {
+        final byte[] payload =
+            "Service temporarily unavailable due to an ongoing leader election. Please refresh."
+                .getBytes(ENCODING);
+
+        HttpResponse unavailable = new DefaultFullHttpResponse(
+            HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE, Unpooled.wrappedBuffer(payload));
+
+        unavailable.headers().set(HttpHeaders.Names.CONTENT_LENGTH, payload.length);
+        unavailable.headers().set(HttpHeaders.Names.CONTENT_TYPE, MimeTypes.getMimeTypeForExtension("txt"));
+
+        return unavailable;
+    }
+
+    /**
+     * Builds a plain-text response with the given status.
+     *
+     * @param status status of the response
+     * @param message body of the response; {@code null} results in an empty body
+     * @return the response
+     */
+    public static HttpResponse getResponse(HttpResponseStatus status, @Nullable String message) {
+        final ByteBuf content;
+        if (message == null) {
+            content = Unpooled.buffer(0);
+        } else {
+            content = Unpooled.wrappedBuffer(message.getBytes(ENCODING));
+        }
+
+        return plainTextResponse(status, content);
+    }
+
+    /**
+     * Builds an internal-server-error response whose body is the stringified exception.
+     *
+     * @param throwable exception to report
+     * @return the response
+     */
+    public static HttpResponse getErrorResponse(Throwable throwable) {
+        return getErrorResponse(throwable, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+    }
+
+    /**
+     * Builds a plain-text response whose body is the stringified exception.
+     *
+     * @param throwable exception to report
+     * @param status status of the response
+     * @return the response
+     */
+    public static HttpResponse getErrorResponse(Throwable throwable, HttpResponseStatus status) {
+        final String stringified = ExceptionUtils.stringifyException(throwable);
+
+        return plainTextResponse(status, Unpooled.wrappedBuffer(stringified.getBytes(ENCODING)));
+    }
+
+    /** Wraps the content into a response and sets the text/plain content type and the content length. */
+    private static FullHttpResponse plainTextResponse(HttpResponseStatus status, ByteBuf content) {
+        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, content);
+        response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
+        response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+        return response;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
new file mode 100644
index 0000000..0d7483a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.util;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Utilities for the REST handlers: serialize response bodies to JSON and write them,
+ * together with the status line and headers, to a Netty channel.
+ */
+public class HandlerUtils {
+
+    private static final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper();
+
+    /**
+     * Sends the given response and status code to the given channel.
+     *
+     * <p>If the response cannot be mapped to JSON, an internal-server-error response is sent instead.
+     *
+     * @param channelHandlerContext identifying the open channel
+     * @param httpRequest originating http request
+     * @param response which should be sent
+     * @param statusCode of the message to send
+     * @param <P> type of the response
+     */
+    public static <P extends ResponseBody> void sendResponse(
+            ChannelHandlerContext channelHandlerContext,
+            HttpRequest httpRequest,
+            P response,
+            HttpResponseStatus statusCode) {
+        StringWriter sw = new StringWriter();
+        try {
+            mapper.writeValue(sw, response);
+        } catch (IOException ioe) {
+            sendErrorResponse(channelHandlerContext, httpRequest, new ErrorResponseBody("Internal server error. Could not map response to JSON."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            return;
+        }
+        sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);
+    }
+
+    /**
+     * Sends the given error response and status code to the given channel.
+     *
+     * <p>If the error itself cannot be mapped to JSON, a fixed fallback message is sent with
+     * status internal server error; no further response is written in that case.
+     *
+     * @param channelHandlerContext identifying the open channel
+     * @param httpRequest originating http request
+     * @param errorMessage which should be sent
+     * @param statusCode of the message to send
+     */
+    public static void sendErrorResponse(
+            ChannelHandlerContext channelHandlerContext,
+            HttpRequest httpRequest,
+            ErrorResponseBody errorMessage,
+            HttpResponseStatus statusCode) {
+
+        StringWriter sw = new StringWriter();
+        try {
+            mapper.writeValue(sw, errorMessage);
+        } catch (IOException e) {
+            // this should never happen
+            sendResponse(channelHandlerContext, httpRequest, "Internal server error. Could not map error response to JSON.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            // Stop here: falling through would write a second response, with an empty or
+            // partial body, for the same request.
+            return;
+        }
+        sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);
+    }
+
+    /**
+     * Sends the given response and status code to the given channel.
+     *
+     * <p>The connection is closed after the response has been written unless the request
+     * asked for keep-alive.
+     *
+     * @param channelHandlerContext identifying the open channel
+     * @param httpRequest originating http request
+     * @param message which should be sent
+     * @param statusCode of the message to send
+     */
+    public static void sendResponse(
+            @Nonnull ChannelHandlerContext channelHandlerContext,
+            @Nonnull HttpRequest httpRequest,
+            @Nonnull String message,
+            @Nonnull HttpResponseStatus statusCode) {
+        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, statusCode);
+
+        // NOTE(review): the handler code this method was extracted from used
+        // RestConstants.REST_CONTENT_TYPE here - confirm the literal is the intended value.
+        response.headers().set(CONTENT_TYPE, "application/json");
+
+        final boolean keepAlive = HttpHeaders.isKeepAlive(httpRequest);
+        if (keepAlive) {
+            response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+        }
+
+        byte[] buf = message.getBytes(ConfigConstants.DEFAULT_CHARSET);
+        ByteBuf b = Unpooled.copiedBuffer(buf);
+        HttpHeaders.setContentLength(response, buf.length);
+
+        // write the initial line and the header.
+        channelHandlerContext.write(response);
+
+        channelHandlerContext.write(b);
+
+        ChannelFuture lastContentFuture = channelHandlerContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+
+        // close the connection, if no keep-alive is needed
+        if (!keepAlive) {
+            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MimeTypes.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MimeTypes.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MimeTypes.java
new file mode 100644
index 0000000..6d54bea
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MimeTypes.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.util;
+
+import java.util.HashMap;
+
+/**
+ * Simple utility class that resolves file extensions to MIME types.
+ *
+ * <p>There are various solutions built into Java that depend on extra resource and configuration
+ * files. They are designed to be composable and extensible, but also unfortunately tricky to control.
+ * This is meant to be a simple solution that may eventually be subsumed by a better one.
+ */
+public class MimeTypes {
+
+ /** The default mime type. */
+ private static final String DEFAULT_MIME_TYPE = "application/octet-stream";
+
+ /** Maps lower-case file extensions (without the dot) to MIME types; filled by the static initializer. */
+ private static final HashMap<String, String> MIME_MAP = new HashMap<>();
+
+ /**
+  * Gets the MIME type for the file with the given extension, ignoring case (independent of the
+  * JVM's default locale). If the mime type is not recognized, this method returns null.
+  *
+  * @param fileExtension The file extension, without the leading dot.
+  * @return The MIME type, or {@code null}, if the file extension is not recognized.
+  */
+ public static String getMimeTypeForExtension(String fileExtension) {
+     // Locale.ROOT: with the default locale, "GIF" would not lower-case to "gif" under e.g. Turkish
+     return MIME_MAP.get(fileExtension.toLowerCase(java.util.Locale.ROOT));
+ }
+
+ /**
+  * Gets the MIME type for the file with the given name, by extension. This method tries to extract
+  * the file extension and then use the {@link #getMimeTypeForExtension(String)} to determine the
+  * MIME type. If the extension cannot be determined, or the extension is unrecognized, this method
+  * returns {@code null}.
+  *
+  * @param fileName The file name.
+  * @return The MIME type, or {@code null}, if the file's extension is not recognized.
+  */
+ public static String getMimeTypeForFileName(String fileName) {
+     int extensionPos = fileName.lastIndexOf('.');
+     // no extension if there is no dot, or the dot is the first or the last character of the name
+     if (extensionPos < 1 || extensionPos >= fileName.length() - 1) {
+         return null;
+     }
+     return getMimeTypeForExtension(fileName.substring(extensionPos + 1));
+ }
+
+ /**
+  * Gets the default MIME type, which is {@code "application/octet-stream"}.
+  *
+  * @return The default MIME type.
+  */
+ public static String getDefaultMimeType() {
+     return DEFAULT_MIME_TYPE;
+ }
+
+ // ------------------------------------------------------------------------
+ //  prevent instantiation
+ // ------------------------------------------------------------------------
+
+ private MimeTypes() {}
+
+ // ------------------------------------------------------------------------
+ //  initialization
+ // ------------------------------------------------------------------------
+
+ static {
+     // text types
+     MIME_MAP.put("html", "text/html");
+     MIME_MAP.put("htm", "text/html");
+     MIME_MAP.put("css", "text/css");
+     MIME_MAP.put("txt", "text/plain");
+     MIME_MAP.put("log", "text/plain");
+     MIME_MAP.put("out", "text/plain");
+     MIME_MAP.put("err", "text/plain");
+     MIME_MAP.put("xml", "text/xml");
+     MIME_MAP.put("csv", "text/csv");
+
+     // application types
+     MIME_MAP.put("js", "application/javascript");
+     MIME_MAP.put("json", "application/json");
+
+     // image types
+     MIME_MAP.put("png", "image/png");
+     MIME_MAP.put("jpg", "image/jpeg");
+     MIME_MAP.put("jpeg", "image/jpeg");
+     MIME_MAP.put("gif", "image/gif");
+     MIME_MAP.put("svg", "image/svg+xml");
+     MIME_MAP.put("tiff", "image/tiff");
+     MIME_MAP.put("tif", "image/tiff");
+     MIME_MAP.put("tff", "image/tiff"); // presumably a typo for "tif"; kept for backward compatibility
+     MIME_MAP.put("bmp", "image/bmp");
+
+     // fonts
+     MIME_MAP.put("woff", "application/font-woff");
+     MIME_MAP.put("woff2", "application/font-woff2");
+     MIME_MAP.put("ttf", "font/ttf");
+     MIME_MAP.put("otf", "font/opentype");
+     MIME_MAP.put("eot", "application/vnd.ms-fontobject");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/75e84e04/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
index ab43f77..89d87f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
@@ -30,8 +30,11 @@ import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
@@ -43,11 +46,13 @@ import org.junit.Test;
import javax.annotation.Nonnull;
-import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* IT cases for {@link RestClient} and {@link RestServerEndpoint}.
@@ -60,13 +65,25 @@ public class RestEndpointITCase extends TestLogger {
private static final Time timeout = Time.seconds(10L);
@Test
- public void testEndpoints() throws ConfigurationException, IOException, InterruptedException, ExecutionException {
+ public void testEndpoints() throws Exception {
Configuration config = new Configuration();
RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
RestClientConfiguration clientConfig = RestClientConfiguration.fromConfiguration(config);
- RestServerEndpoint serverEndpoint = new TestRestServerEndpoint(serverConfig);
+ final String address = "localhost";
+ RestfulGateway mockRestfulGateway = mock(RestfulGateway.class);
+ when(mockRestfulGateway.getAddress()).thenReturn(address);
+ GatewayRetriever<RestfulGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+ when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+
+ TestHandler testHandler = new TestHandler(
+ CompletableFuture.completedFuture(address),
+ mockGatewayRetriever,
+ RpcUtils.INF_TIMEOUT,
+ true);
+
+ RestServerEndpoint serverEndpoint = new TestRestServerEndpoint(serverConfig, testHandler);
RestClient clientEndpoint = new TestRestClient(clientConfig);
try {
@@ -111,26 +128,39 @@ public class RestEndpointITCase extends TestLogger {
private static class TestRestServerEndpoint extends RestServerEndpoint {
- TestRestServerEndpoint(RestServerEndpointConfiguration configuration) {
+ private final TestHandler testHandler;
+
+ TestRestServerEndpoint(RestServerEndpointConfiguration configuration, TestHandler testHandler) {
super(configuration);
+
+ this.testHandler = Preconditions.checkNotNull(testHandler);
}
@Override
- protected Collection<AbstractRestHandler<?, ?, ?>> initializeHandlers() {
- return Collections.singleton(new TestHandler());
+ protected Collection<AbstractRestHandler<?, ?, ?, ?>> initializeHandlers() {
+ return Collections.singleton(testHandler);
}
}
- private static class TestHandler extends AbstractRestHandler<TestRequest, TestResponse, TestParameters> {
+ private static class TestHandler extends AbstractRestHandler<RestfulGateway, TestRequest, TestResponse, TestParameters> {
public static final Object LOCK = new Object();
- TestHandler() {
- super(new TestHeaders());
+ TestHandler(
+ CompletableFuture<String> localAddressFuture,
+ GatewayRetriever<RestfulGateway> leaderRetriever,
+ Time timeout,
+ boolean httpsEnabled) {
+ super(
+ localAddressFuture,
+ leaderRetriever,
+ timeout,
+ httpsEnabled,
+ new TestHeaders());
}
@Override
- protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request) throws RestHandlerException {
+ protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request, RestfulGateway gateway) throws RestHandlerException {
Assert.assertEquals(request.getPathParameter(JobIDPathParameter.class), PATH_JOB_ID);
Assert.assertEquals(request.getQueryParameter(JobIDQueryParameter.class).get(0), QUERY_JOB_ID);