You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/19 22:44:15 UTC
[05/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler
to flink-runtime
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
new file mode 100644
index 0000000..da115ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.files;
+
+/*****************************************************************************
+ * This code is based on the "HttpStaticFileServerHandler" from the
+ * Netty project's HTTP server example.
+ *
+ * See http://netty.io and
+ * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
+ *****************************************************************************/
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.util.MimeTypes;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
+import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.Locale;
+import java.util.TimeZone;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.DATE;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Simple file server handler that serves requests to web frontend's static files, such as
+ * HTML, CSS, or JS files.
+ *
+ * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
+ * example.</p>
+ */
+@ChannelHandler.Sharable
+public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectHandler<T> {
+
+ /** Timezone in which this server answers its "if-modified" requests. */
+ private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT");
+
+ /** Date format for HTTP. */
+ public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
+
+ /** Be default, we allow files to be cached for 5 minutes. */
+ private static final int HTTP_CACHE_SECONDS = 300;
+
+ // ------------------------------------------------------------------------
+
+ /** The path in which the static documents are. */
+ private final File rootPath;
+
+ public StaticFileServerHandler(
+ GatewayRetriever<T> retriever,
+ CompletableFuture<String> localJobManagerAddressFuture,
+ Time timeout,
+ File rootPath) throws IOException {
+
+ super(localJobManagerAddressFuture, retriever, timeout);
+
+ this.rootPath = checkNotNull(rootPath).getCanonicalFile();
+ }
+
+ // ------------------------------------------------------------------------
+ // Responses to requests
+ // ------------------------------------------------------------------------
+
+ @Override
+ protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, T gateway) throws Exception {
+ final HttpRequest request = routed.request();
+ final String requestPath;
+
+ // make sure we request the "index.html" in case there is a directory request
+ if (routed.path().endsWith("/")) {
+ requestPath = routed.path() + "index.html";
+ }
+ // in case the files being accessed are logs or stdout files, find appropriate paths.
+ else if (routed.path().equals("/jobmanager/log") || routed.path().equals("/jobmanager/stdout")) {
+ requestPath = "";
+ } else {
+ requestPath = routed.path();
+ }
+
+ respondToRequest(channelHandlerContext, request, requestPath);
+ }
+
+ /**
+ * Response when running with leading JobManager.
+ */
+ private void respondToRequest(ChannelHandlerContext ctx, HttpRequest request, String requestPath)
+ throws IOException, ParseException, URISyntaxException {
+
+ // convert to absolute path
+ final File file = new File(rootPath, requestPath);
+
+ if (!file.exists()) {
+ // file does not exist. Try to load it with the classloader
+ ClassLoader cl = StaticFileServerHandler.class.getClassLoader();
+
+ try (InputStream resourceStream = cl.getResourceAsStream("web" + requestPath)) {
+ boolean success = false;
+ try {
+ if (resourceStream != null) {
+ URL root = cl.getResource("web");
+ URL requested = cl.getResource("web" + requestPath);
+
+ if (root != null && requested != null) {
+ URI rootURI = new URI(root.getPath()).normalize();
+ URI requestedURI = new URI(requested.getPath()).normalize();
+
+ // Check that we don't load anything from outside of the
+ // expected scope.
+ if (!rootURI.relativize(requestedURI).equals(requestedURI)) {
+ logger.debug("Loading missing file from classloader: {}", requestPath);
+ // ensure that directory to file exists.
+ file.getParentFile().mkdirs();
+ Files.copy(resourceStream, file.toPath());
+
+ success = true;
+ }
+ }
+ }
+ } catch (Throwable t) {
+ logger.error("error while responding", t);
+ } finally {
+ if (!success) {
+ logger.debug("Unable to load requested file {} from classloader", requestPath);
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+ }
+ }
+ }
+
+ if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) {
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+
+ if (!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) {
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+
+ // cache validation
+ final String ifModifiedSince = request.headers().get(IF_MODIFIED_SINCE);
+ if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) {
+ SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
+ Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince);
+
+ // Only compare up to the second because the datetime format we send to the client
+ // does not have milliseconds
+ long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000;
+ long fileLastModifiedSeconds = file.lastModified() / 1000;
+ if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Responding 'NOT MODIFIED' for file '" + file.getAbsolutePath() + '\'');
+ }
+
+ sendNotModified(ctx);
+ return;
+ }
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Responding with file '" + file.getAbsolutePath() + '\'');
+ }
+
+ // Don't need to close this manually. Netty's DefaultFileRegion will take care of it.
+ final RandomAccessFile raf;
+ try {
+ raf = new RandomAccessFile(file, "r");
+ }
+ catch (FileNotFoundException e) {
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+
+ try {
+ long fileLength = raf.length();
+
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ setContentTypeHeader(response, file);
+
+ // since the log and out files are rapidly changing, we don't want to browser to cache them
+ if (!(requestPath.contains("log") || requestPath.contains("out"))) {
+ setDateAndCacheHeaders(response, file);
+ }
+ if (HttpHeaders.isKeepAlive(request)) {
+ response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ }
+ HttpHeaders.setContentLength(response, fileLength);
+
+ // write the initial line and the header.
+ ctx.write(response);
+
+ // write the content.
+ ChannelFuture lastContentFuture;
+ if (ctx.pipeline().get(SslHandler.class) == null) {
+ ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
+ lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ } else {
+ lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
+ ctx.newProgressivePromise());
+ // HttpChunkedInput will write the end marker (LastHttpContent) for us.
+ }
+
+ // close the connection, if no keep-alive is needed
+ if (!HttpHeaders.isKeepAlive(request)) {
+ lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ } catch (Exception e) {
+ raf.close();
+ logger.error("Failed to serve file.", e);
+ sendError(ctx, INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ if (ctx.channel().isActive()) {
+ logger.error("Caught exception", cause);
+ sendError(ctx, INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities to encode headers and responses
+ // ------------------------------------------------------------------------
+
+ /**
+ * Writes a simple error response message.
+ *
+ * @param ctx The channel context to write the response to.
+ * @param status The response status.
+ */
+ public static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
+ FullHttpResponse response = new DefaultFullHttpResponse(
+ HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
+ response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
+
+ // close the connection as soon as the error message is sent.
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ /**
+ * Send the "304 Not Modified" response. This response can be used when the
+ * file timestamp is the same as what the browser is sending up.
+ *
+ * @param ctx The channel context to write the response to.
+ */
+ public static void sendNotModified(ChannelHandlerContext ctx) {
+ FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED);
+ setDateHeader(response);
+
+ // close the connection as soon as the error message is sent.
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ /**
+ * Sets the "date" header for the HTTP response.
+ *
+ * @param response HTTP response
+ */
+ public static void setDateHeader(FullHttpResponse response) {
+ SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
+ dateFormatter.setTimeZone(GMT_TIMEZONE);
+
+ Calendar time = new GregorianCalendar();
+ response.headers().set(DATE, dateFormatter.format(time.getTime()));
+ }
+
+ /**
+ * Sets the "date" and "cache" headers for the HTTP Response.
+ *
+ * @param response The HTTP response object.
+ * @param fileToCache File to extract the modification timestamp from.
+ */
+ public static void setDateAndCacheHeaders(HttpResponse response, File fileToCache) {
+ SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
+ dateFormatter.setTimeZone(GMT_TIMEZONE);
+
+ // date header
+ Calendar time = new GregorianCalendar();
+ response.headers().set(DATE, dateFormatter.format(time.getTime()));
+
+ // cache headers
+ time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
+ response.headers().set(EXPIRES, dateFormatter.format(time.getTime()));
+ response.headers().set(CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS);
+ response.headers().set(LAST_MODIFIED, dateFormatter.format(new Date(fileToCache.lastModified())));
+ }
+
+ /**
+ * Sets the content type header for the HTTP Response.
+ *
+ * @param response HTTP response
+ * @param file file to extract content type
+ */
+ public static void setContentTypeHeader(HttpResponse response, File file) {
+ String mimeType = MimeTypes.getMimeTypeForFileName(file.getName());
+ String mimeFinal = mimeType != null ? mimeType : MimeTypes.getDefaultMimeType();
+ response.headers().set(CONTENT_TYPE, mimeFinal);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
new file mode 100644
index 0000000..315bdc2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Abstract request handler that returns a list of all available metrics or the values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler {
+ private final MetricFetcher fetcher;
+
+ public AbstractMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor);
+ this.fetcher = Preconditions.checkNotNull(fetcher);
+ }
+
+ @Override
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ fetcher.update();
+ String requestedMetricsList = queryParams.get("get");
+ try {
+ return requestedMetricsList != null
+ ? getMetricsValues(pathParams, requestedMetricsList)
+ : getAvailableMetricsList(pathParams);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not retrieve metrics.", e);
+ }
+ },
+ executor);
+
+ }
+
+ /**
+ * Returns a Map containing the metrics belonging to the entity pointed to by the path parameters.
+ *
+ * @param pathParams REST path parameters
+ * @param metrics MetricStore containing all metrics
+ * @return Map containing metrics, or null if no metric exists
+ */
+ protected abstract Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics);
+
+ private String getMetricsValues(Map<String, String> pathParams, String requestedMetricsList) throws IOException {
+ if (requestedMetricsList.isEmpty()) {
+ /*
+ * The WebInterface doesn't check whether the list of available metrics was empty. This can lead to a
+ * request for which the "get" parameter is an empty string.
+ */
+ return "";
+ }
+ MetricStore metricStore = fetcher.getMetricStore();
+ synchronized (metricStore) {
+ Map<String, String> metrics = getMapFor(pathParams, metricStore);
+ if (metrics == null) {
+ return "";
+ }
+ String[] requestedMetrics = requestedMetricsList.split(",");
+
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+ gen.writeStartArray();
+ for (String requestedMetric : requestedMetrics) {
+ Object metricValue = metrics.get(requestedMetric);
+ if (metricValue != null) {
+ gen.writeStartObject();
+ gen.writeStringField("id", requestedMetric);
+ gen.writeStringField("value", metricValue.toString());
+ gen.writeEndObject();
+ }
+ }
+ gen.writeEndArray();
+
+ gen.close();
+ return writer.toString();
+ }
+ }
+
+ private String getAvailableMetricsList(Map<String, String> pathParams) throws IOException {
+ MetricStore metricStore = fetcher.getMetricStore();
+ synchronized (metricStore) {
+ Map<String, String> metrics = getMapFor(pathParams, metricStore);
+ if (metrics == null) {
+ return "";
+ }
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+ gen.writeStartArray();
+ for (String m : metrics.keySet()) {
+ gen.writeStartObject();
+ gen.writeStringField("id", m);
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
+
+ gen.close();
+ return writer.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
new file mode 100644
index 0000000..c568ee0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns for the job manager a list of all available metrics or the values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class JobManagerMetricsHandler extends AbstractMetricsHandler {
+
+ private static final String JOBMANAGER_METRICS_REST_PATH = "/jobmanager/metrics";
+
+ public JobManagerMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor, fetcher);
+ }
+
+ @Override
+ public String[] getPaths() {
+ return new String[]{JOBMANAGER_METRICS_REST_PATH};
+ }
+
+ @Override
+ protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+ MetricStore.JobManagerMetricStore jobManager = metrics.getJobManagerMetricStore();
+ if (jobManager == null) {
+ return null;
+ } else {
+ return jobManager.metrics;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
new file mode 100644
index 0000000..7341eb8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns for a given job a list of all available metrics or the values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class JobMetricsHandler extends AbstractMetricsHandler {
+ public static final String PARAMETER_JOB_ID = "jobid";
+ private static final String JOB_METRICS_REST_PATH = "/jobs/:jobid/metrics";
+
+ public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor, fetcher);
+ }
+
+ @Override
+ public String[] getPaths() {
+ return new String[]{JOB_METRICS_REST_PATH};
+ }
+
+ @Override
+ protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+ MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
+ return job != null
+ ? job.metrics
+ : null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
new file mode 100644
index 0000000..3a701ab
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns for a given task a list of all available metrics or the values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class JobVertexMetricsHandler extends AbstractMetricsHandler {
+ public static final String PARAMETER_VERTEX_ID = "vertexid";
+ private static final String JOB_VERTEX_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/metrics";
+
+ public JobVertexMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor, fetcher);
+ }
+
+ @Override
+ public String[] getPaths() {
+ return new String[]{JOB_VERTEX_METRICS_REST_PATH};
+ }
+
+ @Override
+ protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+ MetricStore.TaskMetricStore task = metrics.getTaskMetricStore(
+ pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID),
+ pathParams.get(PARAMETER_VERTEX_ID));
+ return task != null
+ ? task.metrics
+ : null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
new file mode 100644
index 0000000..9f53808
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
+
+/**
+ * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
+ *
+ * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since
+ * the last call has passed.
+ */
+public class MetricFetcher {
+ private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
+
+ private final GatewayRetriever<JobManagerGateway> retriever;
+ private final MetricQueryServiceRetriever queryServiceRetriever;
+ private final Executor executor;
+ private final Time timeout;
+
+ private final MetricStore metrics = new MetricStore();
+ private final MetricDumpDeserializer deserializer = new MetricDumpDeserializer();
+
+ private long lastUpdateTime;
+
+ public MetricFetcher(
+ GatewayRetriever<JobManagerGateway> retriever,
+ MetricQueryServiceRetriever queryServiceRetriever,
+ Executor executor,
+ Time timeout) {
+ this.retriever = Preconditions.checkNotNull(retriever);
+ this.queryServiceRetriever = Preconditions.checkNotNull(queryServiceRetriever);
+ this.executor = Preconditions.checkNotNull(executor);
+ this.timeout = Preconditions.checkNotNull(timeout);
+ }
+
+ /**
+ * Returns the MetricStore containing all stored metrics.
+ *
+ * @return MetricStore containing all stored metrics;
+ */
+ public MetricStore getMetricStore() {
+ return metrics;
+ }
+
+ /**
+ * This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated.
+ */
+ public void update() {
+ synchronized (this) {
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update
+ lastUpdateTime = currentTime;
+ fetchMetrics();
+ }
+ }
+ }
+
+ private void fetchMetrics() {
+ try {
+ Optional<JobManagerGateway> optJobManagerGateway = retriever.getNow();
+ if (optJobManagerGateway.isPresent()) {
+ final JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
+
+ /**
+ * Remove all metrics that belong to a job that is not running and no longer archived.
+ */
+ CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
+
+ jobDetailsFuture.whenCompleteAsync(
+ (MultipleJobsDetails jobDetails, Throwable throwable) -> {
+ if (throwable != null) {
+ LOG.debug("Fetching of JobDetails failed.", throwable);
+ } else {
+ ArrayList<String> toRetain = new ArrayList<>();
+ for (JobDetails job : jobDetails.getRunningJobs()) {
+ toRetain.add(job.getJobId().toString());
+ }
+ for (JobDetails job : jobDetails.getFinishedJobs()) {
+ toRetain.add(job.getJobId().toString());
+ }
+ synchronized (metrics) {
+ metrics.jobs.keySet().retainAll(toRetain);
+ }
+ }
+ },
+ executor);
+
+ String jobManagerPath = jobManagerGateway.getAddress();
+ String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
+
+ retrieveAndQueryMetrics(jmQueryServicePath);
+
+ /**
+ * We first request the list of all registered task managers from the job manager, and then
+ * request the respective metric dump from each task manager.
+ *
+ * <p>All stored metrics that do not belong to a registered task manager will be removed.
+ */
+ CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+ taskManagersFuture.whenCompleteAsync(
+ (Collection<Instance> taskManagers, Throwable throwable) -> {
+ if (throwable != null) {
+ LOG.debug("Fetching list of registered TaskManagers failed.", throwable);
+ } else {
+ List<String> activeTaskManagers = taskManagers.stream().map(
+ taskManagerInstance -> {
+ final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress();
+ final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString();
+
+ retrieveAndQueryMetrics(tmQueryServicePath);
+
+ return taskManagerInstance.getId().toString();
+ }).collect(Collectors.toList());
+
+ synchronized (metrics) {
+ metrics.taskManagers.keySet().retainAll(activeTaskManagers);
+ }
+ }
+ },
+ executor);
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception while fetching metrics.", e);
+ }
+ }
+
+ /**
+ * Retrieves and queries the specified QueryServiceGateway.
+ *
+ * @param queryServicePath specifying the QueryServiceGateway
+ */
+ private void retrieveAndQueryMetrics(String queryServicePath) {
+ final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath);
+
+ queryServiceGatewayFuture.whenCompleteAsync(
+ (MetricQueryServiceGateway queryServiceGateway, Throwable t) -> {
+ if (t != null) {
+ LOG.debug("Could not retrieve QueryServiceGateway.", t);
+ } else {
+ queryMetrics(queryServiceGateway);
+ }
+ },
+ executor);
+ }
+
+ /**
+ * Query the metrics from the given QueryServiceGateway.
+ *
+ * @param queryServiceGateway to query for metrics
+ */
+ private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
+ queryServiceGateway
+ .queryMetrics(timeout)
+ .whenCompleteAsync(
+ (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
+ if (t != null) {
+ LOG.debug("Fetching metrics failed.", t);
+ } else {
+ List<MetricDump> dumpedMetrics = deserializer.deserialize(result);
+ synchronized (metrics) {
+ for (MetricDump metric : dumpedMetrics) {
+ metrics.add(metric);
+ }
+ }
+ }
+ },
+ executor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
new file mode 100644
index 0000000..6d3fc99
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
+
+/**
+ * Nested data-structure to store metrics.
+ *
+ * <p>This structure is not thread-safe.
+ */
+public class MetricStore {
+ private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
+
+ final JobManagerMetricStore jobManager = new JobManagerMetricStore();
+ final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
+ final Map<String, JobMetricStore> jobs = new HashMap<>();
+
+ // -----------------------------------------------------------------------------------------------------------------
+ // Adding metrics
+ // -----------------------------------------------------------------------------------------------------------------
+ public void add(MetricDump metric) {
+ try {
+ QueryScopeInfo info = metric.scopeInfo;
+ TaskManagerMetricStore tm;
+ JobMetricStore job;
+ TaskMetricStore task;
+ SubtaskMetricStore subtask;
+
+ String name = info.scope.isEmpty()
+ ? metric.name
+ : info.scope + "." + metric.name;
+
+ if (name.isEmpty()) { // malformed transmission
+ return;
+ }
+
+ switch (info.getCategory()) {
+ case INFO_CATEGORY_JM:
+ addMetric(jobManager.metrics, name, metric);
+ break;
+ case INFO_CATEGORY_TM:
+ String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
+ tm = taskManagers.get(tmID);
+ if (tm == null) {
+ tm = new TaskManagerMetricStore();
+ taskManagers.put(tmID, tm);
+ }
+ if (name.contains("GarbageCollector")) {
+ String gcName = name.substring("Status.JVM.GarbageCollector.".length(), name.lastIndexOf('.'));
+ tm.addGarbageCollectorName(gcName);
+ }
+ addMetric(tm.metrics, name, metric);
+ break;
+ case INFO_CATEGORY_JOB:
+ QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
+ job = jobs.get(jobInfo.jobID);
+ if (job == null) {
+ job = new JobMetricStore();
+ jobs.put(jobInfo.jobID, job);
+ }
+ addMetric(job.metrics, name, metric);
+ break;
+ case INFO_CATEGORY_TASK:
+ QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
+ job = jobs.get(taskInfo.jobID);
+ if (job == null) {
+ job = new JobMetricStore();
+ jobs.put(taskInfo.jobID, job);
+ }
+ task = job.tasks.get(taskInfo.vertexID);
+ if (task == null) {
+ task = new TaskMetricStore();
+ job.tasks.put(taskInfo.vertexID, task);
+ }
+ subtask = task.subtasks.get(taskInfo.subtaskIndex);
+ if (subtask == null) {
+ subtask = new SubtaskMetricStore();
+ task.subtasks.put(taskInfo.subtaskIndex, subtask);
+ }
+ /**
+ * The duplication is intended. Metrics scoped by subtask are useful for several job/task handlers,
+ * while the WebInterface task metric queries currently do not account for subtasks, so we don't
+ * divide by subtask and instead use the concatenation of subtask index and metric name as the name
+ * for those.
+ */
+ addMetric(subtask.metrics, name, metric);
+ addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric);
+ break;
+ case INFO_CATEGORY_OPERATOR:
+ QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
+ job = jobs.get(operatorInfo.jobID);
+ if (job == null) {
+ job = new JobMetricStore();
+ jobs.put(operatorInfo.jobID, job);
+ }
+ task = job.tasks.get(operatorInfo.vertexID);
+ if (task == null) {
+ task = new TaskMetricStore();
+ job.tasks.put(operatorInfo.vertexID, task);
+ }
+ /**
+ * As the WebInterface does not account for operators (because it can't) we don't
+ * divide by operator and instead use the concatenation of subtask index, operator name and metric name
+ * as the name.
+ */
+ addMetric(task.metrics, operatorInfo.subtaskIndex + "." + operatorInfo.operatorName + "." + name, metric);
+ break;
+ default:
+ LOG.debug("Invalid metric dump category: " + info.getCategory());
+ }
+ } catch (Exception e) {
+ LOG.debug("Malformed metric dump.", e);
+ }
+ }
+
+ private void addMetric(Map<String, String> target, String name, MetricDump metric) {
+ switch (metric.getCategory()) {
+ case METRIC_CATEGORY_COUNTER:
+ MetricDump.CounterDump counter = (MetricDump.CounterDump) metric;
+ target.put(name, String.valueOf(counter.count));
+ break;
+ case METRIC_CATEGORY_GAUGE:
+ MetricDump.GaugeDump gauge = (MetricDump.GaugeDump) metric;
+ target.put(name, gauge.value);
+ break;
+ case METRIC_CATEGORY_HISTOGRAM:
+ MetricDump.HistogramDump histogram = (MetricDump.HistogramDump) metric;
+ target.put(name + "_min", String.valueOf(histogram.min));
+ target.put(name + "_max", String.valueOf(histogram.max));
+ target.put(name + "_mean", String.valueOf(histogram.mean));
+ target.put(name + "_median", String.valueOf(histogram.median));
+ target.put(name + "_stddev", String.valueOf(histogram.stddev));
+ target.put(name + "_p75", String.valueOf(histogram.p75));
+ target.put(name + "_p90", String.valueOf(histogram.p90));
+ target.put(name + "_p95", String.valueOf(histogram.p95));
+ target.put(name + "_p98", String.valueOf(histogram.p98));
+ target.put(name + "_p99", String.valueOf(histogram.p99));
+ target.put(name + "_p999", String.valueOf(histogram.p999));
+ break;
+ case METRIC_CATEGORY_METER:
+ MetricDump.MeterDump meter = (MetricDump.MeterDump) metric;
+ target.put(name, String.valueOf(meter.rate));
+ break;
+ }
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+ // Accessors for sub MetricStores
+ // -----------------------------------------------------------------------------------------------------------------
+
+ /**
+ * Returns the {@link JobManagerMetricStore}.
+ *
+ * @return JobManagerMetricStore
+ */
+ public JobManagerMetricStore getJobManagerMetricStore() {
+ return jobManager;
+ }
+
+ /**
+ * Returns the {@link TaskManagerMetricStore} for the given taskmanager ID.
+ *
+ * @param tmID taskmanager ID
+ * @return TaskManagerMetricStore for the given ID, or null if no store for the given argument exists
+ */
+ public TaskManagerMetricStore getTaskManagerMetricStore(String tmID) {
+ return taskManagers.get(tmID);
+ }
+
+ /**
+ * Returns the {@link JobMetricStore} for the given job ID.
+ *
+ * @param jobID job ID
+ * @return JobMetricStore for the given ID, or null if no store for the given argument exists
+ */
+ public JobMetricStore getJobMetricStore(String jobID) {
+ return jobs.get(jobID);
+ }
+
+ /**
+ * Returns the {@link TaskMetricStore} for the given job/task ID.
+ *
+ * @param jobID job ID
+ * @param taskID task ID
+ * @return TaskMetricStore for given IDs, or null if no store for the given arguments exists
+ */
+ public TaskMetricStore getTaskMetricStore(String jobID, String taskID) {
+ JobMetricStore job = getJobMetricStore(jobID);
+ if (job == null) {
+ return null;
+ }
+ return job.getTaskMetricStore(taskID);
+ }
+
+ /**
+ * Returns the {@link SubtaskMetricStore} for the given job/task ID and subtask index.
+ *
+ * @param jobID job ID
+ * @param taskID task ID
+ * @param subtaskIndex subtask index
+ * @return SubtaskMetricStore for the given IDs and index, or null if no store for the given arguments exists
+ */
+ public SubtaskMetricStore getSubtaskMetricStore(String jobID, String taskID, int subtaskIndex) {
+ TaskMetricStore task = getTaskMetricStore(jobID, taskID);
+ if (task == null) {
+ return null;
+ }
+ return task.getSubtaskMetricStore(subtaskIndex);
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+ // sub MetricStore classes
+ // -----------------------------------------------------------------------------------------------------------------
+ private abstract static class ComponentMetricStore {
+ public final Map<String, String> metrics = new HashMap<>();
+
+ public String getMetric(String name, String defaultValue) {
+ String value = this.metrics.get(name);
+ return value != null
+ ? value
+ : defaultValue;
+ }
+ }
+
+ /**
+ * Sub-structure containing metrics of the JobManager.
+ */
+ public static class JobManagerMetricStore extends ComponentMetricStore {
+ }
+
+ /**
+ * Sub-structure containing metrics of a single TaskManager.
+ */
+ public static class TaskManagerMetricStore extends ComponentMetricStore {
+ public final Set<String> garbageCollectorNames = new HashSet<>();
+
+ public void addGarbageCollectorName(String name) {
+ garbageCollectorNames.add(name);
+ }
+ }
+
+ /**
+ * Sub-structure containing metrics of a single Job.
+ */
+ public static class JobMetricStore extends ComponentMetricStore {
+ private final Map<String, TaskMetricStore> tasks = new HashMap<>();
+
+ public TaskMetricStore getTaskMetricStore(String taskID) {
+ return tasks.get(taskID);
+ }
+ }
+
+ /**
+ * Sub-structure containing metrics of a single Task.
+ */
+ public static class TaskMetricStore extends ComponentMetricStore {
+ private final Map<Integer, SubtaskMetricStore> subtasks = new HashMap<>();
+
+ public SubtaskMetricStore getSubtaskMetricStore(int subtaskIndex) {
+ return subtasks.get(subtaskIndex);
+ }
+ }
+
+ /**
+ * Sub-structure containing metrics of a single Subtask.
+ */
+ public static class SubtaskMetricStore extends ComponentMetricStore {
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
new file mode 100644
index 0000000..90bafb7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns for a given task manager a list of all available metrics or the values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class TaskManagerMetricsHandler extends AbstractMetricsHandler {
+
+ private static final String TASKMANAGER_METRICS_REST_PATH = "/taskmanagers/:taskmanagerid/metrics";
+
+ public TaskManagerMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor, fetcher);
+ }
+
+ @Override
+ public String[] getPaths() {
+ return new String[]{TASKMANAGER_METRICS_REST_PATH};
+ }
+
+ @Override
+ protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+ MetricStore.TaskManagerMetricStore taskManager = metrics.getTaskManagerMetricStore(pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY));
+ if (taskManager == null) {
+ return null;
+ } else {
+ return taskManager.metrics;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
new file mode 100644
index 0000000..e2aaaf7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.util;
+
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.rest.handler.legacy.JobVertexDetailsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * This class is a mutable version of the {@link IOMetrics} class that allows adding up IO-related metrics.
+ *
+ * <p>For finished jobs these metrics are stored in the {@link ExecutionGraph} as another {@link IOMetrics}.
+ * For running jobs these metrics are retrieved using the {@link MetricFetcher}.
+ *
+ * <p>This class provides a common interface to handle both cases, reducing complexity in various handlers (like
+ * the {@link JobVertexDetailsHandler}).
+ */
+public class MutableIOMetrics extends IOMetrics {
+
+ private static final long serialVersionUID = -5460777634971381737L;
+
+ public MutableIOMetrics() {
+ super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
+ }
+
+ /**
+ * Adds the IO metrics for the given attempt to this object. If the {@link AccessExecution} is in
+ * a terminal state the contained {@link IOMetrics} object is added. Otherwise the given {@link MetricFetcher} is
+ * used to retrieve the required metrics.
+ *
+ * @param attempt Attempt whose IO metrics should be added
+ * @param fetcher MetricFetcher to retrieve metrics for running jobs
+ * @param jobID JobID to which the attempt belongs
+ * @param taskID TaskID to which the attempt belongs
+ */
+ public void addIOMetrics(AccessExecution attempt, @Nullable MetricFetcher fetcher, String jobID, String taskID) {
+ if (attempt.getState().isTerminal()) {
+ IOMetrics ioMetrics = attempt.getIOMetrics();
+ if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
+ this.numBytesInLocal += ioMetrics.getNumBytesInLocal();
+ this.numBytesInRemote += ioMetrics.getNumBytesInRemote();
+ this.numBytesOut += ioMetrics.getNumBytesOut();
+ this.numRecordsIn += ioMetrics.getNumRecordsIn();
+ this.numRecordsOut += ioMetrics.getNumRecordsOut();
+ }
+ } else { // execAttempt is still running, use MetricQueryService instead
+ if (fetcher != null) {
+ fetcher.update();
+ MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex());
+ if (metrics != null) {
+ this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"));
+ this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+ this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+ this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+ this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
+ }
+ }
+ }
+ }
+
+ /**
+ * Writes the IO metrics contained in this object to the given {@link JsonGenerator}.
+ *
+ * <p>The JSON structure written is as follows:
+ * "metrics": {
+ * "read-bytes": 1,
+ * "write-bytes": 2,
+ * "read-records": 3,
+ * "write-records": 4
+ * }
+ *
+ * @param gen JsonGenerator to which the metrics should be written
+ * @throws IOException
+ */
+ public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException {
+ gen.writeObjectFieldStart("metrics");
+ gen.writeNumberField("read-bytes", this.numBytesInLocal + this.numBytesInRemote);
+ gen.writeNumberField("write-bytes", this.numBytesOut);
+ gen.writeNumberField("read-records", this.numRecordsIn);
+ gen.writeNumberField("write-records", this.numRecordsOut);
+ gen.writeEndObject();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandlerTest.java
new file mode 100644
index 0000000..5bfa1f9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandlerTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the ClusterOverviewHandler.
+ */
+public class ClusterOverviewHandlerTest {
+ @Test
+ public void testGetPaths() {
+ ClusterOverviewHandler handler = new ClusterOverviewHandler(Executors.directExecutor(), Time.seconds(0L));
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(1, paths.length);
+ Assert.assertEquals("/overview", paths[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandlerTest.java
new file mode 100644
index 0000000..0ada30d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandlerTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the CurrentJobIdsHandler.
+ */
+public class CurrentJobIdsHandlerTest {
+ @Test
+ public void testGetPaths() {
+ CurrentJobIdsHandler handler = new CurrentJobIdsHandler(Executors.directExecutor(), Time.seconds(0L));
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(1, paths.length);
+ Assert.assertEquals("/jobs", paths[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
new file mode 100644
index 0000000..83bb157
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+
+/**
+ * Tests for the CurrentJobsOverviewHandler.
+ */
+public class CurrentJobsOverviewHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob);
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/joboverview", archive.getPath());
+
+ JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(archive.getJson());
+ ArrayNode running = (ArrayNode) result.get("running");
+ Assert.assertEquals(0, running.size());
+
+ ArrayNode finished = (ArrayNode) result.get("finished");
+ Assert.assertEquals(1, finished.size());
+
+ compareJobOverview(expectedDetails, finished.get(0).toString());
+ }
+
+ @Test
+ public void testGetPaths() {
+ CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, true);
+ String[] pathsAll = handlerAll.getPaths();
+ Assert.assertEquals(1, pathsAll.length);
+ Assert.assertEquals("/joboverview", pathsAll[0]);
+
+ CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, false);
+ String[] pathsRunning = handlerRunning.getPaths();
+ Assert.assertEquals(1, pathsRunning.length);
+ Assert.assertEquals("/joboverview/running", pathsRunning[0]);
+
+ CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), false, true);
+ String[] pathsCompleted = handlerCompleted.getPaths();
+ Assert.assertEquals(1, pathsCompleted.length);
+ Assert.assertEquals("/joboverview/completed", pathsCompleted[0]);
+ }
+
+ @Test
+ public void testJsonGeneration() throws Exception {
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob);
+ StringWriter writer = new StringWriter();
+ try (JsonGenerator gen = ArchivedJobGenerationUtils.JACKSON_FACTORY.createGenerator(writer)) {
+ CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(expectedDetails, gen, 0);
+ }
+ compareJobOverview(expectedDetails, writer.toString());
+ }
+
+ private static void compareJobOverview(JobDetails expectedDetails, String answer) throws IOException {
+ JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(answer);
+
+ Assert.assertEquals(expectedDetails.getJobId().toString(), result.get("jid").asText());
+ Assert.assertEquals(expectedDetails.getJobName(), result.get("name").asText());
+ Assert.assertEquals(expectedDetails.getStatus().name(), result.get("state").asText());
+
+ Assert.assertEquals(expectedDetails.getStartTime(), result.get("start-time").asLong());
+ Assert.assertEquals(expectedDetails.getEndTime(), result.get("end-time").asLong());
+ Assert.assertEquals(expectedDetails.getEndTime() - expectedDetails.getStartTime(), result.get("duration").asLong());
+ Assert.assertEquals(expectedDetails.getLastUpdateTime(), result.get("last-modification").asLong());
+
+ JsonNode tasks = result.get("tasks");
+ Assert.assertEquals(expectedDetails.getNumTasks(), tasks.get("total").asInt());
+ int[] tasksPerState = expectedDetails.getNumVerticesPerExecutionState();
+ Assert.assertEquals(
+ tasksPerState[ExecutionState.CREATED.ordinal()] + tasksPerState[ExecutionState.SCHEDULED.ordinal()] + tasksPerState[ExecutionState.DEPLOYING.ordinal()],
+ tasks.get("pending").asInt());
+ Assert.assertEquals(tasksPerState[ExecutionState.RUNNING.ordinal()], tasks.get("running").asInt());
+ Assert.assertEquals(tasksPerState[ExecutionState.FINISHED.ordinal()], tasks.get("finished").asInt());
+ Assert.assertEquals(tasksPerState[ExecutionState.CANCELING.ordinal()], tasks.get("canceling").asInt());
+ Assert.assertEquals(tasksPerState[ExecutionState.CANCELED.ordinal()], tasks.get("canceled").asInt());
+ Assert.assertEquals(tasksPerState[ExecutionState.FAILED.ordinal()], tasks.get("failed").asInt());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
new file mode 100644
index 0000000..06a99fe
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.TimeZone;
+
+/**
+ * Tests for the DashboardConfigHandler.
+ */
+public class DashboardConfigHandlerTest {
+ @Test
+ public void testGetPaths() {
+ DashboardConfigHandler handler = new DashboardConfigHandler(Executors.directExecutor(), 10000L);
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(1, paths.length);
+ Assert.assertEquals("/config", paths[0]);
+ }
+
+ @Test
+ public void testJsonGeneration() throws Exception {
+ long refreshInterval = 12345;
+ TimeZone timeZone = TimeZone.getDefault();
+ EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
+
+ String json = DashboardConfigHandler.createConfigJson(refreshInterval);
+
+ JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+ Assert.assertEquals(refreshInterval, result.get("refresh-interval").asLong());
+ Assert.assertEquals(timeZone.getDisplayName(), result.get("timezone-name").asText());
+ Assert.assertEquals(timeZone.getRawOffset(), result.get("timezone-offset").asLong());
+ Assert.assertEquals(EnvironmentInformation.getVersion(), result.get("flink-version").asText());
+ Assert.assertEquals(revision.commitId + " @ " + revision.commitDate, result.get("flink-revision").asText());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/HandlerRedirectUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/HandlerRedirectUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/HandlerRedirectUtilsTest.java
new file mode 100644
index 0000000..7e96835
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/HandlerRedirectUtilsTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the HandlerRedirectUtils.
+ */
+public class HandlerRedirectUtilsTest extends TestLogger {
+
+ private static final String localRestAddress = "http://127.0.0.1:1234";
+ private static final String remoteRestAddress = "http://127.0.0.2:1234";
+
+ @Test
+ public void testGetRedirectAddressWithLocalEqualsRemoteRESTAddress() throws Exception {
+ JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
+ when(jobManagerGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(localRestAddress));
+
+ CompletableFuture<Optional<String>> redirectingAddressFuture = HandlerRedirectUtils.getRedirectAddress(
+ localRestAddress,
+ jobManagerGateway,
+ Time.seconds(3L));
+
+ Assert.assertTrue(redirectingAddressFuture.isDone());
+ // no redirection needed
+ Assert.assertFalse(redirectingAddressFuture.get().isPresent());
+ }
+
+ @Test
+ public void testGetRedirectAddressWithRemoteAkkaPath() throws Exception {
+ JobManagerGateway jobManagerGateway = mock(AkkaJobManagerGateway.class);
+ when(jobManagerGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(remoteRestAddress));
+
+ CompletableFuture<Optional<String>> optRedirectingAddress = HandlerRedirectUtils.getRedirectAddress(
+ localRestAddress,
+ jobManagerGateway,
+ Time.seconds(3L));
+
+ Assert.assertTrue(optRedirectingAddress.isDone());
+
+ Assert.assertEquals(remoteRestAddress, optRedirectingAddress.get().get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
new file mode 100644
index 0000000..e1736c1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobAccumulatorsHandler.
+ */
+public class JobAccumulatorsHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new JobAccumulatorsHandler.JobAccumulatorsJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/accumulators", archive.getPath());
+ compareAccumulators(originalJob, archive.getJson());
+ }
+
+ @Test
+ public void testGetPaths() {
+ JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(1, paths.length);
+ Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]);
+ }
+
+ @Test
+ public void testJsonGeneration() throws Exception {
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ String json = JobAccumulatorsHandler.createJobAccumulatorsJson(originalJob);
+
+ compareAccumulators(originalJob, json);
+ }
+
+ private static void compareAccumulators(AccessExecutionGraph originalJob, String json) throws IOException {
+ JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+ ArrayNode accs = (ArrayNode) result.get("job-accumulators");
+ Assert.assertEquals(0, accs.size());
+
+ Assert.assertTrue(originalJob.getAccumulatorResultsStringified().length > 0);
+ ArchivedJobGenerationUtils.compareStringifiedAccumulators(
+ originalJob.getAccumulatorResultsStringified(),
+ (ArrayNode) result.get("user-task-accumulators"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandlerTest.java
new file mode 100644
index 0000000..cab8835
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandlerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Tests for the JobCancellationHandler.
+ */
+public class JobCancellationHandlerTest {
+ @Test
+ public void testGetPaths() {
+ JobCancellationHandler handler = new JobCancellationHandler(Executors.directExecutor(), TestingUtils.TIMEOUT());
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(2, paths.length);
+ List<String> pathsList = Lists.newArrayList(paths);
+ Assert.assertTrue(pathsList.contains("/jobs/:jobid/cancel"));
+ Assert.assertTrue(pathsList.contains("/jobs/:jobid/yarn-cancel"));
+ }
+}