You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/20 08:35:41 UTC

flink git commit: [FLINK-7532] Add web content handler to DispatcherRestEndpoint

Repository: flink
Updated Branches:
  refs/heads/master 2cb37cb93 -> 55b76d54f


[FLINK-7532] Add web content handler to DispatcherRestEndpoint

Adds the StaticFileServerHandler to the DispatcherRestEndpoint if the
flink-runtime-web dependency is in the classpath. In order to set up the
respective channel handler, this commit introduces the setupChannelHandlers
method to the RestServerEndpoint.

Refactor RestServerEndpoint#initializeHandler to support StaticFileServerHandler registration

This closes #4601.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55b76d54
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55b76d54
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55b76d54

Branch: refs/heads/master
Commit: 55b76d54f0dcc4bdaa96eaa463ce0bfcad23d239
Parents: 2cb37cb
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Aug 18 14:05:11 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Sep 20 10:35:30 2017 +0200

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      | 62 ++++++++++++++++++--
 .../entrypoint/SessionClusterEntrypoint.java    | 53 ++++++++++++++++-
 .../flink/runtime/rest/RestServerEndpoint.java  | 29 +++++----
 .../rest/handler/RestHandlerSpecification.java  | 41 +++++++++++++
 .../files/WebContentHandlerSpecification.java   | 46 +++++++++++++++
 .../runtime/rest/messages/MessageHeaders.java   | 18 +-----
 .../runtime/webmonitor/WebMonitorUtils.java     | 39 ++++++++++++
 .../flink/runtime/rest/RestEndpointITCase.java  |  7 ++-
 8 files changed, 260 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 63b1a4c..debd674 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -18,24 +18,78 @@
 
 package org.apache.flink.runtime.dispatcher;
 
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
-import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
+import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * REST endpoint for the {@link Dispatcher} component.
  */
 public class DispatcherRestEndpoint extends RestServerEndpoint {
 
-	public DispatcherRestEndpoint(RestServerEndpointConfiguration configuration) {
+	private final GatewayRetriever<DispatcherGateway> leaderRetriever;
+	private final Time timeout;
+	private final File tmpDir;
+
+	public DispatcherRestEndpoint(
+			RestServerEndpointConfiguration configuration,
+			GatewayRetriever<DispatcherGateway> leaderRetriever,
+			Time timeout,
+			File tmpDir) {
 		super(configuration);
+		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
+		this.timeout = Preconditions.checkNotNull(timeout);
+		this.tmpDir = Preconditions.checkNotNull(tmpDir);
+	}
+
+	@Override
+	protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
+		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
+
+		try {
+			optWebContent = WebMonitorUtils.tryLoadWebContent(
+				leaderRetriever,
+				restAddressFuture,
+				timeout,
+				tmpDir);
+		} catch (IOException e) {
+			log.warn("Could not load web content handler.", e);
+			optWebContent = Optional.empty();
+		}
+
+		return optWebContent
+			.map(webContent ->
+				Collections.singleton(
+					Tuple2.<RestHandlerSpecification, ChannelInboundHandler>of(WebContentHandlerSpecification.getInstance(), webContent)))
+			.orElseGet(() -> Collections.emptySet());
 	}
 
 	@Override
-	protected Collection<AbstractRestHandler<?, ?, ?, ?>> initializeHandlers() {
-		return Collections.emptySet();
+	public void shutdown(Time timeout) {
+		super.shutdown(timeout);
+
+		try {
+			log.info("Removing cache directory {}", tmpDir);
+			FileUtils.deleteDirectory(tmpDir);
+		} catch (Throwable t) {
+			log.warn("Error while deleting cache directory {}", tmpDir, t);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 0d14443..80bd384 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -18,22 +18,30 @@
 
 package org.apache.flink.runtime.entrypoint;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
+import java.io.File;
 import java.util.Optional;
 
 /**
@@ -45,6 +53,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 
 	private Dispatcher dispatcher;
 
+	private LeaderRetrievalService dispatcherLeaderRetrievalService;
+
 	private DispatcherRestEndpoint dispatcherRestEndpoint;
 
 	public SessionClusterEntrypoint(Configuration configuration) {
@@ -60,8 +70,18 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry) throws Exception {
 
-		dispatcherRestEndpoint = new DispatcherRestEndpoint(
-			RestServerEndpointConfiguration.fromConfiguration(configuration));
+		dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
+
+		LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
+			rpcService,
+			DispatcherGateway.class,
+			uuid -> new DispatcherId(uuid),
+			10,
+			Time.milliseconds(50L));
+
+		dispatcherRestEndpoint = createDispatcherRestEndpoint(
+			configuration,
+			dispatcherGatewayRetriever);
 
 		LOG.debug("Starting Dispatcher REST endpoint.");
 		dispatcherRestEndpoint.start();
@@ -90,17 +110,30 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 
 		LOG.debug("Starting Dispatcher.");
 		dispatcher.start();
+		dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
 	}
 
 	@Override
 	protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
 		Throwable exception = null;
 
+		if (dispatcherRestEndpoint != null) {
+			dispatcherRestEndpoint.shutdown(Time.seconds(10L));
+		}
+
+		if (dispatcherLeaderRetrievalService != null) {
+			try {
+				dispatcherLeaderRetrievalService.stop();
+			} catch (Throwable t) {
+				exception = ExceptionUtils.firstOrSuppressed(t, exception);
+			}
+		}
+
 		if (dispatcher != null) {
 			try {
 				dispatcher.shutDown();
 			} catch (Throwable t) {
-				exception = t;
+				exception = ExceptionUtils.firstOrSuppressed(t, exception);
 			}
 		}
 
@@ -117,6 +150,20 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 		}
 	}
 
+	protected DispatcherRestEndpoint createDispatcherRestEndpoint(
+		Configuration configuration,
+		LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever) throws Exception {
+
+		Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+		File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR));
+
+		return new DispatcherRestEndpoint(
+			RestServerEndpointConfiguration.fromConfiguration(configuration),
+			dispatcherGatewayRetriever,
+			timeout,
+			tmpDir);
+	}
+
 	protected Dispatcher createDispatcher(
 		Configuration configuration,
 		RpcService rpcService,

http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index fc37381..ec6e5b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -19,16 +19,16 @@
 package org.apache.flink.runtime.rest;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.RouterHandler;
-import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
@@ -82,13 +82,18 @@ public abstract class RestServerEndpoint {
 	/**
 	 * This method is called at the beginning of {@link #start()} to setup all handlers that the REST server endpoint
 	 * implementation requires.
+	 *
+	 * @param restAddressFuture future rest address of the RestServerEndpoint
+	 * @return Collection of handler specification and channel handler pairs which are added to the server endpoint
 	 */
-	protected abstract Collection<AbstractRestHandler<?, ?, ?, ?>> initializeHandlers();
+	protected abstract Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture);
 
 	/**
 	 * Starts this REST server endpoint.
+	 *
+	 * @throws Exception if we cannot start the RestServerEndpoint
 	 */
-	public void start() {
+	public void start() throws Exception {
 		synchronized (lock) {
 			if (started) {
 				// RestServerEndpoint already started
@@ -98,8 +103,9 @@ public abstract class RestServerEndpoint {
 			log.info("Starting rest endpoint.");
 
 			final Router router = new Router();
+			final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
 
-			initializeHandlers().forEach(handler -> registerHandler(router, handler));
+			initializeHandlers(restAddressFuture).forEach(handler -> registerHandler(router, handler));
 
 			ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
 
@@ -150,8 +156,11 @@ public abstract class RestServerEndpoint {
 			} else {
 				protocol = "http://";
 			}
+
 			restAddress = protocol + address + ':' + port;
 
+			restAddressFuture.complete(restAddress);
+
 			started = true;
 		}
 	}
@@ -239,13 +248,13 @@ public abstract class RestServerEndpoint {
 		}
 	}
 
-	private static <R extends RequestBody, P extends ResponseBody> void registerHandler(Router router, AbstractRestHandler<?, R, P, ?> handler) {
-		switch (handler.getMessageHeaders().getHttpMethod()) {
+	private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) {
+		switch (specificationHandler.f0.getHttpMethod()) {
 			case GET:
-				router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+				router.GET(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
 				break;
 			case POST:
-				router.POST(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+				router.POST(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
 				break;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
new file mode 100644
index 0000000..4ebcd49
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+
+/**
+ * Rest handler interface which all rest handler implementations have to implement.
+ */
+public interface RestHandlerSpecification {
+
+	/**
+	 * Returns the {@link HttpMethodWrapper} to be used for the request.
+	 *
+	 * @return http method to be used for the request
+	 */
+	HttpMethodWrapper getHttpMethod();
+
+	/**
+	 * Returns the generalized endpoint url that this request should be sent to, for example {@code /job/:jobid}.
+	 *
+	 * @return endpoint url that this request should be sent to
+	 */
+	String getTargetRestEndpointURL();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java
new file mode 100644
index 0000000..98b805a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.files;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+
+/**
+ * Rest handler specification for the web content handler.
+ */
+public final class WebContentHandlerSpecification implements RestHandlerSpecification {
+
+	private static final WebContentHandlerSpecification INSTANCE = new WebContentHandlerSpecification();
+
+	private WebContentHandlerSpecification() {}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return "/:*";
+	}
+
+	public static WebContentHandlerSpecification getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
index 254c231..e5ec794 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.rest.messages;
 
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
@@ -31,7 +31,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
  * @param <P> response message type
  * @param <M> message parameters type
  */
-public interface MessageHeaders<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> {
+public interface MessageHeaders<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends RestHandlerSpecification {
 
 	/**
 	 * Returns the class of the request message.
@@ -41,20 +41,6 @@ public interface MessageHeaders<R extends RequestBody, P extends ResponseBody, M
 	Class<R> getRequestClass();
 
 	/**
-	 * Returns the {@link HttpMethodWrapper} to be used for the request.
-	 *
-	 * @return http method to be used for the request
-	 */
-	HttpMethodWrapper getHttpMethod();
-
-	/**
-	 * Returns the generalized endpoint url that this request should be sent to, for example {@code /job/:jobid}.
-	 *
-	 * @return endpoint url that this request should be sent to
-	 */
-	String getTargetRestEndpointURL();
-
-	/**
 	 * Returns the class of the response message.
 	 *
 	 * @return class of the response message

http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 57996bd..e0f1823 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -32,7 +32,9 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
@@ -43,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -50,6 +53,8 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 /**
@@ -170,6 +175,40 @@ public final class WebMonitorUtils {
 		}
 	}
 
+	/**
+	 * Checks whether the flink-runtime-web dependency is available and if so returns a
+	 * StaticFileServerHandler which can serve the static file contents.
+	 *
+	 * @param leaderRetriever to be used by the StaticFileServerHandler
+	 * @param restAddressFuture of the underlying REST server endpoint
+	 * @param timeout for lookup requests
+	 * @param tmpDir to be used by the StaticFileServerHandler to store temporary files
+	 * @param <T> type of the gateway to retrieve
+	 * @return StaticFileServerHandler if flink-runtime-web is in the classpath; Otherwise Optional.empty
+	 * @throws IOException if we cannot create the StaticFileServerHandler
+	 */
+	public static <T extends RestfulGateway> Optional<StaticFileServerHandler<T>> tryLoadWebContent(
+			GatewayRetriever<T> leaderRetriever,
+			CompletableFuture<String> restAddressFuture,
+			Time timeout,
+			File tmpDir) throws IOException {
+
+		// 1. Check if flink-runtime-web is in the classpath
+		try {
+			final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
+			Class.forName(classname).asSubclass(WebMonitor.class);
+
+			return Optional.of(new StaticFileServerHandler<>(
+				leaderRetriever,
+				restAddressFuture,
+				timeout,
+				tmpDir));
+		} catch (ClassNotFoundException ignored) {
+			// class not found means that there is no flink-runtime-web in the classpath
+			return Optional.empty();
+		}
+	}
+
 	public static JsonArchivist[] getJsonArchivists() {
 		try {
 			String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";

http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
index 4e06d1f..8dfb5ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
@@ -20,10 +20,12 @@ package org.apache.flink.runtime.rest;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
@@ -37,6 +39,7 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -137,8 +140,8 @@ public class RestEndpointITCase extends TestLogger {
 		}
 
 		@Override
-		protected Collection<AbstractRestHandler<?, ?, ?, ?>> initializeHandlers() {
-			return Collections.singleton(testHandler);
+		protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
+			return Collections.singleton(Tuple2.of(new TestHeaders(), testHandler));
 		}
 	}