Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/19 22:44:15 UTC

[05/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler to flink-runtime

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
new file mode 100644
index 0000000..da115ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.files;
+
+/*****************************************************************************
+ * This code is based on the "HttpStaticFileServerHandler" from the
+ * Netty project's HTTP server example.
+ *
+ * See http://netty.io and
+ * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
+ *****************************************************************************/
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.util.MimeTypes;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
+import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.Locale;
+import java.util.TimeZone;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.DATE;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Simple file server handler that serves requests for the web frontend's static
+ * files, such as HTML, CSS, or JS files.
+ *
+ * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
+ * example.</p>
+ */
+@ChannelHandler.Sharable
+public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectHandler<T> {
+
+	/** Timezone in which this server answers its "If-Modified-Since" requests. */
+	private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT");
+
+	/** Date format for HTTP. */
+	public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
+
+	/** By default, we allow files to be cached for 5 minutes. */
+	private static final int HTTP_CACHE_SECONDS = 300;
+
+	// ------------------------------------------------------------------------
+
+	/** The path under which the static documents are located. */
+	private final File rootPath;
+
+	public StaticFileServerHandler(
+			GatewayRetriever<T> retriever,
+			CompletableFuture<String> localJobManagerAddressFuture,
+			Time timeout,
+			File rootPath) throws IOException {
+
+		super(localJobManagerAddressFuture, retriever, timeout);
+
+		this.rootPath = checkNotNull(rootPath).getCanonicalFile();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Responses to requests
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, T gateway) throws Exception {
+		final HttpRequest request = routed.request();
+		final String requestPath;
+
+		// make sure we request the "index.html" in case there is a directory request
+		if (routed.path().endsWith("/")) {
+			requestPath = routed.path() + "index.html";
+		}
+		// in case the files being accessed are logs or stdout files, find appropriate paths.
+		else if (routed.path().equals("/jobmanager/log") || routed.path().equals("/jobmanager/stdout")) {
+			requestPath = "";
+		} else {
+			requestPath = routed.path();
+		}
+
+		respondToRequest(channelHandlerContext, request, requestPath);
+	}
+
+	/**
+	 * Responds to the request when running with the leading JobManager.
+	 */
+	private void respondToRequest(ChannelHandlerContext ctx, HttpRequest request, String requestPath)
+			throws IOException, ParseException, URISyntaxException {
+
+		// convert to absolute path
+		final File file = new File(rootPath, requestPath);
+
+		if (!file.exists()) {
+			// file does not exist. Try to load it with the classloader
+			ClassLoader cl = StaticFileServerHandler.class.getClassLoader();
+
+			try (InputStream resourceStream = cl.getResourceAsStream("web" + requestPath)) {
+				boolean success = false;
+				try {
+					if (resourceStream != null) {
+						URL root = cl.getResource("web");
+						URL requested = cl.getResource("web" + requestPath);
+
+						if (root != null && requested != null) {
+							URI rootURI = new URI(root.getPath()).normalize();
+							URI requestedURI = new URI(requested.getPath()).normalize();
+
+							// Check that we don't load anything from outside of the
+							// expected scope.
+							if (!rootURI.relativize(requestedURI).equals(requestedURI)) {
+								logger.debug("Loading missing file from classloader: {}", requestPath);
+								// ensure that directory to file exists.
+								file.getParentFile().mkdirs();
+								Files.copy(resourceStream, file.toPath());
+
+								success = true;
+							}
+						}
+					}
+				} catch (Throwable t) {
+					logger.error("error while responding", t);
+				} finally {
+					if (!success) {
+						logger.debug("Unable to load requested file {} from classloader", requestPath);
+						sendError(ctx, NOT_FOUND);
+						return;
+					}
+				}
+			}
+		}
+
+		if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) {
+			sendError(ctx, NOT_FOUND);
+			return;
+		}
+
+		if (!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) {
+			sendError(ctx, NOT_FOUND);
+			return;
+		}
+
+		// cache validation
+		final String ifModifiedSince = request.headers().get(IF_MODIFIED_SINCE);
+		if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) {
+			SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
+			Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince);
+
+			// Only compare up to the second because the datetime format we send to the client
+			// does not have milliseconds
+			long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000;
+			long fileLastModifiedSeconds = file.lastModified() / 1000;
+			if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) {
+				if (logger.isDebugEnabled()) {
+					logger.debug("Responding 'NOT MODIFIED' for file '" + file.getAbsolutePath() + '\'');
+				}
+
+				sendNotModified(ctx);
+				return;
+			}
+		}
+
+		if (logger.isDebugEnabled()) {
+			logger.debug("Responding with file '" + file.getAbsolutePath() + '\'');
+		}
+
+		// Don't need to close this manually. Netty's DefaultFileRegion will take care of it.
+		final RandomAccessFile raf;
+		try {
+			raf = new RandomAccessFile(file, "r");
+		}
+		catch (FileNotFoundException e) {
+			sendError(ctx, NOT_FOUND);
+			return;
+		}
+
+		try {
+			long fileLength = raf.length();
+
+			HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+			setContentTypeHeader(response, file);
+
+			// since the log and out files are rapidly changing, we don't want the browser to cache them
+			if (!(requestPath.contains("log") || requestPath.contains("out"))) {
+				setDateAndCacheHeaders(response, file);
+			}
+			if (HttpHeaders.isKeepAlive(request)) {
+				response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+			}
+			HttpHeaders.setContentLength(response, fileLength);
+
+			// write the initial line and the header.
+			ctx.write(response);
+
+			// write the content.
+			ChannelFuture lastContentFuture;
+			if (ctx.pipeline().get(SslHandler.class) == null) {
+				ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
+				lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+			} else {
+				lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
+					ctx.newProgressivePromise());
+				// HttpChunkedInput will write the end marker (LastHttpContent) for us.
+			}
+
+			// close the connection, if no keep-alive is needed
+			if (!HttpHeaders.isKeepAlive(request)) {
+				lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+			}
+		} catch (Exception e) {
+			raf.close();
+			logger.error("Failed to serve file.", e);
+			sendError(ctx, INTERNAL_SERVER_ERROR);
+		}
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+		if (ctx.channel().isActive()) {
+			logger.error("Caught exception", cause);
+			sendError(ctx, INTERNAL_SERVER_ERROR);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities to encode headers and responses
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Writes a simple error response message.
+	 *
+	 * @param ctx    The channel context to write the response to.
+	 * @param status The response status.
+	 */
+	public static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
+		FullHttpResponse response = new DefaultFullHttpResponse(
+				HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
+		response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
+
+		// close the connection as soon as the error message is sent.
+		ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+	}
+
+	/**
+	 * Sends the "304 Not Modified" response. This response can be used when the
+	 * file timestamp is the same as the one the browser sent.
+	 *
+	 * @param ctx The channel context to write the response to.
+	 */
+	public static void sendNotModified(ChannelHandlerContext ctx) {
+		FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED);
+		setDateHeader(response);
+
+		// close the connection as soon as the response is sent.
+		ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+	}
+
+	/**
+	 * Sets the "date" header for the HTTP response.
+	 *
+	 * @param response HTTP response
+	 */
+	public static void setDateHeader(FullHttpResponse response) {
+		SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
+		dateFormatter.setTimeZone(GMT_TIMEZONE);
+
+		Calendar time = new GregorianCalendar();
+		response.headers().set(DATE, dateFormatter.format(time.getTime()));
+	}
+
+	/**
+	 * Sets the "date" and "cache" headers for the HTTP Response.
+	 *
+	 * @param response    The HTTP response object.
+	 * @param fileToCache File to extract the modification timestamp from.
+	 */
+	public static void setDateAndCacheHeaders(HttpResponse response, File fileToCache) {
+		SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
+		dateFormatter.setTimeZone(GMT_TIMEZONE);
+
+		// date header
+		Calendar time = new GregorianCalendar();
+		response.headers().set(DATE, dateFormatter.format(time.getTime()));
+
+		// cache headers
+		time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
+		response.headers().set(EXPIRES, dateFormatter.format(time.getTime()));
+		response.headers().set(CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS);
+		response.headers().set(LAST_MODIFIED, dateFormatter.format(new Date(fileToCache.lastModified())));
+	}
+
+	/**
+	 * Sets the content type header for the HTTP Response.
+	 *
+	 * @param response HTTP response
+	 * @param file     file to extract the content type from
+	 */
+	public static void setContentTypeHeader(HttpResponse response, File file) {
+		String mimeType = MimeTypes.getMimeTypeForFileName(file.getName());
+		String mimeFinal = mimeType != null ? mimeType : MimeTypes.getDefaultMimeType();
+		response.headers().set(CONTENT_TYPE, mimeFinal);
+	}
+}
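
For reference, a minimal stand-alone sketch of the seconds-resolution If-Modified-Since comparison performed above. It uses only JDK classes; the class and method names are invented for illustration. The truncation to whole seconds matters because the HTTP date format carries no milliseconds, so a millisecond-precision comparison would almost never match.

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;
    import java.util.TimeZone;

    public class IfModifiedSinceSketch {

        private static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";

        // true if header and file timestamps match at second precision,
        // mirroring the cache validation in StaticFileServerHandler
        static boolean isNotModified(String ifModifiedSince, long fileLastModifiedMillis) throws Exception {
            SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
            dateFormatter.setTimeZone(TimeZone.getTimeZone("GMT"));
            Date headerDate = dateFormatter.parse(ifModifiedSince);
            return headerDate.getTime() / 1000 == fileLastModifiedMillis / 1000;
        }

        public static void main(String[] args) throws Exception {
            long lastModified = System.currentTimeMillis();
            SimpleDateFormat fmt = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
            fmt.setTimeZone(TimeZone.getTimeZone("GMT"));
            String header = fmt.format(new Date(lastModified));
            System.out.println(isNotModified(header, lastModified)); // prints: true
        }
    }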

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
new file mode 100644
index 0000000..315bdc2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Abstract request handler that returns a list of all available metrics or the values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter, the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler {
+	private final MetricFetcher fetcher;
+
+	public AbstractMetricsHandler(Executor executor, MetricFetcher fetcher) {
+		super(executor);
+		this.fetcher = Preconditions.checkNotNull(fetcher);
+	}
+
+	@Override
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				fetcher.update();
+				String requestedMetricsList = queryParams.get("get");
+				try {
+					return requestedMetricsList != null
+						? getMetricsValues(pathParams, requestedMetricsList)
+						: getAvailableMetricsList(pathParams);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not retrieve metrics.", e);
+				}
+			},
+			executor);
+
+	}
+
+	/**
+	 * Returns a Map containing the metrics belonging to the entity pointed to by the path parameters.
+	 *
+	 * @param pathParams REST path parameters
+	 * @param metrics MetricStore containing all metrics
+	 * @return Map containing metrics, or null if no metric exists
+	 */
+	protected abstract Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics);
+
+	private String getMetricsValues(Map<String, String> pathParams, String requestedMetricsList) throws IOException {
+		if (requestedMetricsList.isEmpty()) {
+			/*
+			 * The WebInterface doesn't check whether the list of available metrics was empty. This can lead to a
+			 * request for which the "get" parameter is an empty string.
+			 */
+			return "";
+		}
+		MetricStore metricStore = fetcher.getMetricStore();
+		synchronized (metricStore) {
+			Map<String, String> metrics = getMapFor(pathParams, metricStore);
+			if (metrics == null) {
+				return "";
+			}
+			String[] requestedMetrics = requestedMetricsList.split(",");
+
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+			gen.writeStartArray();
+			for (String requestedMetric : requestedMetrics) {
+				Object metricValue = metrics.get(requestedMetric);
+				if (metricValue != null) {
+					gen.writeStartObject();
+					gen.writeStringField("id", requestedMetric);
+					gen.writeStringField("value", metricValue.toString());
+					gen.writeEndObject();
+				}
+			}
+			gen.writeEndArray();
+
+			gen.close();
+			return writer.toString();
+		}
+	}
+
+	private String getAvailableMetricsList(Map<String, String> pathParams) throws IOException {
+		MetricStore metricStore = fetcher.getMetricStore();
+		synchronized (metricStore) {
+			Map<String, String> metrics = getMapFor(pathParams, metricStore);
+			if (metrics == null) {
+				return "";
+			}
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+			gen.writeStartArray();
+			for (String m : metrics.keySet()) {
+				gen.writeStartObject();
+				gen.writeStringField("id", m);
+				gen.writeEndObject();
+			}
+			gen.writeEndArray();
+
+			gen.close();
+			return writer.toString();
+		}
+	}
+}
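
As a stand-alone illustration of the response format that getMetricsValues produces, the following sketch drives a Jackson JsonGenerator over an invented name/value map (only jackson-core is required; the class name and sample metrics are made up):

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonGenerator;

    import java.io.StringWriter;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class MetricsJsonSketch {
        public static void main(String[] args) throws Exception {
            Map<String, String> metrics = new LinkedHashMap<>();
            metrics.put("X", "S");
            metrics.put("Y", "T");

            StringWriter writer = new StringWriter();
            JsonGenerator gen = new JsonFactory().createGenerator(writer);
            gen.writeStartArray();
            for (Map.Entry<String, String> metric : metrics.entrySet()) {
                gen.writeStartObject();
                gen.writeStringField("id", metric.getKey());
                gen.writeStringField("value", metric.getValue());
                gen.writeEndObject();
            }
            gen.writeEndArray();
            gen.close();

            // prints: [{"id":"X","value":"S"},{"id":"Y","value":"T"}]
            System.out.println(writer.toString());
        }
    }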

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
new file mode 100644
index 0000000..c568ee0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns for the job manager a list of all available metrics or the values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter, the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class JobManagerMetricsHandler extends AbstractMetricsHandler {
+
+	private static final String JOBMANAGER_METRICS_REST_PATH = "/jobmanager/metrics";
+
+	public JobManagerMetricsHandler(Executor executor, MetricFetcher fetcher) {
+		super(executor, fetcher);
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{JOBMANAGER_METRICS_REST_PATH};
+	}
+
+	@Override
+	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+		MetricStore.JobManagerMetricStore jobManager = metrics.getJobManagerMetricStore();
+		if (jobManager == null) {
+			return null;
+		} else {
+			return jobManager.metrics;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
new file mode 100644
index 0000000..7341eb8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns for a given job a list of all available metrics or the values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter, the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class JobMetricsHandler extends AbstractMetricsHandler {
+	public static final String PARAMETER_JOB_ID = "jobid";
+	private static final String JOB_METRICS_REST_PATH = "/jobs/:jobid/metrics";
+
+	public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
+		super(executor, fetcher);
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{JOB_METRICS_REST_PATH};
+	}
+
+	@Override
+	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+		MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
+		return job != null
+			? job.metrics
+			: null;
+	}
+}
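
The ":jobid" segment of JOB_METRICS_REST_PATH is bound by the Netty router and handed to getMapFor through pathParams. A rough, hypothetical sketch of that binding (the matching loop and the job ID below are invented; the real routing lives in the router library):

    import java.util.HashMap;
    import java.util.Map;

    public class PathParamSketch {
        public static void main(String[] args) {
            String template = "/jobs/:jobid/metrics";
            String request = "/jobs/7684be6004e4e955c2a558a9bc463f65/metrics";

            Map<String, String> pathParams = new HashMap<>();
            String[] templateParts = template.split("/");
            String[] requestParts = request.split("/");
            for (int i = 0; i < templateParts.length; i++) {
                if (templateParts[i].startsWith(":")) {
                    pathParams.put(templateParts[i].substring(1), requestParts[i]);
                }
            }

            System.out.println(pathParams.get("jobid")); // 7684be6004e4e955c2a558a9bc463f65
        }
    }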

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
new file mode 100644
index 0000000..3a701ab
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns for a given task a list of all available metrics or the values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter, the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class JobVertexMetricsHandler extends AbstractMetricsHandler {
+	public static final String PARAMETER_VERTEX_ID = "vertexid";
+	private static final String JOB_VERTEX_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/metrics";
+
+	public JobVertexMetricsHandler(Executor executor, MetricFetcher fetcher) {
+		super(executor, fetcher);
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{JOB_VERTEX_METRICS_REST_PATH};
+	}
+
+	@Override
+	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+		MetricStore.TaskMetricStore task = metrics.getTaskMetricStore(
+			pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID),
+			pathParams.get(PARAMETER_VERTEX_ID));
+		return task != null
+			? task.metrics
+			: null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
new file mode 100644
index 0000000..9f53808
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
+
+/**
+ * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
+ *
+ * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that sufficient time has
+ * passed since the last call.
+ */
+public class MetricFetcher {
+	private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
+
+	private final GatewayRetriever<JobManagerGateway> retriever;
+	private final MetricQueryServiceRetriever queryServiceRetriever;
+	private final Executor executor;
+	private final Time timeout;
+
+	private final MetricStore metrics = new MetricStore();
+	private final MetricDumpDeserializer deserializer = new MetricDumpDeserializer();
+
+	private long lastUpdateTime;
+
+	public MetricFetcher(
+			GatewayRetriever<JobManagerGateway> retriever,
+			MetricQueryServiceRetriever queryServiceRetriever,
+			Executor executor,
+			Time timeout) {
+		this.retriever = Preconditions.checkNotNull(retriever);
+		this.queryServiceRetriever = Preconditions.checkNotNull(queryServiceRetriever);
+		this.executor = Preconditions.checkNotNull(executor);
+		this.timeout = Preconditions.checkNotNull(timeout);
+	}
+
+	/**
+	 * Returns the MetricStore containing all stored metrics.
+	 *
+	 * @return MetricStore containing all stored metrics
+	 */
+	public MetricStore getMetricStore() {
+		return metrics;
+	}
+
+	/**
+	 * This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated.
+	 */
+	public void update() {
+		synchronized (this) {
+			long currentTime = System.currentTimeMillis();
+			if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update
+				lastUpdateTime = currentTime;
+				fetchMetrics();
+			}
+		}
+	}
+
+	private void fetchMetrics() {
+		try {
+			Optional<JobManagerGateway> optJobManagerGateway = retriever.getNow();
+			if (optJobManagerGateway.isPresent()) {
+				final JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
+
+				/*
+				 * Remove all metrics that belong to jobs that are neither running nor archived anymore.
+				 */
+				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
+
+				jobDetailsFuture.whenCompleteAsync(
+					(MultipleJobsDetails jobDetails, Throwable throwable) -> {
+						if (throwable != null) {
+							LOG.debug("Fetching of JobDetails failed.", throwable);
+						} else {
+							ArrayList<String> toRetain = new ArrayList<>();
+							for (JobDetails job : jobDetails.getRunningJobs()) {
+								toRetain.add(job.getJobId().toString());
+							}
+							for (JobDetails job : jobDetails.getFinishedJobs()) {
+								toRetain.add(job.getJobId().toString());
+							}
+							synchronized (metrics) {
+								metrics.jobs.keySet().retainAll(toRetain);
+							}
+						}
+					},
+					executor);
+
+				String jobManagerPath = jobManagerGateway.getAddress();
+				String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
+
+				retrieveAndQueryMetrics(jmQueryServicePath);
+
+				/*
+				 * We first request the list of all registered task managers from the job manager, and then
+				 * request the respective metric dump from each task manager.
+				 *
+				 * All stored metrics that do not belong to a registered task manager will be removed.
+				 */
+				CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+				taskManagersFuture.whenCompleteAsync(
+					(Collection<Instance> taskManagers, Throwable throwable) -> {
+						if (throwable != null) {
+							LOG.debug("Fetching list of registered TaskManagers failed.", throwable);
+						} else {
+							List<String> activeTaskManagers = taskManagers.stream().map(
+								taskManagerInstance -> {
+									final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress();
+									final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString();
+
+									retrieveAndQueryMetrics(tmQueryServicePath);
+
+									return taskManagerInstance.getId().toString();
+								}).collect(Collectors.toList());
+
+							synchronized (metrics) {
+								metrics.taskManagers.keySet().retainAll(activeTaskManagers);
+							}
+						}
+					},
+					executor);
+			}
+		} catch (Exception e) {
+			LOG.warn("Exception while fetching metrics.", e);
+		}
+	}
+
+	/**
+	 * Retrieves and queries the specified QueryServiceGateway.
+	 *
+	 * @param queryServicePath path specifying the QueryServiceGateway
+	 */
+	private void retrieveAndQueryMetrics(String queryServicePath) {
+		final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath);
+
+		queryServiceGatewayFuture.whenCompleteAsync(
+			(MetricQueryServiceGateway queryServiceGateway, Throwable t) -> {
+				if (t != null) {
+					LOG.debug("Could not retrieve QueryServiceGateway.", t);
+				} else {
+					queryMetrics(queryServiceGateway);
+				}
+			},
+			executor);
+	}
+
+	/**
+	 * Queries the metrics from the given QueryServiceGateway.
+	 *
+	 * @param queryServiceGateway the gateway to query for metrics
+	 */
+	private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
+		queryServiceGateway
+			.queryMetrics(timeout)
+			.whenCompleteAsync(
+				(MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
+					if (t != null) {
+						LOG.debug("Fetching metrics failed.", t);
+					} else {
+						List<MetricDump> dumpedMetrics = deserializer.deserialize(result);
+						synchronized (metrics) {
+							for (MetricDump metric : dumpedMetrics) {
+								metrics.add(metric);
+							}
+						}
+					}
+				},
+				executor);
+	}
+}
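
A minimal sketch of the throttling pattern update() applies above: at most one fetch per 10 seconds, guarded by the object lock. The class name and the stubbed fetchMetrics() are invented:

    public class ThrottledUpdater {

        private long lastUpdateTime;

        public void update() {
            synchronized (this) {
                long currentTime = System.currentTimeMillis();
                if (currentTime - lastUpdateTime > 10000) { // at most once per 10 seconds
                    lastUpdateTime = currentTime;
                    fetchMetrics();
                }
            }
        }

        private void fetchMetrics() {
            System.out.println("fetching metrics...");
        }

        public static void main(String[] args) {
            ThrottledUpdater updater = new ThrottledUpdater();
            updater.update(); // triggers a fetch
            updater.update(); // suppressed: less than 10 seconds since the last fetch
        }
    }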

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
new file mode 100644
index 0000000..6d3fc99
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
+
+/**
+ * Nested data structure to store metrics.
+ *
+ * <p>This structure is not thread-safe.
+ */
+public class MetricStore {
+	private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
+
+	final JobManagerMetricStore jobManager = new JobManagerMetricStore();
+	final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
+	final Map<String, JobMetricStore> jobs = new HashMap<>();
+
+	// -----------------------------------------------------------------------------------------------------------------
+	// Adding metrics
+	// -----------------------------------------------------------------------------------------------------------------
+	public void add(MetricDump metric) {
+		try {
+			QueryScopeInfo info = metric.scopeInfo;
+			TaskManagerMetricStore tm;
+			JobMetricStore job;
+			TaskMetricStore task;
+			SubtaskMetricStore subtask;
+
+			String name = info.scope.isEmpty()
+				? metric.name
+				: info.scope + "." + metric.name;
+
+			if (name.isEmpty()) { // malformed transmission
+				return;
+			}
+
+			switch (info.getCategory()) {
+				case INFO_CATEGORY_JM:
+					addMetric(jobManager.metrics, name, metric);
+					break;
+				case INFO_CATEGORY_TM:
+					String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
+					tm = taskManagers.get(tmID);
+					if (tm == null) {
+						tm = new TaskManagerMetricStore();
+						taskManagers.put(tmID, tm);
+					}
+					if (name.contains("GarbageCollector")) {
+						String gcName = name.substring("Status.JVM.GarbageCollector.".length(), name.lastIndexOf('.'));
+						tm.addGarbageCollectorName(gcName);
+					}
+					addMetric(tm.metrics, name, metric);
+					break;
+				case INFO_CATEGORY_JOB:
+					QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
+					job = jobs.get(jobInfo.jobID);
+					if (job == null) {
+						job = new JobMetricStore();
+						jobs.put(jobInfo.jobID, job);
+					}
+					addMetric(job.metrics, name, metric);
+					break;
+				case INFO_CATEGORY_TASK:
+					QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
+					job = jobs.get(taskInfo.jobID);
+					if (job == null) {
+						job = new JobMetricStore();
+						jobs.put(taskInfo.jobID, job);
+					}
+					task = job.tasks.get(taskInfo.vertexID);
+					if (task == null) {
+						task = new TaskMetricStore();
+						job.tasks.put(taskInfo.vertexID, task);
+					}
+					subtask = task.subtasks.get(taskInfo.subtaskIndex);
+					if (subtask == null) {
+						subtask = new SubtaskMetricStore();
+						task.subtasks.put(taskInfo.subtaskIndex, subtask);
+					}
+					/*
+					 * The duplication is intended. Metrics scoped by subtask are useful for several job/task handlers,
+					 * while the WebInterface task metric queries currently do not account for subtasks, so we don't
+					 * divide by subtask and instead use the concatenation of subtask index and metric name as the name
+					 * for those.
+					 */
+					addMetric(subtask.metrics, name, metric);
+					addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric);
+					break;
+				case INFO_CATEGORY_OPERATOR:
+					QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
+					job = jobs.get(operatorInfo.jobID);
+					if (job == null) {
+						job = new JobMetricStore();
+						jobs.put(operatorInfo.jobID, job);
+					}
+					task = job.tasks.get(operatorInfo.vertexID);
+					if (task == null) {
+						task = new TaskMetricStore();
+						job.tasks.put(operatorInfo.vertexID, task);
+					}
+					/*
+					 * As the WebInterface does not account for operators (because it can't) we don't
+					 * divide by operator and instead use the concatenation of subtask index, operator name and metric name
+					 * as the name.
+					 */
+					addMetric(task.metrics, operatorInfo.subtaskIndex + "." + operatorInfo.operatorName + "." + name, metric);
+					break;
+				default:
+					LOG.debug("Invalid metric dump category: {}", info.getCategory());
+			}
+		} catch (Exception e) {
+			LOG.debug("Malformed metric dump.", e);
+		}
+	}
+
+	private void addMetric(Map<String, String> target, String name, MetricDump metric) {
+		switch (metric.getCategory()) {
+			case METRIC_CATEGORY_COUNTER:
+				MetricDump.CounterDump counter = (MetricDump.CounterDump) metric;
+				target.put(name, String.valueOf(counter.count));
+				break;
+			case METRIC_CATEGORY_GAUGE:
+				MetricDump.GaugeDump gauge = (MetricDump.GaugeDump) metric;
+				target.put(name, gauge.value);
+				break;
+			case METRIC_CATEGORY_HISTOGRAM:
+				MetricDump.HistogramDump histogram = (MetricDump.HistogramDump) metric;
+				target.put(name + "_min", String.valueOf(histogram.min));
+				target.put(name + "_max", String.valueOf(histogram.max));
+				target.put(name + "_mean", String.valueOf(histogram.mean));
+				target.put(name + "_median", String.valueOf(histogram.median));
+				target.put(name + "_stddev", String.valueOf(histogram.stddev));
+				target.put(name + "_p75", String.valueOf(histogram.p75));
+				target.put(name + "_p90", String.valueOf(histogram.p90));
+				target.put(name + "_p95", String.valueOf(histogram.p95));
+				target.put(name + "_p98", String.valueOf(histogram.p98));
+				target.put(name + "_p99", String.valueOf(histogram.p99));
+				target.put(name + "_p999", String.valueOf(histogram.p999));
+				break;
+			case METRIC_CATEGORY_METER:
+				MetricDump.MeterDump meter = (MetricDump.MeterDump) metric;
+				target.put(name, String.valueOf(meter.rate));
+				break;
+		}
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	// Accessors for sub MetricStores
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Returns the {@link JobManagerMetricStore}.
+	 *
+	 * @return JobManagerMetricStore
+	 */
+	public JobManagerMetricStore getJobManagerMetricStore() {
+		return jobManager;
+	}
+
+	/**
+	 * Returns the {@link TaskManagerMetricStore} for the given taskmanager ID.
+	 *
+	 * @param tmID taskmanager ID
+	 * @return TaskManagerMetricStore for the given ID, or null if no store for the given argument exists
+	 */
+	public TaskManagerMetricStore getTaskManagerMetricStore(String tmID) {
+		return taskManagers.get(tmID);
+	}
+
+	/**
+	 * Returns the {@link JobMetricStore} for the given job ID.
+	 *
+	 * @param jobID job ID
+	 * @return JobMetricStore for the given ID, or null if no store for the given argument exists
+	 */
+	public JobMetricStore getJobMetricStore(String jobID) {
+		return jobs.get(jobID);
+	}
+
+	/**
+	 * Returns the {@link TaskMetricStore} for the given job/task ID.
+	 *
+	 * @param jobID  job ID
+	 * @param taskID task ID
+	 * @return TaskMetricStore for given IDs, or null if no store for the given arguments exists
+	 */
+	public TaskMetricStore getTaskMetricStore(String jobID, String taskID) {
+		JobMetricStore job = getJobMetricStore(jobID);
+		if (job == null) {
+			return null;
+		}
+		return job.getTaskMetricStore(taskID);
+	}
+
+	/**
+	 * Returns the {@link SubtaskMetricStore} for the given job/task ID and subtask index.
+	 *
+	 * @param jobID        job ID
+	 * @param taskID       task ID
+	 * @param subtaskIndex subtask index
+	 * @return SubtaskMetricStore for the given IDs and index, or null if no store for the given arguments exists
+	 */
+	public SubtaskMetricStore getSubtaskMetricStore(String jobID, String taskID, int subtaskIndex) {
+		TaskMetricStore task = getTaskMetricStore(jobID, taskID);
+		if (task == null) {
+			return null;
+		}
+		return task.getSubtaskMetricStore(subtaskIndex);
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	// sub MetricStore classes
+	// -----------------------------------------------------------------------------------------------------------------
+	private abstract static class ComponentMetricStore {
+		public final Map<String, String> metrics = new HashMap<>();
+
+		public String getMetric(String name, String defaultValue) {
+			String value = this.metrics.get(name);
+			return value != null
+				? value
+				: defaultValue;
+		}
+	}
+
+	/**
+	 * Sub-structure containing metrics of the JobManager.
+	 */
+	public static class JobManagerMetricStore extends ComponentMetricStore {
+	}
+
+	/**
+	 * Sub-structure containing metrics of a single TaskManager.
+	 */
+	public static class TaskManagerMetricStore extends ComponentMetricStore {
+		public final Set<String> garbageCollectorNames = new HashSet<>();
+
+		public void addGarbageCollectorName(String name) {
+			garbageCollectorNames.add(name);
+		}
+	}
+
+	/**
+	 * Sub-structure containing metrics of a single Job.
+	 */
+	public static class JobMetricStore extends ComponentMetricStore {
+		private final Map<String, TaskMetricStore> tasks = new HashMap<>();
+
+		public TaskMetricStore getTaskMetricStore(String taskID) {
+			return tasks.get(taskID);
+		}
+	}
+
+	/**
+	 * Sub-structure containing metrics of a single Task.
+	 */
+	public static class TaskMetricStore extends ComponentMetricStore {
+		private final Map<Integer, SubtaskMetricStore> subtasks = new HashMap<>();
+
+		public SubtaskMetricStore getSubtaskMetricStore(int subtaskIndex) {
+			return subtasks.get(subtaskIndex);
+		}
+	}
+
+	/**
+	 * Sub-structure containing metrics of a single Subtask.
+	 */
+	public static class SubtaskMetricStore extends ComponentMetricStore {
+	}
+}
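
To make the intentional duplication for task-scoped metrics concrete, here is a small sketch (metric name, value, and subtask index are invented): the raw name goes into the subtask store, while the task store keys the same value by "<subtaskIndex>.<name>", because the WebInterface task queries do not distinguish subtasks.

    import java.util.HashMap;
    import java.util.Map;

    public class TaskMetricDuplicationSketch {
        public static void main(String[] args) {
            Map<String, String> subtaskMetrics = new HashMap<>(); // SubtaskMetricStore view
            Map<String, String> taskMetrics = new HashMap<>();    // TaskMetricStore view

            int subtaskIndex = 3;
            String name = "numRecordsIn";
            String value = "42";

            subtaskMetrics.put(name, value);
            taskMetrics.put(subtaskIndex + "." + name, value);

            System.out.println(subtaskMetrics); // {numRecordsIn=42}
            System.out.println(taskMetrics);    // {3.numRecordsIn=42}
        }
    }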

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
new file mode 100644
index 0000000..90bafb7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns for a given task manager a list of all available metrics or the values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter, the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class TaskManagerMetricsHandler extends AbstractMetricsHandler {
+
+	private static final String TASKMANAGER_METRICS_REST_PATH = "/taskmanagers/:taskmanagerid/metrics";
+
+	public TaskManagerMetricsHandler(Executor executor, MetricFetcher fetcher) {
+		super(executor, fetcher);
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{TASKMANAGER_METRICS_REST_PATH};
+	}
+
+	@Override
+	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+		MetricStore.TaskManagerMetricStore taskManager = metrics.getTaskManagerMetricStore(pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY));
+		if (taskManager == null) {
+			return null;
+		} else {
+			return taskManager.metrics;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
new file mode 100644
index 0000000..e2aaaf7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.util;
+
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.rest.handler.legacy.JobVertexDetailsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * This class is a mutable version of the {@link IOMetrics} class that allows adding up IO-related metrics.
+ *
+ * <p>For finished jobs these metrics are stored in the {@link ExecutionGraph} as an {@link IOMetrics} instance.
+ * For running jobs these metrics are retrieved using the {@link MetricFetcher}.
+ *
+ * <p>This class provides a common interface to handle both cases, reducing complexity in various handlers (like
+ * the {@link JobVertexDetailsHandler}).
+ */
+public class MutableIOMetrics extends IOMetrics {
+
+	private static final long serialVersionUID = -5460777634971381737L;
+
+	public MutableIOMetrics() {
+		super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
+	}
+
+	/**
+	 * Adds the IO metrics for the given attempt to this object. If the {@link AccessExecution} is in
+	 * a terminal state, the contained {@link IOMetrics} object is added. Otherwise, the given {@link MetricFetcher}
+	 * is used to retrieve the required metrics.
+	 *
+	 * @param attempt Attempt whose IO metrics should be added
+	 * @param fetcher MetricFetcher to retrieve metrics for running jobs
+	 * @param jobID JobID to which the attempt belongs
+	 * @param taskID TaskID to which the attempt belongs
+	 */
+	public void addIOMetrics(AccessExecution attempt, @Nullable MetricFetcher fetcher, String jobID, String taskID) {
+		if (attempt.getState().isTerminal()) {
+			IOMetrics ioMetrics = attempt.getIOMetrics();
+			if (ioMetrics != null) { // the attempt has finished; use the final metrics stored in the ExecutionGraph
+				this.numBytesInLocal += ioMetrics.getNumBytesInLocal();
+				this.numBytesInRemote += ioMetrics.getNumBytesInRemote();
+				this.numBytesOut += ioMetrics.getNumBytesOut();
+				this.numRecordsIn += ioMetrics.getNumRecordsIn();
+				this.numRecordsOut += ioMetrics.getNumRecordsOut();
+			}
+		} else { // the attempt is still running; query the metrics via the MetricQueryService instead
+			if (fetcher != null) {
+				fetcher.update();
+				MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex());
+				if (metrics != null) {
+					// Long.parseLong avoids the boxing of Long.valueOf; "0" is the fallback for absent metrics
+					this.numBytesInLocal += Long.parseLong(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"));
+					this.numBytesInRemote += Long.parseLong(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+					this.numBytesOut += Long.parseLong(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+					this.numRecordsIn += Long.parseLong(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+					this.numRecordsOut += Long.parseLong(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
+				}
+			}
+		}
+	}
+
+	/**
+	 * Writes the IO metrics contained in this object to the given {@link JsonGenerator}.
+	 *
+	 * <p>The JSON structure written is as follows:
+	 * <pre>{@code
+	 * "metrics": {
+	 *     "read-bytes": 1,
+	 *     "write-bytes": 2,
+	 *     "read-records": 3,
+	 *     "write-records": 4
+	 * }
+	 * }</pre>
+	 *
+	 * @param gen JsonGenerator to which the metrics should be written
+	 * @throws IOException if the metrics cannot be written to the given generator
+	 */
+	public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException {
+		gen.writeObjectFieldStart("metrics");
+		gen.writeNumberField("read-bytes", this.numBytesInLocal + this.numBytesInRemote);
+		gen.writeNumberField("write-bytes", this.numBytesOut);
+		gen.writeNumberField("read-records", this.numRecordsIn);
+		gen.writeNumberField("write-records", this.numRecordsOut);
+		gen.writeEndObject();
+	}
+}
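
A minimal sketch of the aggregation pattern this class enables, approximating how handlers such as JobVertexDetailsHandler consume it. The wrapper class and method here are hypothetical; the Flink and Jackson types are the ones imported by the file above, and the generator is assumed to be positioned inside an open JSON object.

    import java.io.IOException;

    import javax.annotation.Nullable;

    import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
    import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
    import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
    import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;

    import com.fasterxml.jackson.core.JsonGenerator;

    final class VertexIOMetricsSketch {

        /** Sums the IO metrics over all subtasks of a vertex and writes the "metrics" object. */
        static void writeAggregatedMetrics(
                AccessExecutionJobVertex jobVertex,
                @Nullable MetricFetcher fetcher,
                String jobID,
                JsonGenerator gen) throws IOException {

            MutableIOMetrics counts = new MutableIOMetrics();
            for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
                // Terminal attempts contribute their stored IOMetrics; running attempts
                // are resolved through the MetricFetcher, if one is available.
                counts.addIOMetrics(
                    vertex.getCurrentExecutionAttempt(),
                    fetcher,
                    jobID,
                    jobVertex.getJobVertexId().toString());
            }
            counts.writeIOMetricsAsJson(gen);
        }
    }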

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandlerTest.java
new file mode 100644
index 0000000..5bfa1f9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandlerTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the ClusterOverviewHandler.
+ */
+public class ClusterOverviewHandlerTest {
+	@Test
+	public void testGetPaths() {
+		ClusterOverviewHandler handler = new ClusterOverviewHandler(Executors.directExecutor(), Time.seconds(0L));
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/overview", paths[0]);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandlerTest.java
new file mode 100644
index 0000000..0ada30d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandlerTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the CurrentJobIdsHandler.
+ */
+public class CurrentJobIdsHandlerTest {
+	@Test
+	public void testGetPaths() {
+		CurrentJobIdsHandler handler = new CurrentJobIdsHandler(Executors.directExecutor(), Time.seconds(0L));
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs", paths[0]);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
new file mode 100644
index 0000000..83bb157
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+
+/**
+ * Tests for the CurrentJobsOverviewHandler.
+ */
+public class CurrentJobsOverviewHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob);
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/joboverview", archive.getPath());
+
+		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(archive.getJson());
+		ArrayNode running = (ArrayNode) result.get("running");
+		Assert.assertEquals(0, running.size());
+
+		ArrayNode finished = (ArrayNode) result.get("finished");
+		Assert.assertEquals(1, finished.size());
+
+		compareJobOverview(expectedDetails, finished.get(0).toString());
+	}
+
+	@Test
+	public void testGetPaths() {
+		CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, true);
+		String[] pathsAll = handlerAll.getPaths();
+		Assert.assertEquals(1, pathsAll.length);
+		Assert.assertEquals("/joboverview", pathsAll[0]);
+
+		CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, false);
+		String[] pathsRunning = handlerRunning.getPaths();
+		Assert.assertEquals(1, pathsRunning.length);
+		Assert.assertEquals("/joboverview/running", pathsRunning[0]);
+
+		CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), false, true);
+		String[] pathsCompleted = handlerCompleted.getPaths();
+		Assert.assertEquals(1, pathsCompleted.length);
+		Assert.assertEquals("/joboverview/completed", pathsCompleted[0]);
+	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob);
+		StringWriter writer = new StringWriter();
+		try (JsonGenerator gen = ArchivedJobGenerationUtils.JACKSON_FACTORY.createGenerator(writer)) {
+			CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(expectedDetails, gen, 0);
+		}
+		compareJobOverview(expectedDetails, writer.toString());
+	}
+
+	private static void compareJobOverview(JobDetails expectedDetails, String answer) throws IOException {
+		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(answer);
+
+		Assert.assertEquals(expectedDetails.getJobId().toString(), result.get("jid").asText());
+		Assert.assertEquals(expectedDetails.getJobName(), result.get("name").asText());
+		Assert.assertEquals(expectedDetails.getStatus().name(), result.get("state").asText());
+
+		Assert.assertEquals(expectedDetails.getStartTime(), result.get("start-time").asLong());
+		Assert.assertEquals(expectedDetails.getEndTime(), result.get("end-time").asLong());
+		Assert.assertEquals(expectedDetails.getEndTime() - expectedDetails.getStartTime(), result.get("duration").asLong());
+		Assert.assertEquals(expectedDetails.getLastUpdateTime(), result.get("last-modification").asLong());
+
+		JsonNode tasks = result.get("tasks");
+		Assert.assertEquals(expectedDetails.getNumTasks(), tasks.get("total").asInt());
+		int[] tasksPerState = expectedDetails.getNumVerticesPerExecutionState();
+		Assert.assertEquals(
+			tasksPerState[ExecutionState.CREATED.ordinal()] + tasksPerState[ExecutionState.SCHEDULED.ordinal()] + tasksPerState[ExecutionState.DEPLOYING.ordinal()],
+			tasks.get("pending").asInt());
+		Assert.assertEquals(tasksPerState[ExecutionState.RUNNING.ordinal()], tasks.get("running").asInt());
+		Assert.assertEquals(tasksPerState[ExecutionState.FINISHED.ordinal()], tasks.get("finished").asInt());
+		Assert.assertEquals(tasksPerState[ExecutionState.CANCELING.ordinal()], tasks.get("canceling").asInt());
+		Assert.assertEquals(tasksPerState[ExecutionState.CANCELED.ordinal()], tasks.get("canceled").asInt());
+		Assert.assertEquals(tasksPerState[ExecutionState.FAILED.ordinal()], tasks.get("failed").asInt());
+	}
+}
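
For reference, the per-job JSON shape that compareJobOverview above asserts, reconstructed here for readability; the concrete values are illustrative. Note that "pending" aggregates the CREATED, SCHEDULED, and DEPLOYING counts.

    {
      "jid": "<job id>",
      "name": "<job name>",
      "state": "FINISHED",
      "start-time": 1,
      "end-time": 5,
      "duration": 4,
      "last-modification": 5,
      "tasks": {
        "total": 8,
        "pending": 3,
        "running": 1,
        "finished": 1,
        "canceling": 1,
        "canceled": 1,
        "failed": 1
      }
    }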

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
new file mode 100644
index 0000000..06a99fe
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.TimeZone;
+
+/**
+ * Tests for the DashboardConfigHandler.
+ */
+public class DashboardConfigHandlerTest {
+	@Test
+	public void testGetPaths() {
+		DashboardConfigHandler handler = new DashboardConfigHandler(Executors.directExecutor(), 10000L);
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/config", paths[0]);
+	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		long refreshInterval = 12345;
+		TimeZone timeZone = TimeZone.getDefault();
+		EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
+
+		String json = DashboardConfigHandler.createConfigJson(refreshInterval);
+
+		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+		Assert.assertEquals(refreshInterval, result.get("refresh-interval").asLong());
+		Assert.assertEquals(timeZone.getDisplayName(), result.get("timezone-name").asText());
+		Assert.assertEquals(timeZone.getRawOffset(), result.get("timezone-offset").asLong());
+		Assert.assertEquals(EnvironmentInformation.getVersion(), result.get("flink-version").asText());
+		Assert.assertEquals(revision.commitId + " @ " + revision.commitDate, result.get("flink-revision").asText());
+	}
+}
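
For reference, the /config payload that testJsonGeneration above checks, reconstructed for readability; the concrete values (version string, timezone) are illustrative:

    {
      "refresh-interval": 12345,
      "timezone-name": "Coordinated Universal Time",
      "timezone-offset": 0,
      "flink-version": "1.4-SNAPSHOT",
      "flink-revision": "<commit id> @ <commit date>"
    }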

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/HandlerRedirectUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/HandlerRedirectUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/HandlerRedirectUtilsTest.java
new file mode 100644
index 0000000..7e96835
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/HandlerRedirectUtilsTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the HandlerRedirectUtils.
+ */
+public class HandlerRedirectUtilsTest extends TestLogger {
+
+	private static final String localRestAddress = "http://127.0.0.1:1234";
+	private static final String remoteRestAddress = "http://127.0.0.2:1234";
+
+	@Test
+	public void testGetRedirectAddressWithLocalEqualsRemoteRESTAddress() throws Exception {
+		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
+		when(jobManagerGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(localRestAddress));
+
+		CompletableFuture<Optional<String>> redirectingAddressFuture = HandlerRedirectUtils.getRedirectAddress(
+			localRestAddress,
+			jobManagerGateway,
+			Time.seconds(3L));
+
+		Assert.assertTrue(redirectingAddressFuture.isDone());
+		// no redirection needed
+		Assert.assertFalse(redirectingAddressFuture.get().isPresent());
+	}
+
+	@Test
+	public void testGetRedirectAddressWithRemoteAkkaPath() throws Exception {
+		JobManagerGateway jobManagerGateway = mock(AkkaJobManagerGateway.class);
+		when(jobManagerGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(remoteRestAddress));
+
+		CompletableFuture<Optional<String>> optRedirectingAddress = HandlerRedirectUtils.getRedirectAddress(
+			localRestAddress,
+			jobManagerGateway,
+			Time.seconds(3L));
+
+		Assert.assertTrue(optRedirectingAddress.isDone());
+
+		Assert.assertEquals(remoteRestAddress, optRedirectingAddress.get().get());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
new file mode 100644
index 0000000..e1736c1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobAccumulatorsHandler.
+ */
+public class JobAccumulatorsHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobAccumulatorsHandler.JobAccumulatorsJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/accumulators", archive.getPath());
+		compareAccumulators(originalJob, archive.getJson());
+	}
+
+	@Test
+	public void testGetPaths() {
+		JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]);
+	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		String json = JobAccumulatorsHandler.createJobAccumulatorsJson(originalJob);
+
+		compareAccumulators(originalJob, json);
+	}
+
+	private static void compareAccumulators(AccessExecutionGraph originalJob, String json) throws IOException {
+		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+		ArrayNode accs = (ArrayNode) result.get("job-accumulators");
+		Assert.assertEquals(0, accs.size());
+
+		Assert.assertTrue(originalJob.getAccumulatorResultsStringified().length > 0);
+		ArchivedJobGenerationUtils.compareStringifiedAccumulators(
+			originalJob.getAccumulatorResultsStringified(),
+			(ArrayNode) result.get("user-task-accumulators"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandlerTest.java
new file mode 100644
index 0000000..cab8835
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandlerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Tests for the JobCancellationHandler.
+ */
+public class JobCancellationHandlerTest {
+	@Test
+	public void testGetPaths() {
+		JobCancellationHandler handler = new JobCancellationHandler(Executors.directExecutor(), TestingUtils.TIMEOUT());
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(2, paths.length);
+		List<String> pathsList = Lists.newArrayList(paths);
+		Assert.assertTrue(pathsList.contains("/jobs/:jobid/cancel"));
+		Assert.assertTrue(pathsList.contains("/jobs/:jobid/yarn-cancel"));
+	}
+}