Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/04/03 10:40:35 UTC

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/5805

    [FLINK-8370][REST] Port AggregatingMetricsHandler to flip6 

    ## What is the purpose of the change
    
    This PR ports the `AggregatingMetricsHandler` classes to flip6.
    
    Additionally this PR contains 2 minor changes:
    * the `MessageParameter` constructor is now `protected`
    * the converter methods of `QueryParameters` may now also throw `ConversionExceptions` like their `PathParameter` counterpart
    
    ## Brief change log
    
    * the `MessageParameter` constructor is now `protected`
    * the converter methods of `QueryParameters` may now also throw `ConversionExceptions` like their `PathParameter` counterpart
    * port `AggregatingMetricsHandler` to flip6
      * duplicate `DoubleAccumulator` class
      * define headers, various parameters and response bodies
      * define handlers
        * conceptually they work like the legacy versions, but I had to beef up the aggregation factories a bit as the existing approach wasn't compatible with an actual constructor
      * add handlers to `WebMonitorEndpoint`
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
    * run tests extending `AggregatingMetricsHandlerTestBase`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (documented (via legacy documentation))


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 8370

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5805.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5805
    
----
commit e238b85d12feda1f66de9ebf8aaaeaa223c3129d
Author: zentol <ch...@...>
Date:   2018-03-26T12:41:03Z

    [hotfix][metrics] Make MessageParameter constructor protected

commit 188f5ef96bce2a7fa8ee382ab1d5fc900d078149
Author: zentol <ch...@...>
Date:   2018-03-26T12:41:25Z

    [hotfix][metrics] Allow QueryParameter converters to throw ConversionExceptions

commit 85d00792ca34849034bc7c59a9b7d07ed4fff486
Author: zentol <ch...@...>
Date:   2018-03-28T10:52:07Z

    [FLINK-8370][REST] Port AggregatingMetricsHandler to flip6

----


---

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5805#discussion_r181691127
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java ---
    @@ -0,0 +1,302 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job.metrics;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
    +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
    +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
    +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
    +import java.util.concurrent.Executor;
    +import java.util.stream.Collectors;
    +
    +/**
    + * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics
    + * or the aggregated values of them across all/selected entities.
    + *
    + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
    + * {@code [ { "id" : "X" } ] }
    + *
    + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
    + * {@code /metrics?get=X,Y}
    + * The handler will then return a list containing the values of the requested metrics.
    + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
    + *
    + * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
    + * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned.
    + * {@code /metrics?get=X,Y&agg=min,max}
    + * The handler will then return a list of objects containing the aggregations for the requested metrics.
    + * {@code [ { "id" : "X", "min" : "1", "max" : "2" }, { "id" : "Y", "min" : "4", "max" : "10" } ] }
    + */
    +public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
    +
    +	private final Executor executor;
    +	private final MetricFetcher<?> fetcher;
    +
    +	protected AbstractAggregatingMetricsHandler(
    +			CompletableFuture<String> localRestAddress,
    +			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
    +			Time timeout,
    +			Map<String, String> responseHeaders,
    +			AbstractAggregatedMetricsHeaders<P> messageHeaders,
    +			Executor executor,
    +			MetricFetcher<?> fetcher) {
    +		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
    +		this.executor = Preconditions.checkNotNull(executor);
    +		this.fetcher = Preconditions.checkNotNull(fetcher);
    +	}
    +
    +	abstract Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, P> request);
    +
    +	@Override
    +	protected CompletableFuture<AggregatedMetricsResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, P> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
    +		return CompletableFuture.supplyAsync(
    +			() -> {
    +				try {
    +					fetcher.update();
    +					List<String> requestedMetrics = request.getQueryParameter(MetricsFilterParameter.class);
    +					List<MetricsAggregationParameter.AggregationMode> requestedAggregations = request.getQueryParameter(MetricsAggregationParameter.class);
    +					MetricStore store = fetcher.getMetricStore();
    +
    +					Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, request);
    +					if (stores == null) {
    --- End diff --
    
    added the `Nonnull` annotation


---

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5805#discussion_r181664352
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java ---
    @@ -0,0 +1,44 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.job.metrics;
    +
    +/**
    + * Headers for aggregating subtask metrics.
    + */
    +public class AggregatedSubtaskMetricsHeaders extends AbstractAggregatedMetricsHeaders<AggregatedSubtaskMetricsParameters> {
    +
    +	private static final AggregatedSubtaskMetricsHeaders INSTANCE = new AggregatedSubtaskMetricsHeaders();
    +
    +	private AggregatedSubtaskMetricsHeaders() {
    +	}
    +
    +	@Override
    +	public AggregatedSubtaskMetricsParameters getUnresolvedMessageParameters() {
    +		return new AggregatedSubtaskMetricsParameters();
    +	}
    +
    +	@Override
    +	public String getTargetRestEndpointURL() {
    +		return "/jobs/:jobid/vertices/:vertexid/subtasks/metrics";
    --- End diff --
    
    normally `"/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" [...]` is used
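
A minimal sketch of the convention mentioned in this comment, where the URL is assembled from key constants instead of repeating the literals. The two constants below are hypothetical stand-ins for `JobIDPathParameter.KEY` and a matching vertex-ID key; their values are assumed from the literal URL shown in the diff, not taken from the Flink sources.

```java
// Hypothetical stand-ins for the path-parameter key constants.
// The values are assumptions taken from the literal URL in the diff above.
final class PathKeys {
    static final String JOB_ID_KEY = "jobid";
    static final String VERTEX_ID_KEY = "vertexid";

    private PathKeys() {}
}

final class SubtaskMetricsUrl {
    // Built from the constants so that a renamed key changes the URL in one place.
    static final String URL =
        "/jobs/:" + PathKeys.JOB_ID_KEY
            + "/vertices/:" + PathKeys.VERTEX_ID_KEY
            + "/subtasks/metrics";

    private SubtaskMetricsUrl() {}
}
```

With these assumed values, `SubtaskMetricsUrl.URL` evaluates to the same string as the literal in the diff.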


---

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5805


---

[GitHub] flink issue #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler to flip...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5805
  
    merging.


---

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5805#discussion_r181401972
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.job.metrics;
    +
    +import org.apache.flink.runtime.rest.messages.ConversionException;
    +import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
    +
    +import java.util.Locale;
    +
    +/**
    + * TODO: add javadoc.
    --- End diff --
    
    The TODO placeholder should be replaced with an actual Javadoc.
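
For context, a rough sketch of what a converter for the "agg" query parameter could look like, given that the file imports `ConversionException` and `Locale`. This is not the code from the PR: `SketchConversionException`, `AggregationMode` and `AggregationModeConverter` are hypothetical names, and the set of modes is taken from the handler Javadoc ("sum", "max", "min", "avg").

```java
import java.util.Locale;

// Hypothetical stand-in for Flink's ConversionException; only the shape matters here.
class SketchConversionException extends Exception {
    SketchConversionException(String message) {
        super(message);
    }
}

// Modes as listed in the handler Javadoc: "sum", "max", "min" and "avg".
enum AggregationMode { MIN, MAX, SUM, AVG }

final class AggregationModeConverter {
    private AggregationModeConverter() {}

    // Converts a raw query value such as "min" into an AggregationMode.
    static AggregationMode fromString(String value) throws SketchConversionException {
        try {
            // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
            return AggregationMode.valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Unknown values surface as a conversion error rather than a generic runtime exception.
            throw new SketchConversionException("Not a valid aggregation: " + value);
        }
    }
}
```

Throwing a dedicated conversion exception lets the caller report a bad query value to the client instead of failing with an unrelated error.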


---

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5805#discussion_r181414763
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java ---
    @@ -0,0 +1,257 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job.metrics;
    +
    +/**
    + * An interface for accumulating double values.
    + */
    +interface DoubleAccumulator {
    --- End diff --
    
    This is virtually the same as `org.apache.flink.runtime.rest.handler.legacy.metrics.DoubleAccumulator`. Why do we have to duplicate it?
    
    ```
    diff ./flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java ./flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
    19c19
    < package org.apache.flink.runtime.rest.handler.legacy.metrics;
    ---
    > package org.apache.flink.runtime.rest.handler.job.metrics;
    ```


---

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5805#discussion_r181534458
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job.metrics;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
    +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
    +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
    +import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.UnionIterator;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
    + * for a set of metrics.
    + *
    + * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
    + * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
    + */
    +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
    +
    +	public AggregatingSubtasksMetricsHandler(
    +			CompletableFuture<String> localRestAddress,
    +			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
    +			Time timeout,
    +			Map<String, String> responseHeaders,
    +			Executor executor,
    +			MetricFetcher<?> fetcher) {
    +		super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
    +	}
    +
    +	@Override
    +	Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
    +		JobID jobID = request.getPathParameter(JobIDPathParameter.class);
    +		JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
    +
    +		Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
    +		if (subtaskRanges.isEmpty()) {
    +			return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores();
    +		} else {
    +			Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges);
    +			Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
    +			for (int subtask : subtasks) {
    +				subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask));
    +			}
    +			return subtaskStores;
    +		}
    +	}
    +
    +	private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) {
    +		UnionIterator<Integer> iterators = new UnionIterator<>();
    +
    +		for (String rawRange : ranges) {
    +			try {
    +				Iterator<Integer> rangeIterator;
    +				String range = rawRange.trim();
    +				int dashIdx = range.indexOf('-');
    +				if (dashIdx == -1) {
    +					// only one value in range:
    +					rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
    +				} else {
    +					// evaluate range
    +					final int start = Integer.valueOf(range.substring(0, dashIdx));
    +					final int end = Integer.valueOf(range.substring(dashIdx + 1, range.length()));
    +					rangeIterator = new Iterator<Integer>() {
    --- End diff --
    
    sweet, one more utility class to remove.
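
As an illustration of the range syntax described in the Javadoc (`subtasks=0-2,4-5`), here is a simplified sketch that expands such entries eagerly into a list. It is not the PR's implementation, which builds iterators lazily via `UnionIterator`; the class name `SubtaskRanges` is hypothetical, and the inclusive upper bound is an assumption, since the quoted diff ends before the iteration logic.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

final class SubtaskRanges {
    private SubtaskRanges() {}

    // Expands entries such as "0-2" or "4" into individual subtask indices, in input order.
    // Malformed entries cause Integer.parseInt to throw NumberFormatException.
    static List<Integer> expand(Collection<String> ranges) {
        List<Integer> result = new ArrayList<>();
        for (String rawRange : ranges) {
            String range = rawRange.trim();
            int dashIdx = range.indexOf('-');
            if (dashIdx == -1) {
                // Only one value in the range.
                result.add(Integer.parseInt(range));
            } else {
                int start = Integer.parseInt(range.substring(0, dashIdx));
                int end = Integer.parseInt(range.substring(dashIdx + 1));
                // Assumption: both ends of the range are inclusive.
                for (int i = start; i <= end; i++) {
                    result.add(i);
                }
            }
        }
        return result;
    }
}
```

Under the inclusive-range assumption, the entries `0-2` and `4-5` expand to the indices 0, 1, 2, 4 and 5.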


---

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5805#discussion_r181583359
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java ---
    @@ -0,0 +1,257 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job.metrics;
    +
    +/**
    + * An interface for accumulating double values.
    + */
    +interface DoubleAccumulator {
    --- End diff --
    
    Remembered why I did it this way. We would have to make the entire `DoubleAccumulator` class public otherwise.
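
To make the visibility argument concrete, a small sketch of a package-private accumulator interface with two implementations. The names carry a `Sketch` suffix because they are hypothetical and not the contents of the 257-line class in the PR; the point is only that omitting `public` keeps the types usable solely by handlers in the same package.

```java
// No 'public' modifier: the interface is visible only inside its own package.
interface DoubleAccumulatorSketch {
    void add(double value);

    double getValue();
}

// Accumulates the sum of all added values.
final class DoubleSumSketch implements DoubleAccumulatorSketch {
    private double sum;

    @Override
    public void add(double value) {
        sum += value;
    }

    @Override
    public double getValue() {
        return sum;
    }
}

// Accumulates the arithmetic mean of all added values.
final class DoubleAverageSketch implements DoubleAccumulatorSketch {
    private double sum;
    private int count;

    @Override
    public void add(double value) {
        sum += value;
        count++;
    }

    @Override
    public double getValue() {
        // NaN signals that no value was added yet.
        return count == 0 ? Double.NaN : sum / count;
    }
}
```

Sharing one such type between the `legacy.metrics` and `job.metrics` packages would require widening it to `public`, which is the trade-off described in the comment above.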


---

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5805#discussion_r181660436
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java ---
    @@ -0,0 +1,302 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job.metrics;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
    +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
    +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
    +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
    +import java.util.concurrent.Executor;
    +import java.util.stream.Collectors;
    +
    +/**
    + * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics
    + * or the aggregated values of them across all/selected entities.
    + *
    + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
    + * {@code [ { "id" : "X" } ] }
    + *
    + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
    + * {@code /metrics?get=X,Y}
    + * The handler will then return a list containing the values of the requested metrics.
    + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
    + *
    + * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
    + * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned.
    + * {@code /metrics?get=X,Y&agg=min,max}
    + * The handler will then return a list of objects containing the aggregations for the requested metrics.
    + * {@code [ { "id" : "X", "min" : "1", "max" : "2" }, { "id" : "Y", "min" : "4", "max" : "10" } ] }
    + */
    +public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
    +
    +	private final Executor executor;
    +	private final MetricFetcher<?> fetcher;
    +
    +	protected AbstractAggregatingMetricsHandler(
    +			CompletableFuture<String> localRestAddress,
    +			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
    +			Time timeout,
    +			Map<String, String> responseHeaders,
    +			AbstractAggregatedMetricsHeaders<P> messageHeaders,
    +			Executor executor,
    +			MetricFetcher<?> fetcher) {
    +		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
    +		this.executor = Preconditions.checkNotNull(executor);
    +		this.fetcher = Preconditions.checkNotNull(fetcher);
    +	}
    +
    +	abstract Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, P> request);
    +
    +	@Override
    +	protected CompletableFuture<AggregatedMetricsResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, P> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
    +		return CompletableFuture.supplyAsync(
    +			() -> {
    +				try {
    +					fetcher.update();
    +					List<String> requestedMetrics = request.getQueryParameter(MetricsFilterParameter.class);
    +					List<MetricsAggregationParameter.AggregationMode> requestedAggregations = request.getQueryParameter(MetricsAggregationParameter.class);
    +					MetricStore store = fetcher.getMetricStore();
    +
    +					Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, request);
    +					if (stores == null) {
    --- End diff --
    
    Imo if `getStores` is allowed to return `null`, the method should be annotated with `@Nullable`. Here I cannot find a code path where `null` is returned. Maybe we should not allow `null` to be returned.
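
One way to read the suggestion, sketched under simplifying assumptions: the lookup returns an empty collection instead of `null`, so callers need no `null` check. `StoreLookup` and its `Map`-based store are hypothetical and only stand in for the real `MetricStore` lookup.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

final class StoreLookup {
    private StoreLookup() {}

    // Returns the stores registered for the given job, or an empty collection if there are none.
    // The method never returns null, so callers can iterate over the result directly.
    static Collection<String> getStores(Map<String, Collection<String>> storesByJob, String jobId) {
        Collection<String> stores = storesByJob.get(jobId);
        if (stores == null) {
            return Collections.emptyList();
        }
        return stores;
    }
}
```

With this contract the `if (stores == null)` branch in the handler becomes unnecessary, and an empty result can be handled by the normal aggregation path.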


---

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5805#discussion_r181666171
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.job.metrics;
    +
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +/**
    + * Base {@link MessageHeaders} class for aggregating metrics.
    + */
    +public abstract class AbstractAggregatedMetricsHeaders<P extends AbstractAggregatedMetricsParameters> implements MessageHeaders<EmptyRequestBody, AggregatedMetricsResponseBody, P> {
    --- End diff --
    
    nit: raw types (`AbstractAggregatedMetricsParameters`)
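
A short sketch of the difference the nit points at, using hypothetical stand-in classes (`ParamsSketch`, `HeadersSketch`) rather than the Flink types. Bounding the type variable with a wildcard keeps the declaration generic without naming the type argument, whereas a bound written as plain `ParamsSketch` would be a raw type, which `javac` typically flags under `-Xlint:rawtypes`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical generic parameters type, standing in for AbstractAggregatedMetricsParameters<M>.
abstract class ParamsSketch<M> {
    abstract List<M> getQueryParameters();
}

// Wildcard bound: P may be any parameterization of ParamsSketch, and no raw type is involved.
// Writing the bound as 'P extends ParamsSketch' would compile, but as a raw type.
abstract class HeadersSketch<P extends ParamsSketch<?>> {
    abstract P getUnresolvedMessageParameters();
}

// Concrete types used only to show that the wildcard-bounded declaration is usable.
final class StringParams extends ParamsSketch<String> {
    @Override
    List<String> getQueryParameters() {
        return new ArrayList<>();
    }
}

final class StringHeaders extends HeadersSketch<StringParams> {
    @Override
    StringParams getUnresolvedMessageParameters() {
        return new StringParams();
    }
}
```

The handler class in the same PR already declares its bound as `AbstractAggregatedMetricsParameters<?>`, so using the wildcard form here would make the two declarations consistent.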


---

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5805#discussion_r181411248
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job.metrics;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
    +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
    +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
    +import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.UnionIterator;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
    + * for a set of metrics.
    + *
    + * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
    + * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
    + */
    +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
    +
    +	public AggregatingSubtasksMetricsHandler(
    +			CompletableFuture<String> localRestAddress,
    +			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
    +			Time timeout,
    +			Map<String, String> responseHeaders,
    +			Executor executor,
    +			MetricFetcher<?> fetcher) {
    +		super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
    +	}
    +
    +	@Override
    +	Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
    +		JobID jobID = request.getPathParameter(JobIDPathParameter.class);
    +		JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
    +
    +		Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
    +		if (subtaskRanges.isEmpty()) {
    +			return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores();
    +		} else {
    +			Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges);
    +			Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
    +			for (int subtask : subtasks) {
    +				subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask));
    +			}
    +			return subtaskStores;
    +		}
    +	}
    +
    +	private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) {
    +		UnionIterator<Integer> iterators = new UnionIterator<>();
    +
    +		for (String rawRange : ranges) {
    +			try {
    +				Iterator<Integer> rangeIterator;
    +				String range = rawRange.trim();
    +				int dashIdx = range.indexOf('-');
    +				if (dashIdx == -1) {
    +					// only one value in range:
    +					rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
    +				} else {
    +					// evaluate range
    +					final int start = Integer.valueOf(range.substring(0, dashIdx));
    +					final int end = Integer.valueOf(range.substring(dashIdx + 1, range.length()));
    +					rangeIterator = new Iterator<Integer>() {
    --- End diff --
    
    Isn't this the same as `IntStream.rangeClosed(start, end).iterator()`?
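
To illustrate the suggestion, here is a minimal, self-contained sketch of range parsing built on `IntStream.rangeClosed`. The class name `RangeIteratorSketch` and the helpers `parseRange` and `drain` are hypothetical and not part of the PR; the parsing mirrors the quoted diff only loosely and uses `Integer.parseInt` instead of `Integer.valueOf`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

public class RangeIteratorSketch {

	/** Parses a single value ("4") or an inclusive range ("0-2") into an iterator. */
	static Iterator<Integer> parseRange(String rawRange) {
		String range = rawRange.trim();
		int dashIdx = range.indexOf('-');
		if (dashIdx == -1) {
			// only one value in range
			int value = Integer.parseInt(range);
			return IntStream.rangeClosed(value, value).iterator();
		}
		int start = Integer.parseInt(range.substring(0, dashIdx));
		int end = Integer.parseInt(range.substring(dashIdx + 1));
		// PrimitiveIterator.OfInt extends Iterator<Integer>, so no anonymous class is needed
		return IntStream.rangeClosed(start, end).iterator();
	}

	/** Collects all remaining elements of the iterator into a list. */
	static List<Integer> drain(Iterator<Integer> iterator) {
		List<Integer> values = new ArrayList<>();
		while (iterator.hasNext()) {
			values.add(iterator.next());
		}
		return values;
	}

	public static void main(String[] args) {
		System.out.println(drain(parseRange("0-2"))); // prints [0, 1, 2]
		System.out.println(drain(parseRange(" 4 "))); // prints [4]
	}
}
```

As with the handwritten iterator, a range whose start exceeds its end simply yields no elements.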


---

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5805#discussion_r181404252
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java ---
    @@ -0,0 +1,302 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job.metrics;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
    +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
    +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
    +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
    +import java.util.concurrent.Executor;
    +import java.util.stream.Collectors;
    +
    +/**
    + * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics
    + * or the aggregated values of them across all/selected entities.
    + *
    + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
    + * {@code [ { "id" : "X" } ] }
    + *
    + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
    + * {@code /metrics?get=X,Y}
    + * The handler will then return a list containing the values of the requested metrics.
    + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
    + *
    + * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
    + * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned.
    + * {@code /metrics?get=X,Y&agg=min,max}
    + * The handler will then return a list of objects containing the aggregations for the requested metrics.
    + * {@code [ { "id" : "X", "min" : "1", "max" : "2" }, { "id" : "Y", "min" : "4", "max" : "10" } ] }
    + */
    +public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
    +
    +	private final Executor executor;
    +	private final MetricFetcher<?> fetcher;
    +
    +	protected AbstractAggregatingMetricsHandler(
    +			CompletableFuture<String> localRestAddress,
    +			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
    +			Time timeout,
    +			Map<String, String> responseHeaders,
    +			AbstractAggregatedMetricsHeaders<P> messageHeaders,
    +			Executor executor,
    +			MetricFetcher fetcher) {
    --- End diff --
    
    nit: use of raw type (`MetricFetcher`); the field above is declared as `MetricFetcher<?>`.


---

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5805#discussion_r181534395
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java ---
    @@ -0,0 +1,257 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job.metrics;
    +
    +/**
    + * An interface for accumulating double values.
    + */
    +interface DoubleAccumulator {
    --- End diff --
    
    Ah yes, I copied it at the start but never got around to deleting the old one.


---

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5805#discussion_r181408771
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job.metrics;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
    +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
    +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
    +import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.UnionIterator;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
    + * for a set of metrics.
    + *
    + * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
    + * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
    + */
    +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
    +
    +	public AggregatingSubtasksMetricsHandler(
    +			CompletableFuture<String> localRestAddress,
    +			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
    +			Time timeout,
    +			Map<String, String> responseHeaders,
    +			Executor executor,
    +			MetricFetcher<?> fetcher) {
    +		super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
    +	}
    +
    +	@Override
    +	Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
    +		JobID jobID = request.getPathParameter(JobIDPathParameter.class);
    +		JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
    +
    +		Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
    +		if (subtaskRanges.isEmpty()) {
    +			return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores();
    --- End diff --
    
    I think there is a potential NPE because `store.getTaskMetricStore(jobID.toString(), taskID.toString())` can return `null`.
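
A rough sketch of the kind of guard this would need, using hypothetical stand-in types rather than the real `MetricStore` API: the lookup below returns `null` for unknown keys, and the caller checks the result before dereferencing it. Returning an empty collection is only one option; whether the handler should instead fail the request is not settled in this thread.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NullSafeStoreLookupSketch {

	// Hypothetical stand-in for the metric store: maps "jobId/taskId" to subtask store names.
	static final Map<String, List<String>> TASK_STORES = new HashMap<>();

	/** Returns the task store, or null if the job/task combination is unknown. */
	static List<String> getTaskStore(String jobId, String taskId) {
		return TASK_STORES.get(jobId + "/" + taskId);
	}

	/** Guards against the null result instead of calling a method on it directly. */
	static Collection<String> getAllSubtaskStores(String jobId, String taskId) {
		List<String> taskStore = getTaskStore(jobId, taskId);
		if (taskStore == null) {
			return Collections.emptyList();
		}
		return taskStore;
	}

	public static void main(String[] args) {
		TASK_STORES.put("job/task", Arrays.asList("subtask-0", "subtask-1"));
		System.out.println(getAllSubtaskStores("job", "task"));    // prints [subtask-0, subtask-1]
		System.out.println(getAllSubtaskStores("job", "unknown")); // prints []
	}
}
```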


---

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5805#discussion_r181404370
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java ---
    @@ -0,0 +1,302 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job.metrics;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
    +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
    +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
    +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
    +import java.util.concurrent.Executor;
    +import java.util.stream.Collectors;
    +
    +/**
    + * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics
    + * or the aggregated values of them across all/selected entities.
    + *
    + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
    + * {@code [ { "id" : "X" } ] }
    + *
    + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
    + * {@code /metrics?get=X,Y}
    + * The handler will then return a list containing the values of the requested metrics.
    + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
    + *
    + * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
    + * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned.
    + * {@code /metrics?get=X,Y&agg=min,max}
    + * The handler will then return a list of objects containing the aggregations for the requested metrics.
    + * {@code [ { "id" : "X", "min" : "1", "max" : "2" }, { "id" : "Y", "min" : "4", "max" : "10" } ] }
    + */
    +public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
    +
    +	private final Executor executor;
    +	private final MetricFetcher<?> fetcher;
    +
    +	protected AbstractAggregatingMetricsHandler(
    +			CompletableFuture<String> localRestAddress,
    +			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
    +			Time timeout,
    +			Map<String, String> responseHeaders,
    +			AbstractAggregatedMetricsHeaders<P> messageHeaders,
    +			Executor executor,
    +			MetricFetcher fetcher) {
    +		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
    +		this.executor = executor;
    --- End diff --
    
    `null` checks are missing for `executor` and `fetcher`.
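
For illustration, a minimal sketch of constructor-side null checks. The quoted file already imports `org.apache.flink.util.Preconditions`, so `Preconditions.checkNotNull` would presumably be the natural choice there; to stay self-contained this sketch uses the JDK's `Objects.requireNonNull` instead, and the class `NullCheckedHandlerSketch` with its `Object`-typed fetcher is a hypothetical stand-in.

```java
import java.util.Objects;
import java.util.concurrent.Executor;

public class NullCheckedHandlerSketch {

	private final Executor executor;
	private final Object fetcher; // stand-in for MetricFetcher<?>

	NullCheckedHandlerSketch(Executor executor, Object fetcher) {
		// fail fast at construction time rather than on first use
		this.executor = Objects.requireNonNull(executor, "executor must not be null");
		this.fetcher = Objects.requireNonNull(fetcher, "fetcher must not be null");
	}

	/** Returns true if constructing the handler with a null executor is rejected. */
	static boolean rejectsNullExecutor() {
		try {
			new NullCheckedHandlerSketch(null, new Object());
			return false;
		} catch (NullPointerException expected) {
			return true;
		}
	}

	public static void main(String[] args) {
		new NullCheckedHandlerSketch(Runnable::run, new Object()); // valid arguments pass
		System.out.println(rejectsNullExecutor()); // prints true
	}
}
```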


---

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5805#discussion_r181411022
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job.metrics;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
    +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
    +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
    +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
    +import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.UnionIterator;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
    + * for a set of metrics.
    + *
    + * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
    + * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
    + */
    +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
    +
    +	public AggregatingSubtasksMetricsHandler(
    +			CompletableFuture<String> localRestAddress,
    +			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
    +			Time timeout,
    +			Map<String, String> responseHeaders,
    +			Executor executor,
    +			MetricFetcher<?> fetcher) {
    +		super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
    +	}
    +
    +	@Override
    +	Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
    +		JobID jobID = request.getPathParameter(JobIDPathParameter.class);
    +		JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
    +
    +		Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
    +		if (subtaskRanges.isEmpty()) {
    +			return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores();
    +		} else {
    +			Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges);
    +			Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
    +			for (int subtask : subtasks) {
    +				subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask));
    +			}
    +			return subtaskStores;
    +		}
    +	}
    +
    +	private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) {
    +		UnionIterator<Integer> iterators = new UnionIterator<>();
    +
    +		for (String rawRange : ranges) {
    +			try {
    +				Iterator<Integer> rangeIterator;
    +				String range = rawRange.trim();
    +				int dashIdx = range.indexOf('-');
    +				if (dashIdx == -1) {
    +					// only one value in range:
    +					rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
    +				} else {
    +					// evaluate range
    +					final int start = Integer.valueOf(range.substring(0, dashIdx));
    +					final int end = Integer.valueOf(range.substring(dashIdx + 1, range.length()));
    +					rangeIterator = new Iterator<Integer>() {
    +						int i = start;
    +
    +						@Override
    +						public boolean hasNext() {
    +							return i <= end;
    +						}
    +
    +						@Override
    +						public Integer next() {
    +							if (hasNext()) {
    +								return i++;
    +							} else {
    +								throw new NoSuchElementException();
    +							}
    +						}
    +
    +						@Override
    +						public void remove() {
    +							throw new UnsupportedOperationException("Remove not supported");
    --- End diff --
    
    That's already the default implementation for that interface.
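
A small sketch showing the point: since Java 8, `Iterator.remove()` is a default method that throws `UnsupportedOperationException`, so an anonymous iterator does not need to override it. The class `DefaultRemoveSketch` and its helpers are hypothetical names used only for this example.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

public class DefaultRemoveSketch {

	/** Inclusive integer range iterator that does NOT override remove(). */
	static Iterator<Integer> range(final int start, final int end) {
		return new Iterator<Integer>() {
			int i = start;

			@Override
			public boolean hasNext() {
				return i <= end;
			}

			@Override
			public Integer next() {
				if (!hasNext()) {
					throw new NoSuchElementException();
				}
				return i++;
			}
		};
	}

	/** Returns true if the inherited default remove() throws UnsupportedOperationException. */
	static boolean removeIsUnsupported() {
		Iterator<Integer> iterator = range(0, 1);
		iterator.next();
		try {
			iterator.remove();
			return false;
		} catch (UnsupportedOperationException expected) {
			return true;
		}
	}

	public static void main(String[] args) {
		System.out.println(removeIsUnsupported()); // prints true
	}
}
```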


---