You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/04/03 10:40:35 UTC
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
GitHub user zentol opened a pull request:
https://github.com/apache/flink/pull/5805
[FLINK-8370][REST] Port AggregatingMetricsHandler to flip6
## What is the purpose of the change
This PR ports the `AggregatingMetricsHandler` classes to flip6.
Additionally this PR contains 2 minor changes:
* the `MessageParameter` constructor is now `protected`
* the converter methods of `QueryParameters` may now also throw `ConversionExceptions` like their `PathParamater` counter-part
## Brief change log
* the `MessageParameter` constructor is now `protected`
* the converter methods of `QueryParameters` may now also throw `ConversionExceptions` like their `PathParamater` counter-part
* port `AggregatingMetricsHandler` to flip6
* duplicate `DoubleAccumulator` class
* define headers, various parameters and response bodies
* define handlers
* conceptually they work like the legacy versions, but I had to beef up the aggregation factories a bit as the existing approach wasn't compatible with an actual constructor
* add handlers to WebMonitorEndpoint
## Verifying this change
This change added tests and can be verified as follows:
* run tests extending `AggregatingMetricsHandlerTestBase`.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (documented (via legacy documentation))
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zentol/flink 8370
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5805.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5805
----
commit e238b85d12feda1f66de9ebf8aaaeaa223c3129d
Author: zentol <ch...@...>
Date: 2018-03-26T12:41:03Z
[hotfix][metrics] Make MessageParameter constructor protected
commit 188f5ef96bce2a7fa8ee382ab1d5fc900d078149
Author: zentol <ch...@...>
Date: 2018-03-26T12:41:25Z
[hotfix][metrics] Allow QueryParameter converters to throw ConversionExceptions
commit 85d00792ca34849034bc7c59a9b7d07ed4fff486
Author: zentol <ch...@...>
Date: 2018-03-28T10:52:07Z
[FLINK-8370][REST] Port AggregatingMetricsHandler to flip6
----
---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5805#discussion_r181691127
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java ---
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics
+ * or the aggregated values of them across all/selected entities.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list of objects containing the aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]}
+ */
+public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
+
+ private final Executor executor;
+ private final MetricFetcher<?> fetcher;
+
+ protected AbstractAggregatingMetricsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ AbstractAggregatedMetricsHeaders<P> messageHeaders,
+ Executor executor,
+ MetricFetcher<?> fetcher) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+ this.executor = Preconditions.checkNotNull(executor);
+ this.fetcher = Preconditions.checkNotNull(fetcher);
+ }
+
+ abstract Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, P> request);
+
+ @Override
+ protected CompletableFuture<AggregatedMetricsResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, P> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ fetcher.update();
+ List<String> requestedMetrics = request.getQueryParameter(MetricsFilterParameter.class);
+ List<MetricsAggregationParameter.AggregationMode> requestedAggregations = request.getQueryParameter(MetricsAggregationParameter.class);
+ MetricStore store = fetcher.getMetricStore();
+
+ Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, request);
+ if (stores == null) {
--- End diff --
added the `Nonnull` annotation
---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5805#discussion_r181664352
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+/**
+ * Headers for aggregating subtask metrics.
+ */
+public class AggregatedSubtaskMetricsHeaders extends AbstractAggregatedMetricsHeaders<AggregatedSubtaskMetricsParameters> {
+
+ private static final AggregatedSubtaskMetricsHeaders INSTANCE = new AggregatedSubtaskMetricsHeaders();
+
+ private AggregatedSubtaskMetricsHeaders() {
+ }
+
+ @Override
+ public AggregatedSubtaskMetricsParameters getUnresolvedMessageParameters() {
+ return new AggregatedSubtaskMetricsParameters();
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return "/jobs/:jobid/vertices/:vertexid/subtasks/metrics";
--- End diff --
normally `"/jobs/:" + JobIDPathParameter.KEY + "vertices/:" [...]` is used
---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5805
---
[GitHub] flink issue #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler to flip...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/5805
merging.
---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5805#discussion_r181401972
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Locale;
+
+/**
+ * TODO: add javadoc.
--- End diff --
Should be replaced.
---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5805#discussion_r181414763
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java ---
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+/**
+ * An interface for accumulating double values.
+ */
+interface DoubleAccumulator {
--- End diff --
This is virtually the same as `org.apache.flink.runtime.rest.handler.legacy.metrics.DoubleAccumulator`. Why do we have to duplicate it?
```
diff ./flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java ./flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
19c19
< package org.apache.flink.runtime.rest.handler.legacy.metrics;
---
> package org.apache.flink.runtime.rest.handler.job.metrics;
```
---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5805#discussion_r181534458
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
+
+ public AggregatingSubtasksMetricsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ Executor executor,
+ MetricFetcher<?> fetcher) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
+ }
+
+ @Override
+ Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
+ JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+ JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
+
+ Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
+ if (subtaskRanges.isEmpty()) {
+ return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores();
+ } else {
+ Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges);
+ Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
+ for (int subtask : subtasks) {
+ subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask));
+ }
+ return subtaskStores;
+ }
+ }
+
+ private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) {
+ UnionIterator<Integer> iterators = new UnionIterator<>();
+
+ for (String rawRange : ranges) {
+ try {
+ Iterator<Integer> rangeIterator;
+ String range = rawRange.trim();
+ int dashIdx = range.indexOf('-');
+ if (dashIdx == -1) {
+ // only one value in range:
+ rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
+ } else {
+ // evaluate range
+ final int start = Integer.valueOf(range.substring(0, dashIdx));
+ final int end = Integer.valueOf(range.substring(dashIdx + 1, range.length()));
+ rangeIterator = new Iterator<Integer>() {
--- End diff --
sweet, one more utility class to remove.
---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5805#discussion_r181583359
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java ---
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+/**
+ * An interface for accumulating double values.
+ */
+interface DoubleAccumulator {
--- End diff --
Remembered why I did it this way. We would have to make the entire `DoubleAccumulator` class public otherwise.
---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5805#discussion_r181660436
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java ---
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics
+ * or the aggregated values of them across all/selected entities.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list of objects containing the aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]}
+ */
+public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
+
+ private final Executor executor;
+ private final MetricFetcher<?> fetcher;
+
+ protected AbstractAggregatingMetricsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ AbstractAggregatedMetricsHeaders<P> messageHeaders,
+ Executor executor,
+ MetricFetcher<?> fetcher) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+ this.executor = Preconditions.checkNotNull(executor);
+ this.fetcher = Preconditions.checkNotNull(fetcher);
+ }
+
+ abstract Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, P> request);
+
+ @Override
+ protected CompletableFuture<AggregatedMetricsResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, P> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ fetcher.update();
+ List<String> requestedMetrics = request.getQueryParameter(MetricsFilterParameter.class);
+ List<MetricsAggregationParameter.AggregationMode> requestedAggregations = request.getQueryParameter(MetricsAggregationParameter.class);
+ MetricStore store = fetcher.getMetricStore();
+
+ Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, request);
+ if (stores == null) {
--- End diff --
Imo if `getStores` is allowed to return `null`, the method should be annotated if `@Nullable`. Here I cannot find a code path where `null` is returned. Maybe we should not allow `null` to be returned.
---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5805#discussion_r181666171
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Based {@link MessageHeaders} class for aggregating metrics.
+ */
+public abstract class AbstractAggregatedMetricsHeaders<P extends AbstractAggregatedMetricsParameters> implements MessageHeaders<EmptyRequestBody, AggregatedMetricsResponseBody, P> {
--- End diff --
nit: raw types (`AbstractAggregatedMetricsParameters`)
---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5805#discussion_r181411248
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
+
+ public AggregatingSubtasksMetricsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ Executor executor,
+ MetricFetcher<?> fetcher) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
+ }
+
+ @Override
+ Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
+ JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+ JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
+
+ Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
+ if (subtaskRanges.isEmpty()) {
+ return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores();
+ } else {
+ Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges);
+ Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
+ for (int subtask : subtasks) {
+ subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask));
+ }
+ return subtaskStores;
+ }
+ }
+
+ private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) {
+ UnionIterator<Integer> iterators = new UnionIterator<>();
+
+ for (String rawRange : ranges) {
+ try {
+ Iterator<Integer> rangeIterator;
+ String range = rawRange.trim();
+ int dashIdx = range.indexOf('-');
+ if (dashIdx == -1) {
+ // only one value in range:
+ rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
+ } else {
+ // evaluate range
+ final int start = Integer.valueOf(range.substring(0, dashIdx));
+ final int end = Integer.valueOf(range.substring(dashIdx + 1, range.length()));
+ rangeIterator = new Iterator<Integer>() {
--- End diff --
Isn't this the same as: `IntStream.rangeClosed(start, end).iterator();`
---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5805#discussion_r181404252
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java ---
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics
+ * or the aggregated values of them across all/selected entities.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list of objects containing the aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]}
+ */
+public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
+
+ private final Executor executor;
+ private final MetricFetcher<?> fetcher;
+
+ protected AbstractAggregatingMetricsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ AbstractAggregatedMetricsHeaders<P> messageHeaders,
+ Executor executor,
+ MetricFetcher fetcher) {
--- End diff --
nit: use of rawtype
---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5805#discussion_r181534395
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java ---
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+/**
+ * An interface for accumulating double values.
+ */
+interface DoubleAccumulator {
--- End diff --
ah yes, I copied it at the start but never got around to deleting the old one..
---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5805#discussion_r181408771
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
+
+ public AggregatingSubtasksMetricsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ Executor executor,
+ MetricFetcher<?> fetcher) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
+ }
+
+ @Override
+ Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
+ JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+ JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
+
+ Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
+ if (subtaskRanges.isEmpty()) {
+ return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores();
--- End diff --
I think there is a potential NPE because `store.getTaskMetricStore(jobID.toString(), taskID.toString())` can return `null`.
---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5805#discussion_r181404370
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java ---
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics
+ * or the aggregated values of them across all/selected entities.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list of objects containing the aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]}
+ */
+public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
+
+ private final Executor executor;
+ private final MetricFetcher<?> fetcher;
+
+ protected AbstractAggregatingMetricsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ AbstractAggregatedMetricsHeaders<P> messageHeaders,
+ Executor executor,
+ MetricFetcher fetcher) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+ this.executor = executor;
--- End diff --
`null` checks missing for `executor` and `fetcher`
---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5805#discussion_r181411022
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
+
+ public AggregatingSubtasksMetricsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ Executor executor,
+ MetricFetcher<?> fetcher) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
+ }
+
+ @Override
+ Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
+ JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+ JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
+
+ Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
+ if (subtaskRanges.isEmpty()) {
+ return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores();
+ } else {
+ Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges);
+ Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
+ for (int subtask : subtasks) {
+ subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask));
+ }
+ return subtaskStores;
+ }
+ }
+
+ private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) {
+ UnionIterator<Integer> iterators = new UnionIterator<>();
+
+ for (String rawRange : ranges) {
+ try {
+ Iterator<Integer> rangeIterator;
+ String range = rawRange.trim();
+ int dashIdx = range.indexOf('-');
+ if (dashIdx == -1) {
+ // only one value in range:
+ rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
+ } else {
+ // evaluate range
+ final int start = Integer.valueOf(range.substring(0, dashIdx));
+ final int end = Integer.valueOf(range.substring(dashIdx + 1, range.length()));
+ rangeIterator = new Iterator<Integer>() {
+ int i = start;
+
+ @Override
+ public boolean hasNext() {
+ return i <= end;
+ }
+
+ @Override
+ public Integer next() {
+ if (hasNext()) {
+ return i++;
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Remove not supported");
--- End diff --
That's already the default implementation for that interface.
---