You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/16 19:18:23 UTC
[10/11] flink git commit: [FLINK-8370][REST] Port
AggregatingMetricsHandler to flip6
[FLINK-8370][REST] Port AggregatingMetricsHandler to flip6
This closes #5805.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0410d80
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0410d80
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0410d80
Branch: refs/heads/master
Commit: c0410d801e406e77b1e6e7134224f7946906a49f
Parents: 4645d3c
Author: zentol <ch...@apache.org>
Authored: Wed Mar 28 12:52:07 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 16 21:17:54 2018 +0200
----------------------------------------------------------------------
.../AbstractAggregatingMetricsHandler.java | 300 ++++++++++++++
.../metrics/AggregatingJobsMetricsHandler.java | 77 ++++
.../AggregatingSubtasksMetricsHandler.java | 119 ++++++
.../AggregatingTaskManagersMetricsHandler.java | 77 ++++
.../handler/job/metrics/DoubleAccumulator.java | 257 ++++++++++++
.../AbstractAggregatedMetricsHeaders.java | 50 +++
.../AbstractAggregatedMetricsParameters.java | 48 +++
.../AggregateTaskManagerMetricsParameters.java | 38 ++
.../metrics/AggregatedJobMetricsHeaders.java | 44 +++
.../metrics/AggregatedJobMetricsParameters.java | 39 ++
.../messages/job/metrics/AggregatedMetric.java | 118 ++++++
.../metrics/AggregatedMetricsResponseBody.java | 112 ++++++
.../AggregatedSubtaskMetricsHeaders.java | 47 +++
.../AggregatedSubtaskMetricsParameters.java | 51 +++
.../AggregatedTaskManagerMetricsHeaders.java | 44 +++
.../job/metrics/JobsFilterQueryParameter.java | 48 +++
.../metrics/MetricsAggregationParameter.java | 58 +++
.../metrics/SubtasksFilterQueryParameter.java | 41 ++
.../TaskManagersFilterQueryParameter.java | 42 ++
.../runtime/webmonitor/WebMonitorEndpoint.java | 33 ++
.../AggregatingJobsMetricsHandlerTest.java | 81 ++++
.../AggregatingMetricsHandlerTestBase.java | 389 +++++++++++++++++++
.../AggregatingSubtasksMetricsHandlerTest.java | 93 +++++
...gregatingTaskManagersMetricsHandlerTest.java | 82 ++++
24 files changed, 2288 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
new file mode 100644
index 0000000..338bb46
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics
+ * or the aggregated values of them across all/selected entities.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list of objects containing the aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]}
+ */
+public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
+
+ private final Executor executor;
+ private final MetricFetcher<?> fetcher;
+
+ protected AbstractAggregatingMetricsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ AbstractAggregatedMetricsHeaders<P> messageHeaders,
+ Executor executor,
+ MetricFetcher<?> fetcher) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+ this.executor = Preconditions.checkNotNull(executor);
+ this.fetcher = Preconditions.checkNotNull(fetcher);
+ }
+
+ @Nonnull
+ abstract Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, P> request);
+
+ @Override
+ protected CompletableFuture<AggregatedMetricsResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, P> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ fetcher.update();
+ List<String> requestedMetrics = request.getQueryParameter(MetricsFilterParameter.class);
+ List<MetricsAggregationParameter.AggregationMode> requestedAggregations = request.getQueryParameter(MetricsAggregationParameter.class);
+ MetricStore store = fetcher.getMetricStore();
+
+ Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, request);
+
+ if (requestedMetrics.isEmpty()) {
+ Collection<String> list = getAvailableMetrics(stores);
+ return new AggregatedMetricsResponseBody(
+ list.stream()
+ .map(AggregatedMetric::new)
+ .collect(Collectors.toList())
+ );
+ }
+
+ DoubleAccumulator.DoubleMinimumFactory minimumFactory = null;
+ DoubleAccumulator.DoubleMaximumFactory maximumFactory = null;
+ DoubleAccumulator.DoubleAverageFactory averageFactory = null;
+ DoubleAccumulator.DoubleSumFactory sumFactory = null;
+ // by default we return all aggregations
+ if (requestedAggregations.isEmpty()) {
+ minimumFactory = DoubleAccumulator.DoubleMinimumFactory.get();
+ maximumFactory = DoubleAccumulator.DoubleMaximumFactory.get();
+ averageFactory = DoubleAccumulator.DoubleAverageFactory.get();
+ sumFactory = DoubleAccumulator.DoubleSumFactory.get();
+ } else {
+ for (MetricsAggregationParameter.AggregationMode aggregation : requestedAggregations) {
+ switch (aggregation) {
+ case MIN:
+ minimumFactory = DoubleAccumulator.DoubleMinimumFactory.get();
+ break;
+ case MAX:
+ maximumFactory = DoubleAccumulator.DoubleMaximumFactory.get();
+ break;
+ case AVG:
+ averageFactory = DoubleAccumulator.DoubleAverageFactory.get();
+ break;
+ case SUM:
+ sumFactory = DoubleAccumulator.DoubleSumFactory.get();
+ break;
+ default:
+ log.warn("Unsupported aggregation specified: {}", aggregation);
+ }
+ }
+ }
+ MetricAccumulatorFactory metricAccumulatorFactory = new MetricAccumulatorFactory(minimumFactory, maximumFactory, averageFactory, sumFactory);
+
+ return getAggregatedMetricValues(stores, requestedMetrics, metricAccumulatorFactory);
+ } catch (Exception e) {
+ log.warn("Could not retrieve metrics.", e);
+ throw new CompletionException(new RestHandlerException("Could not retrieve metrics.", HttpResponseStatus.INTERNAL_SERVER_ERROR));
+ }
+ },
+ executor);
+ }
+
+ /**
+ * Returns a JSON string containing a list of all available metrics in the given stores. Effectively this method maps
+ * the union of all key-sets to JSON.
+ *
+ * @param stores metrics
+ * @return JSON string containing a list of all available metrics
+ */
+ private static Collection<String> getAvailableMetrics(Collection<? extends MetricStore.ComponentMetricStore> stores) {
+ Set<String> uniqueMetrics = new HashSet<>(32);
+ for (MetricStore.ComponentMetricStore store : stores) {
+ uniqueMetrics.addAll(store.metrics.keySet());
+ }
+ return uniqueMetrics;
+ }
+
+ /**
+ * Extracts and aggregates all requested metrics from the given metric stores, and maps the result to a JSON string.
+ *
+ * @param stores available metrics
+ * @param requestedMetrics ids of requested metrics
+ * @param requestedAggregationsFactories requested aggregations
+ * @return JSON string containing the requested metrics
+ */
+ private AggregatedMetricsResponseBody getAggregatedMetricValues(
+ Collection<? extends MetricStore.ComponentMetricStore> stores,
+ List<String> requestedMetrics,
+ MetricAccumulatorFactory requestedAggregationsFactories) {
+
+ Collection<AggregatedMetric> aggregatedMetrics = new ArrayList<>(requestedMetrics.size());
+ for (String requestedMetric : requestedMetrics) {
+ final Collection<Double> values = new ArrayList<>(stores.size());
+ try {
+ for (MetricStore.ComponentMetricStore store : stores) {
+ String stringValue = store.metrics.get(requestedMetric);
+ if (stringValue != null) {
+ values.add(Double.valueOf(stringValue));
+ }
+ }
+ } catch (NumberFormatException nfe) {
+ log.warn("The metric {} is not numeric and can't be aggregated.", requestedMetric, nfe);
+ // metric is not numeric so we can't perform aggregations => ignore it
+ continue;
+ }
+ if (!values.isEmpty()) {
+
+ Iterator<Double> valuesIterator = values.iterator();
+ MetricAccumulator acc = requestedAggregationsFactories.get(requestedMetric, valuesIterator.next());
+ valuesIterator.forEachRemaining(acc::add);
+
+ aggregatedMetrics.add(acc.get());
+ } else {
+ return new AggregatedMetricsResponseBody(Collections.emptyList());
+ }
+ }
+ return new AggregatedMetricsResponseBody(aggregatedMetrics);
+ }
+
+ private static class MetricAccumulatorFactory {
+
+ @Nullable
+ private final DoubleAccumulator.DoubleMinimumFactory minimumFactory;
+
+ @Nullable
+ private final DoubleAccumulator.DoubleMaximumFactory maximumFactory;
+
+ @Nullable
+ private final DoubleAccumulator.DoubleAverageFactory averageFactory;
+
+ @Nullable
+ private final DoubleAccumulator.DoubleSumFactory sumFactory;
+
+ private MetricAccumulatorFactory(
+ @Nullable DoubleAccumulator.DoubleMinimumFactory minimumFactory,
+ @Nullable DoubleAccumulator.DoubleMaximumFactory maximumFactory,
+ @Nullable DoubleAccumulator.DoubleAverageFactory averageFactory,
+ @Nullable DoubleAccumulator.DoubleSumFactory sumFactory) {
+ this.minimumFactory = minimumFactory;
+ this.maximumFactory = maximumFactory;
+ this.averageFactory = averageFactory;
+ this.sumFactory = sumFactory;
+ }
+
+ MetricAccumulator get(String metricName, double init) {
+ return new MetricAccumulator(
+ metricName,
+ minimumFactory == null ? null : minimumFactory.get(init),
+ maximumFactory == null ? null : maximumFactory.get(init),
+ averageFactory == null ? null : averageFactory.get(init),
+ sumFactory == null ? null : sumFactory.get(init)
+ );
+ }
+ }
+
+ private static class MetricAccumulator {
+ private final String metricName;
+
+ @Nullable
+ private final DoubleAccumulator min;
+ @Nullable
+ private final DoubleAccumulator max;
+ @Nullable
+ private final DoubleAccumulator avg;
+ @Nullable
+ private final DoubleAccumulator sum;
+
+ private MetricAccumulator(
+ String metricName,
+ @Nullable DoubleAccumulator min,
+ @Nullable DoubleAccumulator max,
+ @Nullable DoubleAccumulator avg,
+ @Nullable DoubleAccumulator sum) {
+ this.metricName = Preconditions.checkNotNull(metricName);
+ this.min = min;
+ this.max = max;
+ this.avg = avg;
+ this.sum = sum;
+ }
+
+ void add(double value) {
+ if (min != null) {
+ min.add(value);
+ }
+ if (max != null) {
+ max.add(value);
+ }
+ if (avg != null) {
+ avg.add(value);
+ }
+ if (sum != null) {
+ sum.add(value);
+ }
+ }
+
+ AggregatedMetric get() {
+ return new AggregatedMetric(
+ metricName,
+ min == null ? null : min.getValue(),
+ max == null ? null : max.getValue(),
+ avg == null ? null : avg.getValue(),
+ sum == null ? null : sum.getValue()
+ );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
new file mode 100644
index 0000000..42928a4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedJobMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedJobMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobsFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across jobs, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * <p>Specific jobs can be selected for aggregation by specifying a comma-separated list of job IDs.
+ * {@code /metrics?get=X,Y&jobs=A,B}
+ */
+public class AggregatingJobsMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedJobMetricsParameters> {
+
+ public AggregatingJobsMetricsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout, Map<String, String> responseHeaders,
+ Executor executor,
+ MetricFetcher<?> fetcher) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedJobMetricsHeaders.getInstance(), executor, fetcher);
+ }
+
+ @Nonnull
+ @Override
+ Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedJobMetricsParameters> request) {
+ List<JobID> jobs = request.getQueryParameter(JobsFilterQueryParameter.class);
+ if (jobs.isEmpty()) {
+ return store.getJobs().values();
+ } else {
+ Collection<MetricStore.ComponentMetricStore> jobStores = new ArrayList<>(jobs.size());
+ for (JobID job : jobs) {
+ MetricStore.ComponentMetricStore jobMetricStore = store.getJobMetricStore(job.toString());
+ if (jobMetricStore != null) {
+ jobStores.add(jobMetricStore);
+ }
+ }
+ return jobStores;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
new file mode 100644
index 0000000..f95deaa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.IntStream;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
+
+ public AggregatingSubtasksMetricsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ Executor executor,
+ MetricFetcher<?> fetcher) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
+ }
+
+ @Nonnull
+ @Override
+ Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
+ JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+ JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
+
+ Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
+ if (subtaskRanges.isEmpty()) {
+ MetricStore.TaskMetricStore taskMetricStore = store.getTaskMetricStore(jobID.toString(), taskID.toString());
+ if (taskMetricStore != null) {
+ return taskMetricStore.getAllSubtaskMetricStores();
+ } else {
+ return Collections.emptyList();
+ }
+ } else {
+ Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges);
+ Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
+ for (int subtask : subtasks) {
+ MetricStore.ComponentMetricStore subtaskMetricStore = store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask);
+ if (subtaskMetricStore != null) {
+ subtaskStores.add(subtaskMetricStore);
+ }
+ }
+ return subtaskStores;
+ }
+ }
+
+ private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) {
+ UnionIterator<Integer> iterators = new UnionIterator<>();
+
+ for (String rawRange : ranges) {
+ try {
+ Iterator<Integer> rangeIterator;
+ String range = rawRange.trim();
+ int dashIdx = range.indexOf('-');
+ if (dashIdx == -1) {
+ // only one value in range:
+ rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
+ } else {
+ // evaluate range
+ final int start = Integer.valueOf(range.substring(0, dashIdx));
+ final int end = Integer.valueOf(range.substring(dashIdx + 1, range.length()));
+ rangeIterator = IntStream.rangeClosed(start, end).iterator();
+ }
+ iterators.add(rangeIterator);
+ } catch (NumberFormatException nfe) {
+ log.warn("Invalid value {} specified for integer range. Not a number.", rawRange, nfe);
+ }
+ }
+
+ return iterators;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
new file mode 100644
index 0000000..2e15cac
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregateTaskManagerMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedTaskManagerMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagersFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across task managers, a list of all available metrics or the values for
+ * a set of metrics.
+ *
+ * <p>Specific taskmanagers can be selected for aggregation by specifying a comma-separated list of taskmanager IDs.
+ * {@code /metrics?get=X,Y&taskmanagers=A,B}
+ */
+public class AggregatingTaskManagersMetricsHandler extends AbstractAggregatingMetricsHandler<AggregateTaskManagerMetricsParameters> {
+
+ public AggregatingTaskManagersMetricsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout, Map<String, String> responseHeaders,
+ Executor executor,
+ MetricFetcher<?> fetcher) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedTaskManagerMetricsHeaders.getInstance(), executor, fetcher);
+ }
+
+ @Nonnull
+ @Override
+ Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregateTaskManagerMetricsParameters> request) {
+ List<ResourceID> taskmanagers = request.getQueryParameter(TaskManagersFilterQueryParameter.class);
+ if (taskmanagers.isEmpty()) {
+ return store.getTaskManagers().values();
+ } else {
+ Collection<MetricStore.TaskManagerMetricStore> taskmanagerStores = new ArrayList<>(taskmanagers.size());
+ for (ResourceID taskmanager : taskmanagers) {
+ MetricStore.TaskManagerMetricStore taskManagerMetricStore = store.getTaskManagerMetricStore(taskmanager.getResourceIdString());
+ if (taskManagerMetricStore != null) {
+ taskmanagerStores.add(taskManagerMetricStore);
+ }
+ }
+ return taskmanagerStores;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
new file mode 100644
index 0000000..dc701d9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+/**
+ * An interface for accumulating double values.
+ */
+interface DoubleAccumulator {
+
+ /**
+ * Adds the given value to this accumulator.
+ *
+ * @param value value to add
+ */
+ void add(double value);
+
+ /**
+ * Returns the current value of this accumulator.
+ *
+ * @return current value of this accumulator
+ */
+ double getValue();
+
+ /**
+ * Returns the name of this accumulator type. This name is used as a suffix for exposed metrics.
+ *
+ * @return name of this accumulator type
+ */
+ String getName();
+
+ /**
+ * A factory for {@link DoubleAccumulator}s. This allows us to regenerate a new set of accumulators for each metrics
+ * without re-evaluating the "agg" query parameter or re-using existing accumulators.
+ *
+ * @param <A> DoubleAccumulator subclass
+ */
+ interface DoubleAccumulatorFactory<A extends DoubleAccumulator> {
+ /**
+ * Creates a new accumulator with the given initial value.
+ *
+ * @param init initial value
+ * @return new accumulator with the given initial value
+ */
+ A get(double init);
+ }
+
+ /**
+ * Factory for {@link DoubleMaximum}.
+ */
+ final class DoubleMaximumFactory implements DoubleAccumulatorFactory<DoubleMaximum> {
+ private static final DoubleMaximumFactory INSTANCE = new DoubleMaximumFactory();
+
+ private DoubleMaximumFactory(){
+ }
+
+ @Override
+ public DoubleMaximum get(double init) {
+ return new DoubleMaximum(init);
+ }
+
+ public static DoubleMaximumFactory get() {
+ return INSTANCE;
+ }
+ }
+
+ /**
+ * Factory for {@link DoubleMinimum}.
+ */
+ final class DoubleMinimumFactory implements DoubleAccumulatorFactory<DoubleMinimum> {
+ private static final DoubleMinimumFactory INSTANCE = new DoubleMinimumFactory();
+
+ private DoubleMinimumFactory(){
+ }
+
+ @Override
+ public DoubleMinimum get(double init) {
+ return new DoubleMinimum(init);
+ }
+
+ public static DoubleMinimumFactory get() {
+ return INSTANCE;
+ }
+ }
+
+ /**
+ * Factory for {@link DoubleSum}.
+ */
+ final class DoubleSumFactory implements DoubleAccumulatorFactory<DoubleSum> {
+ private static final DoubleSumFactory INSTANCE = new DoubleSumFactory();
+
+ private DoubleSumFactory(){
+ }
+
+ @Override
+ public DoubleSum get(double init) {
+ return new DoubleSum(init);
+ }
+
+ public static DoubleSumFactory get() {
+ return INSTANCE;
+ }
+ }
+
+ /**
+ * Factory for {@link DoubleAverage}.
+ */
+ final class DoubleAverageFactory implements DoubleAccumulatorFactory<DoubleAverage> {
+ private static final DoubleAverageFactory INSTANCE = new DoubleAverageFactory();
+
+ private DoubleAverageFactory(){
+ }
+
+ @Override
+ public DoubleAverage get(double init) {
+ return new DoubleAverage(init);
+ }
+
+ public static DoubleAverageFactory get() {
+ return INSTANCE;
+ }
+ }
+
+ /**
+ * {@link DoubleAccumulator} that returns the maximum value.
+ */
+ final class DoubleMaximum implements DoubleAccumulator {
+
+ public static final String NAME = "max";
+
+ private double value;
+
+ private DoubleMaximum(double init) {
+ value = init;
+ }
+
+ @Override
+ public void add(double value) {
+ this.value = Math.max(this.value, value);
+ }
+
+ @Override
+ public double getValue() {
+ return value;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+ }
+
+ /**
+ * {@link DoubleAccumulator} that returns the minimum value.
+ */
+ final class DoubleMinimum implements DoubleAccumulator {
+
+ public static final String NAME = "min";
+
+ private double value;
+
+ private DoubleMinimum(double init) {
+ value = init;
+ }
+
+ @Override
+ public void add(double value) {
+ this.value = Math.min(this.value, value);
+ }
+
+ @Override
+ public double getValue() {
+ return value;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+ }
+
+ /**
+ * {@link DoubleAccumulator} that returns the sum of all values.
+ */
+ final class DoubleSum implements DoubleAccumulator {
+
+ public static final String NAME = "sum";
+
+ private double value;
+
+ private DoubleSum(double init) {
+ value = init;
+ }
+
+ @Override
+ public void add(double value) {
+ this.value += value;
+ }
+
+ @Override
+ public double getValue() {
+ return value;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+ }
+
+ /**
+ * {@link DoubleAccumulator} that returns the average over all values.
+ */
+ final class DoubleAverage implements DoubleAccumulator {
+
+ public static final String NAME = "avg";
+
+ private double sum;
+ private int count;
+
+ private DoubleAverage(double init) {
+ sum = init;
+ count = 1;
+ }
+
+ @Override
+ public void add(double value) {
+ this.sum += value;
+ this.count++;
+ }
+
+ @Override
+ public double getValue() {
+ return sum / count;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java
new file mode 100644
index 0000000..4100802
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Based {@link MessageHeaders} class for aggregating metrics.
+ */
+public abstract class AbstractAggregatedMetricsHeaders<P extends AbstractAggregatedMetricsParameters<?>> implements MessageHeaders<EmptyRequestBody, AggregatedMetricsResponseBody, P> {
+ @Override
+ public Class<AggregatedMetricsResponseBody> getResponseClass() {
+ return AggregatedMetricsResponseBody.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsParameters.java
new file mode 100644
index 0000000..a07141d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsParameters.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Base {@link MessageParameters} class for aggregating metrics.
+ */
+public abstract class AbstractAggregatedMetricsParameters<M extends MessageQueryParameter<?>> extends MessageParameters {
+ private final MetricsFilterParameter metrics = new MetricsFilterParameter();
+ private final MetricsAggregationParameter aggs = new MetricsAggregationParameter();
+ private final M selector;
+
+ AbstractAggregatedMetricsParameters(M selector) {
+ this.selector = selector;
+ }
+
+ @Override
+ public Collection<MessageQueryParameter<?>> getQueryParameters() {
+ return Collections.unmodifiableCollection(Arrays.asList(
+ metrics,
+ aggs,
+ selector
+ ));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregateTaskManagerMetricsParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregateTaskManagerMetricsParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregateTaskManagerMetricsParameters.java
new file mode 100644
index 0000000..0a053e6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregateTaskManagerMetricsParameters.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for aggregating task manager metrics.
+ */
+public class AggregateTaskManagerMetricsParameters extends AbstractAggregatedMetricsParameters<TaskManagersFilterQueryParameter> {
+ public AggregateTaskManagerMetricsParameters() {
+ super(new TaskManagersFilterQueryParameter());
+ }
+
+ @Override
+ public Collection<MessagePathParameter<?>> getPathParameters() {
+ return Collections.emptyList();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsHeaders.java
new file mode 100644
index 0000000..265512e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsHeaders.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+/**
+ * Headers for aggregating job metrics.
+ */
+public class AggregatedJobMetricsHeaders extends AbstractAggregatedMetricsHeaders<AggregatedJobMetricsParameters> {
+
+ private static final AggregatedJobMetricsHeaders INSTANCE = new AggregatedJobMetricsHeaders();
+
+ private AggregatedJobMetricsHeaders() {
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return "/jobs/metrics";
+ }
+
+ @Override
+ public AggregatedJobMetricsParameters getUnresolvedMessageParameters() {
+ return new AggregatedJobMetricsParameters();
+ }
+
+ public static AggregatedJobMetricsHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsParameters.java
new file mode 100644
index 0000000..25df609
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsParameters.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for aggregating job metrics.
+ */
+public class AggregatedJobMetricsParameters extends AbstractAggregatedMetricsParameters<JobsFilterQueryParameter> {
+
+ public AggregatedJobMetricsParameters() {
+ super(new JobsFilterQueryParameter());
+ }
+
+ @Override
+ public Collection<MessagePathParameter<?>> getPathParameters() {
+ return Collections.emptyList();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
new file mode 100644
index 0000000..acafc3a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Response type for aggregated metrics. Contains the metric name and optionally the sum, average, minimum and maximum.
+ */
+public class AggregatedMetric {
+
+ private static final String FIELD_NAME_ID = "id";
+
+ private static final String FIELD_NAME_MIN = "min";
+
+ private static final String FIELD_NAME_MAX = "max";
+
+ private static final String FIELD_NAME_AVG = "avg";
+
+ private static final String FIELD_NAME_SUM = "sum";
+
+ @JsonProperty(value = FIELD_NAME_ID, required = true)
+ private final String id;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(FIELD_NAME_MIN)
+ private final Double min;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(FIELD_NAME_MAX)
+ private final Double max;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(FIELD_NAME_AVG)
+ private final Double avg;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(FIELD_NAME_SUM)
+ private final Double sum;
+
+ @JsonCreator
+ public AggregatedMetric(
+ final @JsonProperty(value = FIELD_NAME_ID, required = true) String id,
+ final @Nullable @JsonProperty(FIELD_NAME_MIN) Double min,
+ final @Nullable @JsonProperty(FIELD_NAME_MAX) Double max,
+ final @Nullable @JsonProperty(FIELD_NAME_AVG) Double avg,
+ final @Nullable @JsonProperty(FIELD_NAME_SUM) Double sum) {
+
+ this.id = requireNonNull(id, "id must not be null");
+ this.min = min;
+ this.max = max;
+ this.avg = avg;
+ this.sum = sum;
+ }
+
+ public AggregatedMetric(final @JsonProperty(value = FIELD_NAME_ID, required = true) String id) {
+ this(id, null, null, null, null);
+ }
+
+ @JsonIgnore
+ public String getId() {
+ return id;
+ }
+
+ @JsonIgnore
+ public Double getMin() {
+ return min;
+ }
+
+ @JsonIgnore
+ public Double getMax() {
+ return max;
+ }
+
+ @JsonIgnore
+ public Double getSum() {
+ return sum;
+ }
+
+ @JsonIgnore
+ public Double getAvg() {
+ return avg;
+ }
+
+ @Override
+ public String toString() {
+ return "AggregatedMetric{" +
+ "id='" + id + '\'' +
+ ", mim='" + min + '\'' +
+ ", max='" + max + '\'' +
+ ", avg='" + avg + '\'' +
+ ", sum='" + sum + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetricsResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetricsResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetricsResponseBody.java
new file mode 100644
index 0000000..b6b8dcc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetricsResponseBody.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Response type for a collection of aggregated metrics.
+ *
+ * <p>As JSON this type will be represented as an array of
+ * metrics, i.e., the field <code>metrics</code> will not show up. For example, a collection with a
+ * single metric will be represented as follows:
+ * <pre>
+ * {@code
+ * [{"id": "metricName", "min": "1"}]
+ * }
+ * </pre>
+ *
+ * @see AggregatedMetricsResponseBody.Serializer
+ * @see AggregatedMetricsResponseBody.Deserializer
+ * @see org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore
+ */
+@JsonSerialize(using = AggregatedMetricsResponseBody.Serializer.class)
+@JsonDeserialize(using = AggregatedMetricsResponseBody.Deserializer.class)
+public class AggregatedMetricsResponseBody implements ResponseBody {
+
+ private final Collection<AggregatedMetric> metrics;
+
+ public AggregatedMetricsResponseBody(Collection<AggregatedMetric> metrics) {
+
+ this.metrics = metrics;
+ }
+
+ @JsonIgnore
+ public Collection<AggregatedMetric> getMetrics() {
+ return metrics;
+ }
+
+ /**
+ * JSON serializer for {@link AggregatedMetricsResponseBody}.
+ */
+ public static class Serializer extends StdSerializer<AggregatedMetricsResponseBody> {
+
+ private static final long serialVersionUID = 1L;
+
+ protected Serializer() {
+ super(AggregatedMetricsResponseBody.class);
+ }
+
+ @Override
+ public void serialize(
+ AggregatedMetricsResponseBody metricCollectionResponseBody,
+ JsonGenerator jsonGenerator,
+ SerializerProvider serializerProvider) throws IOException {
+
+ jsonGenerator.writeObject(metricCollectionResponseBody.getMetrics());
+ }
+ }
+
+ /**
+ * JSON deserializer for {@link AggregatedMetricsResponseBody}.
+ */
+ public static class Deserializer extends StdDeserializer<AggregatedMetricsResponseBody> {
+
+ private static final long serialVersionUID = 1L;
+
+ protected Deserializer() {
+ super(AggregatedMetricsResponseBody.class);
+ }
+
+ @Override
+ public AggregatedMetricsResponseBody deserialize(
+ JsonParser jsonParser,
+ DeserializationContext deserializationContext) throws IOException {
+
+ return new AggregatedMetricsResponseBody(jsonParser.readValueAs(
+ new TypeReference<List<AggregatedMetric>>() {
+ }));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
new file mode 100644
index 0000000..e1d0790
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+
+/**
+ * Headers for aggregating subtask metrics.
+ */
+public class AggregatedSubtaskMetricsHeaders extends AbstractAggregatedMetricsHeaders<AggregatedSubtaskMetricsParameters> {
+
+ private static final AggregatedSubtaskMetricsHeaders INSTANCE = new AggregatedSubtaskMetricsHeaders();
+
+ private AggregatedSubtaskMetricsHeaders() {
+ }
+
+ @Override
+ public AggregatedSubtaskMetricsParameters getUnresolvedMessageParameters() {
+ return new AggregatedSubtaskMetricsParameters();
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return "/jobs/" + JobIDPathParameter.KEY + "/vertices/" + JobVertexIdPathParameter.KEY + "/subtasks/metrics";
+ }
+
+ public static AggregatedSubtaskMetricsHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
new file mode 100644
index 0000000..34e1b52
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for aggregating subtask metrics.
+ */
+public class AggregatedSubtaskMetricsParameters extends AbstractAggregatedMetricsParameters<SubtasksFilterQueryParameter> {
+
+ private final JobIDPathParameter jobId = new JobIDPathParameter();
+ private final JobVertexIdPathParameter vertexId = new JobVertexIdPathParameter();
+ private final SubtaskIndexPathParameter subtaskIndex = new SubtaskIndexPathParameter();
+
+ public AggregatedSubtaskMetricsParameters() {
+ super(new SubtasksFilterQueryParameter());
+ }
+
+ @Override
+ public Collection<MessagePathParameter<?>> getPathParameters() {
+ return Collections.unmodifiableCollection(Arrays.asList(
+ jobId,
+ vertexId,
+ subtaskIndex
+ ));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedTaskManagerMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedTaskManagerMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedTaskManagerMetricsHeaders.java
new file mode 100644
index 0000000..5b5fe4c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedTaskManagerMetricsHeaders.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+/**
+ * Headers for aggregating task manager metrics.
+ */
+public class AggregatedTaskManagerMetricsHeaders extends AbstractAggregatedMetricsHeaders<AggregateTaskManagerMetricsParameters> {
+
+ private static final AggregatedTaskManagerMetricsHeaders INSTANCE = new AggregatedTaskManagerMetricsHeaders();
+
+ private AggregatedTaskManagerMetricsHeaders() {
+ }
+
+ @Override
+ public AggregateTaskManagerMetricsParameters getUnresolvedMessageParameters() {
+ return new AggregateTaskManagerMetricsParameters();
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return "/taskmanagers/metrics";
+ }
+
+ public static AggregatedTaskManagerMetricsHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobsFilterQueryParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobsFilterQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobsFilterQueryParameter.java
new file mode 100644
index 0000000..fb57f87
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobsFilterQueryParameter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+
+/**
+ * {@link MessageQueryParameter} for selecting jobs when aggregating metrics.
+ */
+public class JobsFilterQueryParameter extends MessageQueryParameter<JobID> {
+
+ JobsFilterQueryParameter() {
+ super("jobs", MessageParameterRequisiteness.OPTIONAL);
+ }
+
+ @Override
+ public JobID convertStringToValue(String value) throws ConversionException {
+ try {
+ return JobID.fromHexString(value);
+ } catch (IllegalArgumentException iae) {
+ throw new ConversionException("Not a valid job ID: " + value, iae);
+ }
+ }
+
+ @Override
+ public String convertValueToString(JobID value) {
+ return value.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
new file mode 100644
index 0000000..1057788
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Locale;
+
+/**
+ * TODO: add javadoc.
+ */
+public class MetricsAggregationParameter extends MessageQueryParameter<MetricsAggregationParameter.AggregationMode> {
+
+ protected MetricsAggregationParameter() {
+ super("agg", MessageParameterRequisiteness.OPTIONAL);
+ }
+
+ @Override
+ public AggregationMode convertStringToValue(String value) throws ConversionException {
+ try {
+ return AggregationMode.valueOf(value.toUpperCase(Locale.ROOT));
+ } catch (IllegalArgumentException iae) {
+ throw new ConversionException("Not a valid aggregation: " + value, iae);
+ }
+ }
+
+ @Override
+ public String convertValueToString(AggregationMode value) {
+ return value.name().toLowerCase();
+ }
+
+ /**
+ * The available aggregations.
+ */
+ public enum AggregationMode {
+ MIN,
+ MAX,
+ SUM,
+ AVG
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtasksFilterQueryParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtasksFilterQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtasksFilterQueryParameter.java
new file mode 100644
index 0000000..fe5d37e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtasksFilterQueryParameter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+/**
+ * {@link MessageQueryParameter} for selecting subtasks when aggregating metrics.
+ */
+public class SubtasksFilterQueryParameter extends MessageQueryParameter<String> {
+
+ SubtasksFilterQueryParameter() {
+ super("subtasks", MessageParameterRequisiteness.OPTIONAL);
+ }
+
+ @Override
+ public String convertStringToValue(String value) {
+ return value;
+ }
+
+ @Override
+ public String convertValueToString(String value) {
+ return value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagersFilterQueryParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagersFilterQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagersFilterQueryParameter.java
new file mode 100644
index 0000000..dcd6934
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagersFilterQueryParameter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+/**
+ * {@link MessageQueryParameter} for selecting task managers when aggregating metrics.
+ */
+public class TaskManagersFilterQueryParameter extends MessageQueryParameter<ResourceID> {
+
+ TaskManagersFilterQueryParameter() {
+ super("taskmanagers", MessageParameterRequisiteness.OPTIONAL);
+ }
+
+ @Override
+ public ResourceID convertStringToValue(String value) {
+ return new ResourceID(value);
+ }
+
+ @Override
+ public String convertValueToString(ResourceID value) {
+ return value.getResourceIdString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 7e552de..fb663ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -56,6 +56,9 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatistic
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingJobsMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingTaskManagersMetricsHandler;
import org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler;
import org.apache.flink.runtime.rest.handler.job.metrics.JobMetricsHandler;
import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
@@ -393,6 +396,33 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
responseHeaders,
metricFetcher);
+ final AggregatingTaskManagersMetricsHandler aggregatingTaskManagersMetricsHandler = new AggregatingTaskManagersMetricsHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ executor,
+ metricFetcher
+ );
+
+ final AggregatingJobsMetricsHandler aggregatingJobsMetricsHandler = new AggregatingJobsMetricsHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ executor,
+ metricFetcher
+ );
+
+ final AggregatingSubtasksMetricsHandler aggregatingSubtasksMetricsHandler = new AggregatingSubtasksMetricsHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ executor,
+ metricFetcher
+ );
+
final JobVertexTaskManagersHandler jobVertexTaskManagersHandler = new JobVertexTaskManagersHandler(
restAddressFuture,
leaderRetriever,
@@ -553,6 +583,9 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
handlers.add(Tuple2.of(SubtaskMetricsHeaders.getInstance(), subtaskMetricsHandler));
handlers.add(Tuple2.of(TaskManagerMetricsHeaders.getInstance(), taskManagerMetricsHandler));
handlers.add(Tuple2.of(JobManagerMetricsHeaders.getInstance(), jobManagerMetricsHandler));
+ handlers.add(Tuple2.of(aggregatingTaskManagersMetricsHandler.getMessageHeaders(), aggregatingTaskManagersMetricsHandler));
+ handlers.add(Tuple2.of(aggregatingJobsMetricsHandler.getMessageHeaders(), aggregatingJobsMetricsHandler));
+ handlers.add(Tuple2.of(aggregatingSubtasksMetricsHandler.getMessageHeaders(), aggregatingSubtasksMetricsHandler));
handlers.add(Tuple2.of(JobExecutionResultHeaders.getInstance(), jobExecutionResultHandler));
handlers.add(Tuple2.of(SavepointTriggerHeaders.getInstance(), savepointTriggerHandler));
handlers.add(Tuple2.of(SavepointStatusHeaders.getInstance(), savepointStatusHandler));
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
new file mode 100644
index 0000000..2dac8bf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedJobMetricsParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Tests for the {@link AggregatingJobsMetricsHandler}.
+ */
+public class AggregatingJobsMetricsHandlerTest extends AggregatingMetricsHandlerTestBase<AggregatingJobsMetricsHandler, AggregatedJobMetricsParameters> {
+
+ private static final JobID JOB_ID_1 = JobID.generate();
+ private static final JobID JOB_ID_2 = JobID.generate();
+ private static final JobID JOB_ID_3 = JobID.generate();
+
+ @Override
+ protected Tuple2<String, List<String>> getFilter() {
+ return Tuple2.of("jobs", Arrays.asList(JOB_ID_1.toString(), JOB_ID_3.toString()));
+ }
+
+ @Override
+ protected Collection<MetricDump> getMetricDumps() {
+ Collection<MetricDump> dumps = new ArrayList<>(3);
+ QueryScopeInfo.JobQueryScopeInfo job = new QueryScopeInfo.JobQueryScopeInfo(JOB_ID_1.toString(), "abc");
+ MetricDump.CounterDump cd1 = new MetricDump.CounterDump(job, "metric1", 1);
+ dumps.add(cd1);
+
+ QueryScopeInfo.JobQueryScopeInfo job2 = new QueryScopeInfo.JobQueryScopeInfo(JOB_ID_2.toString(), "abc");
+ MetricDump.CounterDump cd2 = new MetricDump.CounterDump(job2, "metric1", 3);
+ dumps.add(cd2);
+
+ QueryScopeInfo.JobQueryScopeInfo job3 = new QueryScopeInfo.JobQueryScopeInfo(JOB_ID_3.toString(), "abc");
+ MetricDump.CounterDump cd3 = new MetricDump.CounterDump(job3, "metric2", 5);
+ dumps.add(cd3);
+ return dumps;
+ }
+
+ @Override
+ protected AggregatingJobsMetricsHandler getHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+ return new AggregatingJobsMetricsHandler(
+ localRestAddress,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ executor,
+ fetcher
+ );
+ }
+}