Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2017/10/25 10:29:34 UTC
[GitHub] flink pull request #4901: [FLINK-7781][metrics][REST] Support on-demand aggr...
GitHub user zentol opened a pull request:
https://github.com/apache/flink/pull/4901
[FLINK-7781][metrics][REST] Support on-demand aggregations
## What is the purpose of the change
This PR adds on-demand aggregation support to the metrics REST API. It allows aggregating:
* taskmanager metrics across all taskmanagers
* job metrics across all jobs
* subtask metrics across all subtasks
The aggregation is handled by new handlers, as described in the documentation. Additionally, a subtask metric handler was added to subsume the existing JobVertexMetricsHandler and expose subtask metrics in a more idiomatic way. The JobVertexMetricsHandler was not removed, to preserve backwards compatibility (in particular with the web UI).
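To illustrate what "aggregating across entities" means, here is a minimal sketch that computes min, max, sum and avg of one metric over several entities (taskmanagers, jobs or subtasks). It assumes each entity's metrics are a plain name-to-value `Map<String, String>`; the class and method names are hypothetical and this is not Flink's `MetricStore` or `DoubleAccumulator` API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: aggregates one metric across several entities.
 * Each entity is modelled as a name -> value map (an assumption, not Flink's MetricStore).
 */
final class MetricAggregationSketch {

    static Map<String, Double> aggregate(List<Map<String, String>> entities, String metric) {
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        double sum = 0;
        int count = 0;
        for (Map<String, String> entity : entities) {
            String raw = entity.get(metric);
            if (raw == null) {
                continue; // this entity does not report the metric
            }
            double value;
            try {
                value = Double.parseDouble(raw);
            } catch (NumberFormatException e) {
                continue; // non-numeric values cannot be aggregated
            }
            min = Math.min(min, value);
            max = Math.max(max, value);
            sum += value;
            count++;
        }
        Map<String, Double> result = new LinkedHashMap<>();
        if (count == 0) {
            return result; // nothing to aggregate
        }
        result.put("min", min);
        result.put("max", max);
        result.put("sum", sum);
        result.put("avg", sum / count);
        return result;
    }
}
```

Skipping entities that lack the metric (or report a non-numeric value) is one plausible policy; the handlers in this PR may treat such cases differently.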
## Brief change log
* fix the request examples in the metric handler javadocs
* implement aggregation handlers
* expose all subtask stores in TaskMetricStore for easier access
* extend MetricStoreTest#setupMetricStore to contain metrics for multiple entities
* integrate aggregation handlers into WebRuntimeMonitor
* document the metrics REST API
## Verifying this change
This change added tests for each Aggregating*MetricsHandler and for the abstract base class (AbstractAggregatingMetricsHandlerTest).
This change can be verified manually by running multiple taskmanagers/jobs/subtasks in the cluster and querying the REST endpoints as described in the documentation.
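For manual verification, the query strings can be assembled as sketched below. The parameter names `get`, `agg` and `subtasks` and the paths `/jobs/metrics` and `/jobs/:jobid/vertices/:vertexid/subtasks/metrics` come from this thread; the helper class is hypothetical, assumes metric names need no URL encoding, and leaves the `:jobid`/`:vertexid` placeholders for the caller to substitute.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that builds a metrics query path for manual testing. */
final class MetricsQuerySketch {

    static String build(String path, List<String> metrics, List<String> aggregations, String subtaskRanges) {
        List<String> params = new ArrayList<>();
        if (!metrics.isEmpty()) {
            params.add("get=" + String.join(",", metrics)); // requested metric names
        }
        if (!aggregations.isEmpty()) {
            params.add("agg=" + String.join(",", aggregations)); // omitted -> all aggregations
        }
        if (subtaskRanges != null && !subtaskRanges.isEmpty()) {
            params.add("subtasks=" + subtaskRanges); // e.g. "0-2,4-5", subtask endpoint only
        }
        return params.isEmpty() ? path : path + "?" + String.join("&", params);
    }
}
```

The resulting path is appended to the address of the web monitor, e.g. `/jobs/metrics?get=X,Y&agg=min,max`.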
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
## Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zentol/flink 7781
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4901.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4901
----
commit 4a3f51157180241210160fb73508f406c9192bc2
Author: zentol <ch...@apache.org>
Date: 2017-10-25T07:42:14Z
[hotfix][javadocs] Fix metric handler example javadocs
commit afabf6fb72c2b2bd94fb0410c985e61e940d138e
Author: zentol <ch...@apache.org>
Date: 2017-10-25T09:21:28Z
[FLINK-7781][metrics][REST] Support on-demand aggregations
commit f768be1ebe0059b30571c43950f0942a7919bb0c
Author: zentol <ch...@apache.org>
Date: 2017-10-25T09:21:46Z
[FLINK-7912][metrics][docs] Document metrics REST API
----
---
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147668677
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * Abstract request handler for querying metrics. Subclasses may either that returns a list of all available metrics or
+ * the values for a set of metrics for a specific entity, or aggregate them across several entities.
+ *
--- End diff --
This was caused by a previous iteration in which both the aggregating and non-aggregating handlers inherited from the same class. I will rewrite it.
---
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147696972
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandler.java ---
@@ -0,0 +1,117 @@
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
+import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
+
+/**
+ * Request handler that returns, aggregated across all subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler {
+ public AggregatingSubtasksMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor, fetcher);
+ }
+
+ @Override
+ protected Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters) {
+ String jobID = pathParameters.get(PARAMETER_JOB_ID);
+ String taskID = pathParameters.get(PARAMETER_VERTEX_ID);
+ if (jobID == null) {
+ return Collections.emptyList();
+ }
+ if (taskID == null) {
+ return Collections.emptyList();
+ }
+ String subtasksList = queryParameters.get("subtasks");
+ if (subtasksList == null || subtasksList.isEmpty()) {
+ return store.getTaskMetricStore(jobID, taskID).getAllSubtaskMetricStores();
+ } else {
+ Iterable<Integer> subtasks = getIntegerRangeFromString(subtasksList);
+ Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>();
+ for (int subtask : subtasks) {
+ subtaskStores.add(store.getSubtaskMetricStore(jobID, taskID, subtask));
+ }
+ return subtaskStores;
+ }
+ }
+
+ @Override
+ public String[] getPaths() {
+ return new String[]{"/jobs/:jobid/vertices/:vertexid/subtasks/metrics"};
+ }
+
+ private Iterable<Integer> getIntegerRangeFromString(String rangeDefinition) throws NumberFormatException {
--- End diff --
You can remove the `throws NumberFormatException` as you handle the exception.
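Since `NumberFormatException` is an unchecked `RuntimeException`, the `throws` clause was never required by the compiler and only documents behavior that no longer escapes. A minimal sketch of a range parser for the `subtasks=0-2,4-5` syntax that handles the exception internally is shown below; the class name is hypothetical, and because the quoted diff does not show how the original reacts to malformed input, this sketch simply skips malformed entries.

```java
import java.util.ArrayList;
import java.util.List;

final class SubtaskRangeParserSketch {

    /**
     * Parses a definition such as "0-2,4-5" into [0, 1, 2, 4, 5].
     * Malformed entries are skipped, so no NumberFormatException escapes.
     */
    static List<Integer> parseRanges(String rangeDefinition) {
        List<Integer> result = new ArrayList<>();
        for (String part : rangeDefinition.split(",")) {
            String trimmed = part.trim();
            try {
                int dash = trimmed.indexOf('-');
                if (dash < 0) {
                    result.add(Integer.parseInt(trimmed)); // single index, e.g. "7"
                } else {
                    int start = Integer.parseInt(trimmed.substring(0, dash).trim());
                    int end = Integer.parseInt(trimmed.substring(dash + 1).trim());
                    for (int i = start; i <= end; i++) {
                        result.add(i); // inclusive range, e.g. "0-2" -> 0, 1, 2
                    }
                }
            } catch (NumberFormatException e) {
                // skip entries such as "x" or "" instead of failing the whole request
            }
        }
        return result;
    }
}
```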
---
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147650987
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---
@@ -0,0 +1,223 @@
+/**
+ * Abstract request handler for querying metrics. Subclasses may either that returns a list of all available metrics or
+ * the values for a set of metrics for a specific entity, or aggregate them across several entities.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
+ * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified all aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return an object containing the aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]}
+ */
+abstract class AbstractAggregatingMetricsHandler extends AbstractJsonRequestHandler {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final String PARAMETER_AGGREGATION = "agg";
+
+ private final MetricFetcher fetcher;
+
+ AbstractAggregatingMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor);
+ this.fetcher = Preconditions.checkNotNull(fetcher);
+ }
+
+ protected abstract Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters);
+
+ @Override
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ fetcher.update();
+ String requestedMetricsList = queryParams.get(AbstractMetricsHandler.PARAMETER_METRICS);
+ String aggTypeList = queryParams.get(PARAMETER_AGGREGATION);
+ MetricStore store = fetcher.getMetricStore();
+
+ if (requestedMetricsList == null) {
+ Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams);
+ if (stores == null) {
+ return "[]";
+ }
+ Collection<String> list = getAvailableMetrics(stores);
+ return mapMetricListToJson(list);
+ }
+
+ if (requestedMetricsList.isEmpty()) {
+ /*
+ * The WebInterface doesn't check whether the list of available metrics was empty. This can lead to a
+ * request for which the "get" parameter is an empty string.
+ */
+ return "[]";
+ }
+
+ String[] requestedMetrics = requestedMetricsList.split(",");
+
+ List<DoubleAccumulator.DoubleAccumulatorFactory<?>> requestedAggregationsFactories = new ArrayList<>();
+ // by default we return all aggregations
+ if (aggTypeList == null || aggTypeList.isEmpty()) {
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
+ } else {
+ for (String aggregation : aggTypeList.split(",")) {
+ switch (aggregation.toLowerCase()) {
+ case DoubleAccumulator.DoubleMinimum.NAME:
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
+ break;
+ case DoubleAccumulator.DoubleMaximum.NAME:
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
+ break;
+ case DoubleAccumulator.DoubleSum.NAME:
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
+ break;
+ case DoubleAccumulator.DoubleAverage.NAME:
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
+ break;
+ default:
+ log.warn("Invalid aggregation specified: {}", aggregation.toLowerCase());
+ }
+ }
+ }
+ Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams);
--- End diff --
Shouldn't this check of `stores` happen before populating `requestedAggregationsFactories`? If `stores` is `null`, all the object creation and work done before it is in vain.
Can this check be moved before even checking `requestedMetricsList`, so that we avoid code repetition? It seems that whenever `stores == null`, we `return "[]"`.
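A simplified sketch of the suggested reordering: resolve the stores once, return `"[]"` immediately if they are `null`, and only then parse the `agg` parameter. Stores are modelled as hypothetical name-to-value maps supplied by a `Supplier`, and the returned strings are placeholders that only show which branch ran; this is not the handler's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch of the early-exit control flow suggested in the review. */
final class AggregatingHandlerFlowSketch {

    static final List<String> ALL_AGGREGATIONS =
            Collections.unmodifiableList(Arrays.asList("min", "max", "sum", "avg"));

    static String handle(Supplier<Collection<Map<String, String>>> storeLookup,
                         String requestedMetricsList, String aggTypeList) {
        // Resolve the stores once, before any other work is done.
        Collection<Map<String, String>> stores = storeLookup.get();
        if (stores == null) {
            return "[]"; // a single early exit replaces the duplicated null checks
        }
        if (requestedMetricsList == null) {
            return "list available metrics"; // placeholder for getAvailableMetrics(stores)
        }
        if (requestedMetricsList.isEmpty()) {
            return "[]"; // the web UI may send an empty "get" parameter
        }
        List<String> aggregations = parseAggregations(aggTypeList);
        return "aggregate " + requestedMetricsList + " with " + aggregations;
    }

    static List<String> parseAggregations(String aggTypeList) {
        if (aggTypeList == null || aggTypeList.isEmpty()) {
            return ALL_AGGREGATIONS; // by default, all aggregations are returned
        }
        List<String> result = new ArrayList<>();
        for (String aggregation : aggTypeList.split(",")) {
            String normalized = aggregation.toLowerCase(Locale.ROOT);
            if (ALL_AGGREGATIONS.contains(normalized)) {
                result.add(normalized);
            }
            // unknown names are dropped; the quoted diff logs a warning at this point
        }
        return result;
    }
}
```

Moving the lookup to the top means `getStores` is also called when `get` is empty, but both paths return `"[]"` anyway.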
---
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147649624
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---
@@ -0,0 +1,223 @@
+/**
+ * Abstract request handler for querying metrics. Subclasses may either that returns a list of all available metrics or
+ * the values for a set of metrics for a specific entity, or aggregate them across several entities.
+ *
--- End diff --
The above javadoc can be improved. The phrase:
"Subclasses may either that returns a list of all available metrics or the values for a set of metrics for a specific entity, or aggregate them across several entities."
seems to be the result of a sloppy copy-paste.
---
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/4901
---
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147649786
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---
@@ -0,0 +1,223 @@
+/**
+ * Abstract request handler for querying metrics. Subclasses may either that returns a list of all available metrics or
+ * the values for a set of metrics for a specific entity, or aggregate them across several entities.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
--- End diff --
"get" parameter a comma-separate -> "get" parameter**,** a comma-separate
---
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147655793
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java ---
@@ -39,12 +39,15 @@
* <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
* {@code [ { "id" : "X" } ] }
*
- * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
- * {@code /get?X,Y}
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
* The handler will then return a list containing the values of the requested metrics.
* {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
*/
public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler {
+
--- End diff --
Why not put all of these (`PARAMETER_METRICS`, `PARAMETER_AGGREGATION`, `SUBTASK_METRICS_REST_PATH`, even `"/jobs/metrics"`, etc.) in a `MetricsURLConstants` class (or something similar), so that there is a central place for all of them? This could ease maintenance.
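A sketch of what such a central class might look like. The values are taken from the handlers quoted in this thread (`get`, `agg`, `subtasks` and the REST paths); the class itself, and the names `PARAMETER_SUBTASKS`, `JOB_METRICS_AGGREGATED_REST_PATH` and `SUBTASK_METRICS_AGGREGATED_REST_PATH`, are hypothetical.

```java
/**
 * Hypothetical central holder for the metrics REST parameters and paths.
 * Values mirror the handlers quoted in this review; the class is illustrative only.
 */
final class MetricsURLConstants {

    /** Query parameter listing the requested metric names, e.g. {@code get=X,Y}. */
    static final String PARAMETER_METRICS = "get";

    /** Query parameter listing the requested aggregations, e.g. {@code agg=min,max}. */
    static final String PARAMETER_AGGREGATION = "agg";

    /** Query parameter selecting subtask ranges, e.g. {@code subtasks=0-2,4-5}. */
    static final String PARAMETER_SUBTASKS = "subtasks";

    static final String JOB_METRICS_AGGREGATED_REST_PATH = "/jobs/metrics";
    static final String SUBTASK_METRICS_AGGREGATED_REST_PATH =
            "/jobs/:jobid/vertices/:vertexid/subtasks/metrics";
    static final String SUBTASK_METRICS_REST_PATH =
            "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/metrics";

    private MetricsURLConstants() {
        // constants only; not meant to be instantiated
    }
}
```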
---
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147670970
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java ---
@@ -0,0 +1,70 @@
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across all subtasks of a single tasks, a list of all available metrics or the
+ * values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg".
+ */
+public class SubtaskMetricsHandler extends AbstractMetricsHandler {
+ private static final String SUBTASK_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/metrics";
+
+ public SubtaskMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor, fetcher);
+ }
+
+ @Override
+ public String[] getPaths() {
+ return new String[]{SUBTASK_METRICS_REST_PATH};
+ }
+
+ @Override
+ protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+ String subtaskNumString = pathParams.get(SubtaskExecutionAttemptDetailsHandler.PARAMETER_SUBTASK_INDEX);
+ int subtaskNum;
+ try {
+ subtaskNum = Integer.valueOf(subtaskNumString);
+ } catch (NumberFormatException nfe) {
+ return null;
+ }
--- End diff --
An unknown subtask index will cause the method to return null (`metrics.getSubtaskMetricStore(...)`), which is the same behavior as in all other metric handlers.
We can't provide an accurate message in this case: we only know that the subtask index is unknown to the store, but not why (e.g., whether it exceeds the parallelism or the metrics haven't arrived yet).
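The contract described above can be sketched as follows: a malformed index and an index unknown to the store both yield `null`, and the caller cannot tell the two "unknown" causes apart. The store is modelled here as a hypothetical index-to-metrics map rather than Flink's `MetricStore`.

```java
import java.util.Map;

/** Hypothetical sketch of the null-returning lookup contract of getMapFor. */
final class SubtaskLookupSketch {

    static Map<String, String> getMapFor(String subtaskNumString,
                                         Map<Integer, Map<String, String>> subtaskStores) {
        int subtaskNum;
        try {
            // parseInt avoids the boxing of Integer.valueOf; both throw on null or non-numeric input
            subtaskNum = Integer.parseInt(subtaskNumString);
        } catch (NumberFormatException nfe) {
            return null; // not an integer, e.g. "abc"
        }
        // An unknown index also yields null: the store cannot say whether the index
        // exceeds the parallelism or the metrics simply have not arrived yet.
        return subtaskStores.get(subtaskNum);
    }
}
```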
---
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147669836
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java ---
@@ -39,12 +39,15 @@
* <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
* {@code [ { "id" : "X" } ] }
*
- * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
- * {@code /get?X,Y}
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
* The handler will then return a list containing the values of the requested metrics.
* {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
*/
public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler {
+
--- End diff --
This will be resolved anyway once we port it to the new REST handlers, as these constants will then reside in the respective parameter/header classes.
---
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147651957
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---
@@ -0,0 +1,223 @@
+/**
+ * Abstract request handler for querying metrics. Subclasses may either that returns a list of all available metrics or
+ * the values for a set of metrics for a specific entity, or aggregate them across several entities.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
+ * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified all aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return an object containing the aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]}
+ */
+abstract class AbstractAggregatingMetricsHandler extends AbstractJsonRequestHandler {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final String PARAMETER_AGGREGATION = "agg";
+
+ private final MetricFetcher fetcher;
+
+ AbstractAggregatingMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor);
+ this.fetcher = Preconditions.checkNotNull(fetcher);
+ }
+
+ protected abstract Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters);
+
+ @Override
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ fetcher.update();
+ String requestedMetricsList = queryParams.get(AbstractMetricsHandler.PARAMETER_METRICS);
+ String aggTypeList = queryParams.get(PARAMETER_AGGREGATION);
+ MetricStore store = fetcher.getMetricStore();
+
+ if (requestedMetricsList == null) {
+ Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams);
+ if (stores == null) {
+ return "[]";
+ }
+ Collection<String> list = getAvailableMetrics(stores);
+ return mapMetricListToJson(list);
+ }
+
+ if (requestedMetricsList.isEmpty()) {
+ /*
+ * The WebInterface doesn't check whether the list of available metrics was empty. This can lead to a
+ * request for which the "get" parameter is an empty string.
+ */
+ return "[]";
+ }
+
+ String[] requestedMetrics = requestedMetricsList.split(",");
+
+ List<DoubleAccumulator.DoubleAccumulatorFactory<?>> requestedAggregationsFactories = new ArrayList<>();
+ // by default we return all aggregations
+ if (aggTypeList == null || aggTypeList.isEmpty()) {
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
+ } else {
+ for (String aggregation : aggTypeList.split(",")) {
+ switch (aggregation.toLowerCase()) {
+ case DoubleAccumulator.DoubleMinimum.NAME:
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
+ break;
+ case DoubleAccumulator.DoubleMaximum.NAME:
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
+ break;
+ case DoubleAccumulator.DoubleSum.NAME:
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
+ break;
+ case DoubleAccumulator.DoubleAverage.NAME:
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
+ break;
+ default:
+ log.warn("Invalid aggregation specified: {}", aggregation.toLowerCase());
+ }
+ }
+ }
+ Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams);
+ if (stores == null){
+ return "[]";
+ }
+ return getAggregatedMetricValues(stores, requestedMetrics, requestedAggregationsFactories);
+ } catch (Exception e) {
+ throw new CompletionException(new FlinkException("Could not retrieve metrics.", e));
+ }
+ },
+ executor);
+ }
+
+ /**
+ * Returns the names of all metrics available in the given stores. Effectively this method returns the union of
+ * all key-sets; mapping the result to JSON is left to {@link #mapMetricListToJson(Collection)}.
+ *
+ * @param stores metric stores to inspect
+ * @return names of all metrics that are available in at least one of the given stores
+ */
+ private static Collection<String> getAvailableMetrics(Collection<? extends MetricStore.ComponentMetricStore> stores) {
+ Set<String> uniqueMetrics = new HashSet<>();
+ for (MetricStore.ComponentMetricStore store : stores) {
+ uniqueMetrics.addAll(store.metrics.keySet());
+ }
+ return uniqueMetrics;
+ }
+
+ private static String mapMetricListToJson(Collection<String> metrics) throws IOException {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+ gen.writeStartArray();
+ for (String m : metrics) {
+ gen.writeStartObject();
+ gen.writeStringField("id", m);
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
+
+ gen.close();
+ return writer.toString();
+ }
+
+ /**
+ * Extracts and aggregates all requested metrics from the given metric stores, and maps the result to a JSON string.
+ *
+ * @param stores available metrics
+ * @param requestedMetrics ids of requested metrics
+ * @param requestedAggregationsFactories requested aggregations
+ * @return JSON string containing the requested metrics
+ * @throws IOException if the JSON response could not be written
+ */
+ private String getAggregatedMetricValues(Collection<? extends MetricStore.ComponentMetricStore> stores, String[] requestedMetrics, List<DoubleAccumulator.DoubleAccumulatorFactory<?>> requestedAggregationsFactories) throws IOException {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+ gen.writeStartArray();
+ for (String requestedMetric : requestedMetrics) {
+ final Collection<Double> values = new ArrayList<>();
+ try {
+ for (MetricStore.ComponentMetricStore store : stores) {
+ String stringValue = store.metrics.get(requestedMetric);
+ if (stringValue != null) {
+ values.add(Double.valueOf(stringValue));
+ }
+ }
+ } catch (NumberFormatException nfe) {
+ // metric is not numeric so we can't perform aggregations => ignore it
+ continue;
--- End diff --
Not sure when this can happen, but shouldn't we log something when this exception is thrown, as `AggregatingSubtasksMetricsHandler.getStores()` does in its `catch` block?
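A minimal sketch of what logging here could look like. It is hypothetical: each store is modelled as a plain `Map<String, String>` (the diff reads `ComponentMetricStore.metrics`), and `java.util.logging` stands in for the handler's SLF4J logger; the chosen log level is also an assumption. As in the quoted code, a single non-numeric value causes the whole metric to be skipped; the only change is that the skip is no longer silent.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper: collects the values of one metric across several metric stores. */
final class NumericMetricCollector {

	private static final Logger LOG = Logger.getLogger(NumericMetricCollector.class.getName());

	private NumericMetricCollector() {
	}

	/**
	 * Returns all values of {@code metric} found in {@code stores}, or an empty Optional if any
	 * value is not numeric (the whole metric is then skipped, as in the diff).
	 */
	static Optional<List<Double>> collect(Collection<Map<String, String>> stores, String metric) {
		List<Double> values = new ArrayList<>();
		for (Map<String, String> store : stores) {
			String stringValue = store.get(metric);
			if (stringValue == null) {
				continue; // this entity does not expose the metric
			}
			try {
				values.add(Double.valueOf(stringValue));
			} catch (NumberFormatException nfe) {
				// Log the skipped metric instead of ignoring it silently.
				LOG.log(Level.FINE, "Metric '" + metric + "' has non-numeric value '"
					+ stringValue + "'; it will not be aggregated.", nfe);
				return Optional.empty();
			}
		}
		return Optional.of(values);
	}
}
```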
---
[GitHub] flink pull request #4901: [FLINK-7781][metrics][REST] Support on-demand aggr...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147670055
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+/**
+ * An interface for accumulating double values.
+ */
+interface DoubleAccumulator {
+
+ /**
+ * Adds the given value to this accumulator.
+ *
+ * @param value value to add
+ */
+ void add(double value);
+
+ /**
+ * Returns the current value of this accumulator.
+ *
+ * @return current value of this accumulator
+ */
+ double getValue();
+
+ /**
+ * Returns the name of this accumulator type. This name is used as a suffix for exposed metrics.
+ *
+ * @return name of this accumulator type
+ */
+ String getName();
+
+ /**
+ * A factory for {@link DoubleAccumulator}s. This allows us to create a new set of accumulators for each metric
+ * without re-evaluating the "agg" query parameter or re-using existing accumulators.
+ *
+ * @param <A> DoubleAccumulator subclass
+ */
+ interface DoubleAccumulatorFactory<A extends DoubleAccumulator> {
+ /**
+ * Creates a new accumulator with the given initial value.
+ *
+ * @param init initial value
+ * @return new accumulator with the given initial value
+ */
+ A get(double init);
+ }
+
+ /**
+ * Factory for {@link DoubleMaximum}.
+ */
+ final class DoubleMaximumFactory implements DoubleAccumulatorFactory<DoubleMaximum> {
+ private static final DoubleMaximumFactory INSTANCE = new DoubleMaximumFactory();
+
+ private DoubleMaximumFactory(){
+ }
+
+ @Override
+ public DoubleMaximum get(double init) {
+ return new DoubleMaximum(init);
+ }
+
+ public static DoubleMaximumFactory get() {
+ return INSTANCE;
+ }
+ }
+
+ /**
+ * Factory for {@link DoubleMinimum}.
+ */
+ final class DoubleMinimumFactory implements DoubleAccumulatorFactory<DoubleMinimum> {
+ private static final DoubleMinimumFactory INSTANCE = new DoubleMinimumFactory();
+
+ private DoubleMinimumFactory(){
+ }
+
+ @Override
+ public DoubleMinimum get(double init) {
+ return new DoubleMinimum(init);
+ }
+
+ public static DoubleMinimumFactory get() {
+ return INSTANCE;
+ }
+ }
+
+ /**
+ * Factory for {@link DoubleSum}.
+ */
+ final class DoubleSumFactory implements DoubleAccumulatorFactory<DoubleSum> {
+ private static final DoubleSumFactory INSTANCE = new DoubleSumFactory();
+
+ private DoubleSumFactory(){
+ }
+
+ @Override
+ public DoubleSum get(double init) {
+ return new DoubleSum(init);
+ }
+
+ public static DoubleSumFactory get() {
+ return INSTANCE;
+ }
+ }
+
+ /**
+ * Factory for {@link DoubleAverage}.
+ */
+ final class DoubleAverageFactory implements DoubleAccumulatorFactory<DoubleAverage> {
+ private static final DoubleAverageFactory INSTANCE = new DoubleAverageFactory();
+
+ private DoubleAverageFactory(){
+ }
+
+ @Override
+ public DoubleAverage get(double init) {
+ return new DoubleAverage(init);
+ }
+
+ public static DoubleAverageFactory get() {
+ return INSTANCE;
+ }
+ }
+
+ /**
+ * {@link DoubleAccumulator} that returns the maximum value.
+ */
+ final class DoubleMaximum implements DoubleAccumulator {
+
+ public static final String NAME = "max";
+
+ private double value;
+
+ private DoubleMaximum(double init) {
+ value = init;
+ }
+
+ @Override
+ public void add(double value) {
+ this.value = Math.max(this.value, value);
+ }
+
+ @Override
+ public double getValue() {
+ return value;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+ }
+
+ /**
+ * {@link DoubleAccumulator} that returns the minimum value.
+ */
+ final class DoubleMinimum implements DoubleAccumulator {
+
+ public static final String NAME = "min";
+
+ private double value;
+
+ private DoubleMinimum(double init) {
+ value = init;
+ }
+
+ @Override
+ public void add(double value) {
+ this.value = Math.min(this.value, value);
+ }
+
+ @Override
+ public double getValue() {
+ return value;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+ }
+
+ /**
+ * {@link DoubleAccumulator} that returns the sum of all values.
+ */
+ final class DoubleSum implements DoubleAccumulator {
+
+ public static final String NAME = "sum";
+
+ private double value;
+
+ private DoubleSum(double init) {
+ value = init;
+ }
+
+ @Override
+ public void add(double value) {
+ this.value += value;
+ }
+
+ @Override
+ public double getValue() {
+ return value;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+ }
+
+ /**
+ * {@link DoubleAccumulator} that returns the average over all values.
+ */
+ final class DoubleAverage implements DoubleAccumulator {
+
+ public static final String NAME = "avg";
+
+ private double sum;
+ private int count;
+
+ private DoubleAverage(double init) {
+ sum = init;
+ count = 1;
+ }
+
+ @Override
+ public void add(double value) {
+ this.sum += value;
+ this.count++;
+ }
+
+ @Override
+ public double getValue() {
+ if (count == 0) {
--- End diff --
Yes, that's a relic from a previous version in which no initial value was set.
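The seeding pattern behind this answer can be sketched as follows: each factory creates a fresh accumulator from the first value of a metric, and the remaining values are `add`ed, so an average always has `count >= 1` and a `count == 0` branch is unreachable. This is a stripped-down, hypothetical sketch: `Acc`, `MinAcc`, `AvgAcc` and `SeededAggregation` are simplified stand-ins for `DoubleAccumulator` and its factories, not the classes from the PR, and `java.util.function.DoubleFunction` plays the role of `DoubleAccumulatorFactory`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.DoubleFunction;

/** Simplified, hypothetical stand-in for DoubleAccumulator. */
interface Acc {
	void add(double value);

	double getValue();

	String getName();
}

final class MinAcc implements Acc {
	private double value;

	MinAcc(double init) {
		value = init;
	}

	public void add(double v) {
		value = Math.min(value, v);
	}

	public double getValue() {
		return value;
	}

	public String getName() {
		return "min";
	}
}

final class AvgAcc implements Acc {
	private double sum;
	private int count;

	AvgAcc(double init) {
		// Seeded with the first value, so count can never be 0.
		sum = init;
		count = 1;
	}

	public void add(double v) {
		sum += v;
		count++;
	}

	public double getValue() {
		return sum / count; // no count == 0 guard needed
	}

	public String getName() {
		return "avg";
	}
}

final class SeededAggregation {
	private SeededAggregation() {
	}

	/** Aggregates the values of one metric, using a fresh accumulator per factory. */
	static Map<String, Double> aggregate(List<Double> values, List<DoubleFunction<Acc>> factories) {
		Map<String, Double> result = new LinkedHashMap<>();
		if (values.isEmpty()) {
			return result; // nothing to seed with, so no accumulator is ever created
		}
		List<Acc> accs = new ArrayList<>();
		for (DoubleFunction<Acc> factory : factories) {
			accs.add(factory.apply(values.get(0)));
		}
		for (double v : values.subList(1, values.size())) {
			for (Acc acc : accs) {
				acc.add(v);
			}
		}
		for (Acc acc : accs) {
			result.put(acc.getName(), acc.getValue());
		}
		return result;
	}
}
```

Because no accumulator is constructed for an empty value list, the guard in `getValue()` can be dropped without risking a division by zero.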
---
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147651247
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * Abstract request handler for querying metrics. Subclasses may either return a list of all available metrics or
+ * the values for a set of metrics for a specific entity, or aggregate them across several entities.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified all aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list containing the aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min" : "1", "max" : "2" }, { "id" : "Y", "min" : "4", "max" : "10" } ] }
+ */
+abstract class AbstractAggregatingMetricsHandler extends AbstractJsonRequestHandler {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final String PARAMETER_AGGREGATION = "agg";
+
+ private final MetricFetcher fetcher;
+
+ AbstractAggregatingMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor);
+ this.fetcher = Preconditions.checkNotNull(fetcher);
+ }
+
+ protected abstract Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters);
+
+ @Override
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ fetcher.update();
+ String requestedMetricsList = queryParams.get(AbstractMetricsHandler.PARAMETER_METRICS);
+ String aggTypeList = queryParams.get(PARAMETER_AGGREGATION);
+ MetricStore store = fetcher.getMetricStore();
+
+ if (requestedMetricsList == null) {
+ Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams);
+ if (stores == null) {
+ return "[]";
+ }
+ Collection<String> list = getAvailableMetrics(stores);
+ return mapMetricListToJson(list);
+ }
+
+ if (requestedMetricsList.isEmpty()) {
+ /*
+ * The WebInterface doesn't check whether the list of available metrics was empty. This can lead to a
+ * request for which the "get" parameter is an empty string.
+ */
+ return "[]";
+ }
+
+ String[] requestedMetrics = requestedMetricsList.split(",");
+
+ List<DoubleAccumulator.DoubleAccumulatorFactory<?>> requestedAggregationsFactories = new ArrayList<>();
+ // by default we return all aggregations
+ if (aggTypeList == null || aggTypeList.isEmpty()) {
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
+ } else {
+ for (String aggregation : aggTypeList.split(",")) {
+ switch (aggregation.toLowerCase()) {
+ case DoubleAccumulator.DoubleMinimum.NAME:
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
+ break;
+ case DoubleAccumulator.DoubleMaximum.NAME:
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
+ break;
+ case DoubleAccumulator.DoubleSum.NAME:
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
+ break;
+ case DoubleAccumulator.DoubleAverage.NAME:
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
+ break;
+ default:
+ log.warn("Invalid aggregation specified: {}", aggregation.toLowerCase());
+ }
+ }
+ }
+ Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams);
+ if (stores == null){
+ return "[]";
+ }
+ return getAggregatedMetricValues(stores, requestedMetrics, requestedAggregationsFactories);
+ } catch (Exception e) {
+ throw new CompletionException(new FlinkException("Could not retrieve metrics.", e));
+ }
+ },
+ executor);
+ }
+
+ /**
+ * Returns the names of all metrics available in the given stores. Effectively this method returns the union of
+ * all key-sets; mapping the result to JSON is left to {@link #mapMetricListToJson(Collection)}.
+ *
+ * @param stores metric stores to inspect
+ * @return names of all metrics that are available in at least one of the given stores
+ */
+ private static Collection<String> getAvailableMetrics(Collection<? extends MetricStore.ComponentMetricStore> stores) {
+ Set<String> uniqueMetrics = new HashSet<>();
+ for (MetricStore.ComponentMetricStore store : stores) {
+ uniqueMetrics.addAll(store.metrics.keySet());
+ }
+ return uniqueMetrics;
+ }
+
+ private static String mapMetricListToJson(Collection<String> metrics) throws IOException {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+ gen.writeStartArray();
+ for (String m : metrics) {
+ gen.writeStartObject();
+ gen.writeStringField("id", m);
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
+
+ gen.close();
+ return writer.toString();
+ }
+
+ /**
+ * Extracts and aggregates all requested metrics from the given metric stores, and maps the result to a JSON string.
+ *
+ * @param stores available metrics
+ * @param requestedMetrics ids of requested metrics
+ * @param requestedAggregationsFactories requested aggregations
+ * @return JSON string containing the requested metrics
+ * @throws IOException if the JSON response could not be written
+ */
+ private String getAggregatedMetricValues(Collection<? extends MetricStore.ComponentMetricStore> stores, String[] requestedMetrics, List<DoubleAccumulator.DoubleAccumulatorFactory<?>> requestedAggregationsFactories) throws IOException {
--- End diff --
This is a personal style preference, but I would break the argument list across multiple lines, one argument per line.
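To illustrate the suggested layout, here is a hypothetical, stdlib-only sketch of the response-writing step with one parameter per line. It hand-builds the array of objects that the class Javadoc sketches (an `id` field plus one field per aggregation) with a `StringBuilder` instead of Jackson's `JsonGenerator`. The class name, the pre-computed `aggregatesById` map, the choice to write aggregate values as JSON strings, and the choice to omit metrics without aggregates are all assumptions of this sketch; metric ids are escaped only for `"` and `\`.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical renderer for already-aggregated metric values. */
final class AggregatedMetricsJson {

	private AggregatedMetricsJson() {
	}

	/** Renders one JSON object per metric id, containing the id and one field per aggregation. */
	static String toJson(
			List<String> metricIds,
			Map<String, Map<String, Double>> aggregatesById,
			List<String> aggregationNames) {
		StringBuilder json = new StringBuilder("[");
		boolean firstMetric = true;
		for (String id : metricIds) {
			Map<String, Double> aggregates = aggregatesById.get(id);
			if (aggregates == null) {
				continue; // in this sketch, metrics without aggregates are omitted
			}
			if (!firstMetric) {
				json.append(',');
			}
			firstMetric = false;
			json.append("{\"id\":\"").append(escape(id)).append('"');
			for (String name : aggregationNames) {
				Double value = aggregates.get(name);
				if (value != null) {
					// values are written as JSON strings, e.g. "min":"1.0"
					json.append(",\"").append(name).append("\":\"").append(value).append('"');
				}
			}
			json.append('}');
		}
		return json.append(']').toString();
	}

	private static String escape(String s) {
		return s.replace("\\", "\\\\").replace("\"", "\\\"");
	}
}
```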
---
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147649898
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * Abstract request handler for querying metrics. Subclasses may either return a list of all available metrics or
+ * the values for a set of metrics for a specific entity, or aggregate them across several entities.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified all aggregations will be returned.
--- End diff --
If the parameter is not specified all aggregations will be returned -> If the parameter is not specified**,** all aggregations will be returned
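The "agg" rules quoted above (comma-separated names, matched case-insensitively, unknown names ignored, all four aggregations when the parameter is absent or empty) can be sketched without the accumulator classes. This hypothetical helper only returns the normalized names, in the same default order as the diff; the handler in the diff maps each name to a `DoubleAccumulatorFactory` and logs a warning for unknown names. Collapsing duplicate names is an assumption of this sketch; the quoted code would add a duplicate factory.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

/** Hypothetical parser for the "agg" query parameter. */
final class AggregationParameter {

	/** Supported aggregations, in the default order used by the diff. */
	static final List<String> ALL = Collections.unmodifiableList(Arrays.asList("min", "max", "sum", "avg"));

	private AggregationParameter() {
	}

	/** Parses the value of the "agg" query parameter, which may be null. */
	static Set<String> parse(String aggTypeList) {
		Set<String> result = new LinkedHashSet<>();
		if (aggTypeList == null || aggTypeList.isEmpty()) {
			result.addAll(ALL); // default: return every aggregation
			return result;
		}
		for (String aggregation : aggTypeList.split(",")) {
			String name = aggregation.toLowerCase(Locale.ROOT);
			if (ALL.contains(name)) {
				result.add(name); // LinkedHashSet drops duplicates and keeps request order
			}
			// unknown names are ignored here (the handler in the diff logs a warning)
		}
		return result;
	}
}
```

For example, `agg=MAX,min,max,median` yields `[max, min]`.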
---
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147653983
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandler.java ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
+import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
+
+/**
+ * Request handler that returns, aggregated across all subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler {
+ public AggregatingSubtasksMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor, fetcher);
+ }
+
+ @Override
+ protected Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters) {
+ String jobID = pathParameters.get(PARAMETER_JOB_ID);
+ String taskID = pathParameters.get(PARAMETER_VERTEX_ID);
--- End diff --
What happens here if `jobID == null`?
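One possible answer, sketched under assumptions: treat a missing job or vertex id as "no stores" and return an empty collection instead of looking up a store with a `null` key. The same handler is later extended to accept a `subtasks` query parameter with integer ranges such as `0-2,4-5`. Everything below is hypothetical: the literal keys `jobid` and `vertexid` stand in for `PARAMETER_JOB_ID` and `PARAMETER_VERTEX_ID` (whose values aren't shown here), `parallelism` replaces the lookup of all subtask stores, and `parseSubtaskRanges` is a stand-in for `getIntegerRangeFromString`, whose body isn't shown. Malformed ranges raise an `IllegalArgumentException` (`NumberFormatException` is a subclass).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical selection of the subtask indices to aggregate over. */
final class SubtaskSelection {

	private SubtaskSelection() {
	}

	/** Returns the selected subtask indices, or an empty list if the job or vertex id is missing. */
	static List<Integer> select(Map<String, String> pathParams, String subtasksParam, int parallelism) {
		if (pathParams.get("jobid") == null || pathParams.get("vertexid") == null) {
			return Collections.emptyList(); // nothing to aggregate
		}
		if (subtasksParam == null || subtasksParam.isEmpty()) {
			List<Integer> all = new ArrayList<>();
			for (int i = 0; i < parallelism; i++) {
				all.add(i); // default: every subtask
			}
			return all;
		}
		return parseSubtaskRanges(subtasksParam);
	}

	/** Parses "0-2,4-5" into [0, 1, 2, 4, 5]; single indices such as "3" are accepted too. */
	static List<Integer> parseSubtaskRanges(String ranges) {
		List<Integer> result = new ArrayList<>();
		for (String part : ranges.split(",")) {
			String trimmed = part.trim();
			int dash = trimmed.indexOf('-');
			int from;
			int to;
			if (dash < 0) {
				from = to = Integer.parseInt(trimmed);
			} else {
				from = Integer.parseInt(trimmed.substring(0, dash).trim());
				to = Integer.parseInt(trimmed.substring(dash + 1).trim());
			}
			if (from < 0 || to < from) {
				throw new IllegalArgumentException("Invalid subtask range: " + part);
			}
			for (int i = from; i <= to; i++) {
				result.add(i);
			}
		}
		return result;
	}
}
```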
---
[GitHub] flink issue #4901: [FLINK-7781][metrics][REST] Support on-demand aggregation...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/4901
@kl0u I've addressed your comments.
---
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147657820
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+/**
+ * An interface for accumulating double values.
+ */
+interface DoubleAccumulator {
+
+ /**
+ * Adds the given value to this accumulator.
+ *
+ * @param value value to add
+ */
+ void add(double value);
+
+ /**
+ * Returns the current value of this accumulator.
+ *
+ * @return current value of this accumulator
+ */
+ double getValue();
+
+ /**
+ * Returns the name of this accumulator type. This name is used as a suffix for exposed metrics.
+ *
+ * @return name of this accumulator type
+ */
+ String getName();
+
+ /**
+ * A factory for {@link DoubleAccumulator}s. This allows us to create a new set of accumulators for each metric
+ * without re-evaluating the "agg" query parameter or re-using existing accumulators.
+ *
+ * @param <A> DoubleAccumulator subclass
+ */
+ interface DoubleAccumulatorFactory<A extends DoubleAccumulator> {
+ /**
+ * Creates a new accumulator with the given initial value.
+ *
+ * @param init initial value
+ * @return new accumulator with the given initial value
+ */
+ A get(double init);
+ }
+
+ /**
+ * Factory for {@link DoubleMaximum}.
+ */
+ final class DoubleMaximumFactory implements DoubleAccumulatorFactory<DoubleMaximum> {
+ private static final DoubleMaximumFactory INSTANCE = new DoubleMaximumFactory();
+
+ private DoubleMaximumFactory(){
+ }
+
+ @Override
+ public DoubleMaximum get(double init) {
+ return new DoubleMaximum(init);
+ }
+
+ public static DoubleMaximumFactory get() {
+ return INSTANCE;
+ }
+ }
+
+ /**
+ * Factory for {@link DoubleMinimum}.
+ */
+ final class DoubleMinimumFactory implements DoubleAccumulatorFactory<DoubleMinimum> {
+ private static final DoubleMinimumFactory INSTANCE = new DoubleMinimumFactory();
+
+ private DoubleMinimumFactory(){
+ }
+
+ @Override
+ public DoubleMinimum get(double init) {
+ return new DoubleMinimum(init);
+ }
+
+ public static DoubleMinimumFactory get() {
+ return INSTANCE;
+ }
+ }
+
+ /**
+ * Factory for {@link DoubleSum}.
+ */
+ final class DoubleSumFactory implements DoubleAccumulatorFactory<DoubleSum> {
+ private static final DoubleSumFactory INSTANCE = new DoubleSumFactory();
+
+ private DoubleSumFactory(){
+ }
+
+ @Override
+ public DoubleSum get(double init) {
+ return new DoubleSum(init);
+ }
+
+ public static DoubleSumFactory get() {
+ return INSTANCE;
+ }
+ }
+
+ /**
+ * Factory for {@link DoubleAverage}.
+ */
+ final class DoubleAverageFactory implements DoubleAccumulatorFactory<DoubleAverage> {
+ private static final DoubleAverageFactory INSTANCE = new DoubleAverageFactory();
+
+ private DoubleAverageFactory(){
+ }
+
+ @Override
+ public DoubleAverage get(double init) {
+ return new DoubleAverage(init);
+ }
+
+ public static DoubleAverageFactory get() {
+ return INSTANCE;
+ }
+ }
+
+ /**
+ * {@link DoubleAccumulator} that returns the maximum value.
+ */
+ final class DoubleMaximum implements DoubleAccumulator {
+
+ public static final String NAME = "max";
+
+ private double value;
+
+ private DoubleMaximum(double init) {
+ value = init;
+ }
+
+ @Override
+ public void add(double value) {
+ this.value = Math.max(this.value, value);
+ }
+
+ @Override
+ public double getValue() {
+ return value;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+ }
+
+ /**
+ * {@link DoubleAccumulator} that returns the minimum value.
+ */
+ final class DoubleMinimum implements DoubleAccumulator {
+
+ public static final String NAME = "min";
+
+ private double value;
+
+ private DoubleMinimum(double init) {
+ value = init;
+ }
+
+ @Override
+ public void add(double value) {
+ this.value = Math.min(this.value, value);
+ }
+
+ @Override
+ public double getValue() {
+ return value;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+ }
+
+ /**
+ * {@link DoubleAccumulator} that returns the sum of all values.
+ */
+ final class DoubleSum implements DoubleAccumulator {
+
+ public static final String NAME = "sum";
+
+ private double value;
+
+ private DoubleSum(double init) {
+ value = init;
+ }
+
+ @Override
+ public void add(double value) {
+ this.value += value;
+ }
+
+ @Override
+ public double getValue() {
+ return value;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+ }
+
+ /**
+ * {@link DoubleAccumulator} that returns the average over all values.
+ */
+ final class DoubleAverage implements DoubleAccumulator {
+
+ public static final String NAME = "avg";
+
+ private double sum;
+ private int count;
+
+ private DoubleAverage(double init) {
+ sum = init;
+ count = 1;
+ }
+
+ @Override
+ public void add(double value) {
+ this.sum += value;
+ this.count++;
+ }
+
+ @Override
+ public double getValue() {
+ if (count == 0) {
--- End diff --
`count` cannot be `0`, right? We set it to `1` in the constructor.
---
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147697678
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandler.java ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
+import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
+
+/**
+ * Request handler that returns, aggregated across all subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler {
+ public AggregatingSubtasksMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor, fetcher);
+ }
+
+ @Override
+ protected Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters) {
+ String jobID = pathParameters.get(PARAMETER_JOB_ID);
+ String taskID = pathParameters.get(PARAMETER_VERTEX_ID);
+ if (jobID == null) {
+ return Collections.emptyList();
+ }
+ if (taskID == null) {
+ return Collections.emptyList();
+ }
+ String subtasksList = queryParameters.get("subtasks");
+ if (subtasksList == null || subtasksList.isEmpty()) {
+ return store.getTaskMetricStore(jobID, taskID).getAllSubtaskMetricStores();
+ } else {
+ Iterable<Integer> subtasks = getIntegerRangeFromString(subtasksList);
+ Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>();
+ for (int subtask : subtasks) {
+ subtaskStores.add(store.getSubtaskMetricStore(jobID, taskID, subtask));
+ }
+ return subtaskStores;
+ }
+ }
+
+ @Override
+ public String[] getPaths() {
+ return new String[]{"/jobs/:jobid/vertices/:vertexid/subtasks/metrics"};
+ }
+
+ private Iterable<Integer> getIntegerRangeFromString(String rangeDefinition) throws NumberFormatException {
+ final String[] ranges = rangeDefinition.trim().split(",");
+
+ UnionIterator<Integer> iterators = new UnionIterator<>();
+
+ for (String rawRange : ranges) {
+ try {
+ Iterator<Integer> rangeIterator;
+ String range = rawRange.trim();
+ int dashIdx = range.indexOf('-');
+ if (dashIdx == -1) {
+ // only one value in range:
+ rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
+ } else {
+ // evaluate range
+ final int start = Integer.valueOf(range.substring(0, dashIdx));
+ final int end = Integer.valueOf(range.substring(dashIdx + 1, range.length()));
+ rangeIterator = new Iterator<Integer>() {
+ int i = start;
+
+ @Override
+ public boolean hasNext() {
+ return i <= end;
+ }
+
+ @Override
+ public Integer next() {
+ return i++;
--- End diff --
Here you should throw a `NoSuchElementException` if `next()` is called after the range has been exhausted (this is part of the `Iterator` contract).
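A minimal sketch of a closed-range iterator that honours the `java.util.Iterator` contract; the class name `IntRangeIterator` is hypothetical and only illustrates the suggested fix to the anonymous iterator in `getIntegerRangeFromString`. The cursor is kept as a `long` so that a range ending at `Integer.MAX_VALUE` cannot overflow and loop forever.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 * Hypothetical iterator over the closed integer range [start, end].
 * next() throws NoSuchElementException once the range is exhausted,
 * instead of returning values past end.
 */
final class IntRangeIterator implements Iterator<Integer> {
    private final int end;
    // long so that end == Integer.MAX_VALUE cannot overflow the cursor
    private long next;

    IntRangeIterator(int start, int end) {
        this.next = start;
        this.end = end;
    }

    @Override
    public boolean hasNext() {
        return next <= end;
    }

    @Override
    public Integer next() {
        if (!hasNext()) {
            throw new NoSuchElementException("Range exhausted after " + end);
        }
        // Post-increment: return the current value, then advance the cursor.
        return (int) next++;
    }
}
```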
---
[GitHub] flink pull request #4901: [FLINK-7781][metrics][REST] Support on-demand aggr...
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147657052
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across all subtasks of a single task, a list of all available metrics or the
+ * values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg".
+ */
+public class SubtaskMetricsHandler extends AbstractMetricsHandler {
+ private static final String SUBTASK_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/metrics";
+
+ public SubtaskMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor, fetcher);
+ }
+
+ @Override
+ public String[] getPaths() {
+ return new String[]{SUBTASK_METRICS_REST_PATH};
+ }
+
+ @Override
+ protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+ String subtaskNumString = pathParams.get(SubtaskExecutionAttemptDetailsHandler.PARAMETER_SUBTASK_INDEX);
+ int subtaskNum;
+ try {
+ subtaskNum = Integer.valueOf(subtaskNumString);
+ } catch (NumberFormatException nfe) {
+ return null;
+ }
--- End diff --
Can this also be an invalid subtask number? E.g. we have 5 subtasks and request subtask 11? In that case there should be a check that logs a warning and returns `null`, so that we go down the same path as for a non-integer value.
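A minimal sketch of the suggested check, assuming the handler can learn the vertex parallelism (`numSubtasks` here is a hypothetical input; the handler in the diff does not currently have it). Both a non-integer and an out-of-range index produce a warning and `null`, so the caller handles them identically. `java.util.logging` stands in for the SLF4J logger used in Flink to keep the sketch self-contained; `SubtaskIndexParser` is a hypothetical name.

```java
import java.util.logging.Logger;

/** Hypothetical helper: parses the subtask index path parameter with a bounds check. */
final class SubtaskIndexParser {
    private static final Logger LOG = Logger.getLogger(SubtaskIndexParser.class.getName());

    private SubtaskIndexParser() {}

    /**
     * @param raw         raw path parameter value (may be null)
     * @param numSubtasks parallelism of the vertex (assumed to be known to the caller)
     * @return the index, or null if it is not an integer or lies outside [0, numSubtasks)
     */
    static Integer parseSubtaskIndex(String raw, int numSubtasks) {
        final int index;
        try {
            // Integer.parseInt(null) also throws NumberFormatException.
            index = Integer.parseInt(raw);
        } catch (NumberFormatException nfe) {
            LOG.warning("Subtask index is not an integer: " + raw);
            return null;
        }
        if (index < 0 || index >= numSubtasks) {
            LOG.warning("Subtask index " + index + " is out of range [0, " + numSubtasks + ").");
            return null;
        }
        return index;
    }
}
```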
---
[GitHub] flink pull request #4901: [FLINK-7781][metrics][REST] Support on-demand aggr...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4901#discussion_r147669457
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * Abstract request handler for querying metrics. Subclasses may either return a list of all available metrics or
+ * the values for a set of metrics for a specific entity, or aggregate them across several entities.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified all aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return an object containing the aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min" : "1", "max" : "2" }, { "id" : "Y", "min" : "4", "max" : "10" } ] }
+ */
+abstract class AbstractAggregatingMetricsHandler extends AbstractJsonRequestHandler {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final String PARAMETER_AGGREGATION = "agg";
+
+ private final MetricFetcher fetcher;
+
+ AbstractAggregatingMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor);
+ this.fetcher = Preconditions.checkNotNull(fetcher);
+ }
+
+ protected abstract Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters);
+
+ @Override
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ fetcher.update();
+ String requestedMetricsList = queryParams.get(AbstractMetricsHandler.PARAMETER_METRICS);
+ String aggTypeList = queryParams.get(PARAMETER_AGGREGATION);
+ MetricStore store = fetcher.getMetricStore();
+
+ if (requestedMetricsList == null) {
+ Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams);
+ if (stores == null) {
+ return "[]";
+ }
+ Collection<String> list = getAvailableMetrics(stores);
+ return mapMetricListToJson(list);
+ }
+
+ if (requestedMetricsList.isEmpty()) {
+ /*
+ * The WebInterface doesn't check whether the list of available metrics was empty. This can lead to a
+ * request for which the "get" parameter is an empty string.
+ */
+ return "[]";
+ }
+
+ String[] requestedMetrics = requestedMetricsList.split(",");
+
+ List<DoubleAccumulator.DoubleAccumulatorFactory<?>> requestedAggregationsFactories = new ArrayList<>();
+ // by default we return all aggregations
+ if (aggTypeList == null || aggTypeList.isEmpty()) {
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
+ } else {
+ for (String aggregation : aggTypeList.split(",")) {
+ switch (aggregation.toLowerCase()) {
+ case DoubleAccumulator.DoubleMinimum.NAME:
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
+ break;
+ case DoubleAccumulator.DoubleMaximum.NAME:
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
+ break;
+ case DoubleAccumulator.DoubleSum.NAME:
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
+ break;
+ case DoubleAccumulator.DoubleAverage.NAME:
+ requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
+ break;
+ default:
+ log.warn("Invalid aggregation specified: {}", aggregation.toLowerCase());
+ }
+ }
+ }
+ Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams);
+ if (stores == null){
+ return "[]";
+ }
+ return getAggregatedMetricValues(stores, requestedMetrics, requestedAggregationsFactories);
+ } catch (Exception e) {
+ throw new CompletionException(new FlinkException("Could not retrieve metrics.", e));
+ }
+ },
+ executor);
+ }
+
+ /**
+ * Returns the names of all metrics available in the given stores. Effectively this method computes
+ * the union of all key-sets.
+ *
+ * @param stores metric stores to inspect
+ * @return set of all available metric names
+ */
+ private static Collection<String> getAvailableMetrics(Collection<? extends MetricStore.ComponentMetricStore> stores) {
+ Set<String> uniqueMetrics = new HashSet<>();
+ for (MetricStore.ComponentMetricStore store : stores) {
+ uniqueMetrics.addAll(store.metrics.keySet());
+ }
+ return uniqueMetrics;
+ }
+
+ private static String mapMetricListToJson(Collection<String> metrics) throws IOException {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+ gen.writeStartArray();
+ for (String m : metrics) {
+ gen.writeStartObject();
+ gen.writeStringField("id", m);
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
+
+ gen.close();
+ return writer.toString();
+ }
+
+ /**
+ * Extracts and aggregates all requested metrics from the given metric stores, and maps the result to a JSON string.
+ *
+ * @param stores available metrics
+ * @param requestedMetrics ids of requested metrics
+ * @param requestedAggregationsFactories requested aggregations
+ * @return JSON string containing the requested metrics
+ * @throws IOException
+ */
+ private String getAggregatedMetricValues(Collection<? extends MetricStore.ComponentMetricStore> stores, String[] requestedMetrics, List<DoubleAccumulator.DoubleAccumulatorFactory<?>> requestedAggregationsFactories) throws IOException {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+ gen.writeStartArray();
+ for (String requestedMetric : requestedMetrics) {
+ final Collection<Double> values = new ArrayList<>();
+ try {
+ for (MetricStore.ComponentMetricStore store : stores) {
+ String stringValue = store.metrics.get(requestedMetric);
+ if (stringValue != null) {
+ values.add(Double.valueOf(stringValue));
+ }
+ }
+ } catch (NumberFormatException nfe) {
+ // metric is not numeric so we can't perform aggregations => ignore it
+ continue;
--- End diff --
This can happen for non-numeric metrics, for example the path to the last externalized checkpoint. I'll add a warning message.
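A minimal sketch of what the warning could look like, mirroring the diff's behaviour of skipping the whole metric on the first non-numeric value. The class and method names (`NumericMetricValues.parse`) are hypothetical, and `java.util.logging` stands in for the SLF4J logger used in the handler.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.logging.Logger;

/** Hypothetical helper that collects numeric values of one metric across several stores. */
final class NumericMetricValues {
    private static final Logger LOG = Logger.getLogger(NumericMetricValues.class.getName());

    private NumericMetricValues() {}

    /**
     * @param metricName name of the metric, used only for the log message
     * @param rawValues  raw string values, one per store; null means "absent in that store"
     * @return the parsed values, or Optional.empty() if any value is not numeric
     */
    static Optional<List<Double>> parse(String metricName, Collection<String> rawValues) {
        List<Double> values = new ArrayList<>();
        for (String raw : rawValues) {
            if (raw == null) {
                continue; // metric not reported by this store
            }
            try {
                values.add(Double.valueOf(raw));
            } catch (NumberFormatException nfe) {
                // Non-numeric metric (e.g. a checkpoint path): warn and skip aggregation.
                LOG.warning("Metric " + metricName + " has non-numeric value '" + raw
                    + "'; it will not be aggregated.");
                return Optional.empty();
            }
        }
        return Optional.of(values);
    }
}
```

Returning `Optional.empty()` keeps the "ignore this metric" decision with the caller, which can `continue` exactly as the loop in the diff does.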
---