You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/20 08:35:41 UTC
flink git commit: [FLINK-7532] Add web content handler to
DispatcherRestEndpoint
Repository: flink
Updated Branches:
refs/heads/master 2cb37cb93 -> 55b76d54f
[FLINK-7532] Add web content handler to DispatcherRestEndpoint
Adds the StaticFileServerHandler to the DispatcherRestEndpoint if the
flink-runtime-web dependency is on the classpath. In order to set up the
respective channel handler, this commit changes the initializeHandlers
method of the RestServerEndpoint so that it receives the future REST address
and returns pairs of RestHandlerSpecification and ChannelInboundHandler.
Refactor RestServerEndpoint#initializeHandlers to support StaticFileServerHandler registration
This closes #4601.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55b76d54
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55b76d54
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55b76d54
Branch: refs/heads/master
Commit: 55b76d54f0dcc4bdaa96eaa463ce0bfcad23d239
Parents: 2cb37cb
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Aug 18 14:05:11 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Sep 20 10:35:30 2017 +0200
----------------------------------------------------------------------
.../dispatcher/DispatcherRestEndpoint.java | 62 ++++++++++++++++++--
.../entrypoint/SessionClusterEntrypoint.java | 53 ++++++++++++++++-
.../flink/runtime/rest/RestServerEndpoint.java | 29 +++++----
.../rest/handler/RestHandlerSpecification.java | 41 +++++++++++++
.../files/WebContentHandlerSpecification.java | 46 +++++++++++++++
.../runtime/rest/messages/MessageHeaders.java | 18 +-----
.../runtime/webmonitor/WebMonitorUtils.java | 39 ++++++++++++
.../flink/runtime/rest/RestEndpointITCase.java | 7 ++-
8 files changed, 260 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 63b1a4c..debd674 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -18,24 +18,78 @@
package org.apache.flink.runtime.dispatcher;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
-import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
+import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
/**
* REST endpoint for the {@link Dispatcher} component.
*/
public class DispatcherRestEndpoint extends RestServerEndpoint {
- public DispatcherRestEndpoint(RestServerEndpointConfiguration configuration) {
+ private final GatewayRetriever<DispatcherGateway> leaderRetriever;
+ private final Time timeout;
+ private final File tmpDir;
+
+ public DispatcherRestEndpoint(
+ RestServerEndpointConfiguration configuration,
+ GatewayRetriever<DispatcherGateway> leaderRetriever,
+ Time timeout,
+ File tmpDir) {
super(configuration);
+ this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
+ this.timeout = Preconditions.checkNotNull(timeout);
+ this.tmpDir = Preconditions.checkNotNull(tmpDir);
+ }
+
+ @Override
+ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
+ Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
+
+ try {
+ optWebContent = WebMonitorUtils.tryLoadWebContent(
+ leaderRetriever,
+ restAddressFuture,
+ timeout,
+ tmpDir);
+ } catch (IOException e) {
+ log.warn("Could not load web content handler.", e);
+ optWebContent = Optional.empty();
+ }
+
+ return optWebContent
+ .map(webContent ->
+ Collections.singleton(
+ Tuple2.<RestHandlerSpecification, ChannelInboundHandler>of(WebContentHandlerSpecification.getInstance(), webContent)))
+ .orElseGet(() -> Collections.emptySet());
}
@Override
- protected Collection<AbstractRestHandler<?, ?, ?, ?>> initializeHandlers() {
- return Collections.emptySet();
+ public void shutdown(Time timeout) {
+ super.shutdown(timeout);
+
+ try {
+ log.info("Removing cache directory {}", tmpDir);
+ FileUtils.deleteDirectory(tmpDir);
+ } catch (Throwable t) {
+ log.warn("Error while deleting cache directory {}", tmpDir, t);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 0d14443..80bd384 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -18,22 +18,30 @@
package org.apache.flink.runtime.entrypoint;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
+import java.io.File;
import java.util.Optional;
/**
@@ -45,6 +53,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
private Dispatcher dispatcher;
+ private LeaderRetrievalService dispatcherLeaderRetrievalService;
+
private DispatcherRestEndpoint dispatcherRestEndpoint;
public SessionClusterEntrypoint(Configuration configuration) {
@@ -60,8 +70,18 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry) throws Exception {
- dispatcherRestEndpoint = new DispatcherRestEndpoint(
- RestServerEndpointConfiguration.fromConfiguration(configuration));
+ dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
+
+ LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
+ rpcService,
+ DispatcherGateway.class,
+ uuid -> new DispatcherId(uuid),
+ 10,
+ Time.milliseconds(50L));
+
+ dispatcherRestEndpoint = createDispatcherRestEndpoint(
+ configuration,
+ dispatcherGatewayRetriever);
LOG.debug("Starting Dispatcher REST endpoint.");
dispatcherRestEndpoint.start();
@@ -90,17 +110,30 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
LOG.debug("Starting Dispatcher.");
dispatcher.start();
+ dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
}
@Override
protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
Throwable exception = null;
+ if (dispatcherRestEndpoint != null) {
+ dispatcherRestEndpoint.shutdown(Time.seconds(10L));
+ }
+
+ if (dispatcherLeaderRetrievalService != null) {
+ try {
+ dispatcherLeaderRetrievalService.stop();
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
if (dispatcher != null) {
try {
dispatcher.shutDown();
} catch (Throwable t) {
- exception = t;
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
}
@@ -117,6 +150,20 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
}
}
+ protected DispatcherRestEndpoint createDispatcherRestEndpoint(
+ Configuration configuration,
+ LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever) throws Exception {
+
+ Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+ File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR));
+
+ return new DispatcherRestEndpoint(
+ RestServerEndpointConfiguration.fromConfiguration(configuration),
+ dispatcherGatewayRetriever,
+ timeout,
+ tmpDir);
+ }
+
protected Dispatcher createDispatcher(
Configuration configuration,
RpcService rpcService,
http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index fc37381..ec6e5b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -19,16 +19,16 @@
package org.apache.flink.runtime.rest;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.RouterHandler;
-import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
@@ -82,13 +82,18 @@ public abstract class RestServerEndpoint {
/**
* This method is called at the beginning of {@link #start()} to setup all handlers that the REST server endpoint
* implementation requires.
+ *
+ * @param restAddressFuture future rest address of the RestServerEndpoint
+ * @return Collection of handler specifications paired with their channel handlers, which are added to the server endpoint
*/
- protected abstract Collection<AbstractRestHandler<?, ?, ?, ?>> initializeHandlers();
+ protected abstract Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture);
/**
* Starts this REST server endpoint.
+ *
+ * @throws Exception if we cannot start the RestServerEndpoint
*/
- public void start() {
+ public void start() throws Exception {
synchronized (lock) {
if (started) {
// RestServerEndpoint already started
@@ -98,8 +103,9 @@ public abstract class RestServerEndpoint {
log.info("Starting rest endpoint.");
final Router router = new Router();
+ final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
- initializeHandlers().forEach(handler -> registerHandler(router, handler));
+ initializeHandlers(restAddressFuture).forEach(handler -> registerHandler(router, handler));
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@@ -150,8 +156,11 @@ public abstract class RestServerEndpoint {
} else {
protocol = "http://";
}
+
restAddress = protocol + address + ':' + port;
+ restAddressFuture.complete(restAddress);
+
started = true;
}
}
@@ -239,13 +248,13 @@ public abstract class RestServerEndpoint {
}
}
- private static <R extends RequestBody, P extends ResponseBody> void registerHandler(Router router, AbstractRestHandler<?, R, P, ?> handler) {
- switch (handler.getMessageHeaders().getHttpMethod()) {
+ private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) {
+ switch (specificationHandler.f0.getHttpMethod()) {
case GET:
- router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+ router.GET(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
break;
case POST:
- router.POST(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+ router.POST(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
break;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
new file mode 100644
index 0000000..4ebcd49
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+
+/**
+ * Rest handler interface which all rest handler implementations have to implement.
+ */
+public interface RestHandlerSpecification {
+
+ /**
+ * Returns the {@link HttpMethodWrapper} to be used for the request.
+ *
+ * @return http method to be used for the request
+ */
+ HttpMethodWrapper getHttpMethod();
+
+ /**
+ * Returns the generalized endpoint url that this request should be sent to, for example {@code /job/:jobid}.
+ *
+ * @return endpoint url that this request should be sent to
+ */
+ String getTargetRestEndpointURL();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java
new file mode 100644
index 0000000..98b805a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.files;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+
+/**
+ * Rest handler specification for the web content handler.
+ */
+public final class WebContentHandlerSpecification implements RestHandlerSpecification {
+
+ private static final WebContentHandlerSpecification INSTANCE = new WebContentHandlerSpecification();
+
+ private WebContentHandlerSpecification() {}
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return "/:*";
+ }
+
+ public static WebContentHandlerSpecification getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
index 254c231..e5ec794 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.rest.messages;
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
@@ -31,7 +31,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
* @param <P> response message type
* @param <M> message parameters type
*/
-public interface MessageHeaders<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> {
+public interface MessageHeaders<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends RestHandlerSpecification {
/**
* Returns the class of the request message.
@@ -41,20 +41,6 @@ public interface MessageHeaders<R extends RequestBody, P extends ResponseBody, M
Class<R> getRequestClass();
/**
- * Returns the {@link HttpMethodWrapper} to be used for the request.
- *
- * @return http method to be used for the request
- */
- HttpMethodWrapper getHttpMethod();
-
- /**
- * Returns the generalized endpoint url that this request should be sent to, for example {@code /job/:jobid}.
- *
- * @return endpoint url that this request should be sent to
- */
- String getTargetRestEndpointURL();
-
- /**
* Returns the class of the response message.
*
* @return class of the response message
http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 57996bd..e0f1823 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -32,7 +32,9 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -43,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -50,6 +53,8 @@ import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/**
@@ -170,6 +175,40 @@ public final class WebMonitorUtils {
}
}
+ /**
+ * Checks whether the flink-runtime-web dependency is available and if so returns a
+ * StaticFileServerHandler which can serve the static file contents.
+ *
+ * @param leaderRetriever to be used by the StaticFileServerHandler
+ * @param restAddressFuture of the underlying REST server endpoint
+ * @param timeout for lookup requests
+ * @param tmpDir to be used by the StaticFileServerHandler to store temporary files
+ * @param <T> type of the gateway to retrieve
+ * @return Optional containing the StaticFileServerHandler if flink-runtime-web is in the classpath; otherwise Optional.empty
+ * @throws IOException if we cannot create the StaticFileServerHandler
+ */
+ public static <T extends RestfulGateway> Optional<StaticFileServerHandler<T>> tryLoadWebContent(
+ GatewayRetriever<T> leaderRetriever,
+ CompletableFuture<String> restAddressFuture,
+ Time timeout,
+ File tmpDir) throws IOException {
+
+ // 1. Check if flink-runtime-web is in the classpath
+ try {
+ final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
+ Class.forName(classname).asSubclass(WebMonitor.class);
+
+ return Optional.of(new StaticFileServerHandler<>(
+ leaderRetriever,
+ restAddressFuture,
+ timeout,
+ tmpDir));
+ } catch (ClassNotFoundException ignored) {
+ // class not found means that there is no flink-runtime-web in the classpath
+ return Optional.empty();
+ }
+ }
+
public static JsonArchivist[] getJsonArchivists() {
try {
String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
index 4e06d1f..8dfb5ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
@@ -20,10 +20,12 @@ package org.apache.flink.runtime.rest;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
@@ -37,6 +39,7 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -137,8 +140,8 @@ public class RestEndpointITCase extends TestLogger {
}
@Override
- protected Collection<AbstractRestHandler<?, ?, ?, ?>> initializeHandlers() {
- return Collections.singleton(testHandler);
+ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
+ return Collections.singleton(Tuple2.of(new TestHeaders(), testHandler));
}
}