Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2017/10/25 10:29:34 UTC

[GitHub] flink pull request #4901: [FLINK-7781][metrics][REST] Support on-demand aggr...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/4901

     [FLINK-7781][metrics][REST] Support on-demand aggregations 

    ## What is the purpose of the change
    
    This PR adds on-demand aggregation support to the metrics REST API. It allows aggregating
    * taskmanager metrics across all taskmanagers
    * job metrics across all jobs
    * subtask metrics across all subtasks
    
    The aggregation is handled by new handlers, as described in the documentation. Additionally, a subtask metrics handler was added that subsumes the existing JobVertexMetricsHandler and exposes subtask metrics in a more idiomatic way. The JobVertexMetricsHandler was kept for backwards compatibility (with the web UI in particular).
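
To illustrate what "aggregating a metric across entities" means here, the following is a minimal sketch, not the PR's implementation. It assumes metric values arrive as strings (one per taskmanager, job, or subtask) and swaps in the JDK's `DoubleSummaryStatistics` for the PR's own `DoubleAccumulator` classes, whose API is not shown in this thread. The class and method names are hypothetical.

```java
import java.util.DoubleSummaryStatistics;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not a class from this PR.
final class MetricAggregationSketch {

    // Aggregates the values that one metric has on several entities
    // (for example one value per subtask). Missing or non-numeric values are skipped.
    static Map<String, Double> aggregate(List<String> valuesPerEntity) {
        DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
        for (String value : valuesPerEntity) {
            if (value == null) {
                continue;
            }
            try {
                stats.accept(Double.parseDouble(value));
            } catch (NumberFormatException e) {
                // not a number, so it cannot take part in the aggregation
            }
        }

        // Keys follow the aggregation names used by the "agg" query parameter.
        Map<String, Double> result = new LinkedHashMap<>();
        if (stats.getCount() > 0) {
            result.put("min", stats.getMin());
            result.put("max", stats.getMax());
            result.put("sum", stats.getSum());
            result.put("avg", stats.getAverage());
        }
        return result;
    }
}
```

An empty result map signals that no entity reported a numeric value for the metric; how the real handlers treat that case is not stated in this thread.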
    
    ## Brief change log
    
    * fix the request examples in the metric handler javadocs
    * implement aggregation handlers
      * expose all subtask stores in TaskMetricStore for easier access
      * extend MetricStoreTest#setupMetricStore to contain metrics for multiple entities
    * integrate aggregation handlers into WebRuntimeMonitor
    * document the metrics REST API
    
    ## Verifying this change
    
    This change added tests for each Aggregating*MetricsHandler and for the abstract base class (AbstractAggregatingMetricsHandlerTest).
    
    This change can be verified manually by running multiple taskmanagers/jobs/subtasks in the cluster and querying the REST endpoints as described in the documentation.
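
For manual verification, a request URL for the aggregated subtask endpoint could be assembled as in the sketch below. The path and the `get`, `agg` and `subtasks` query parameters are taken from the diff quoted later in this thread; the helper class, its method, and the base URL used in the usage note are assumptions made for illustration.

```java
import java.util.List;

// Hypothetical helper for illustration; not a class from this PR.
final class AggregatedMetricsQuerySketch {

    // Builds the query URL for aggregating subtask metrics of one job vertex.
    // Note: values are not URL-encoded here, which a real client should do.
    static String subtaskAggregationUrl(
            String baseUrl,
            String jobId,
            String vertexId,
            List<String> metrics,
            List<String> aggregations,
            String subtasks) {
        StringBuilder url = new StringBuilder(baseUrl)
            .append("/jobs/").append(jobId)
            .append("/vertices/").append(vertexId)
            .append("/subtasks/metrics")
            .append("?get=").append(String.join(",", metrics));
        // "agg" is optional; the javadoc says all aggregations are returned without it.
        if (aggregations != null && !aggregations.isEmpty()) {
            url.append("&agg=").append(String.join(",", aggregations));
        }
        // "subtasks" is optional; without it all subtasks are aggregated.
        if (subtasks != null && !subtasks.isEmpty()) {
            url.append("&subtasks=").append(subtasks);
        }
        return url.toString();
    }
}
```

With an assumed base URL of `http://localhost:8081`, the resulting string can be passed to any HTTP client, for example `curl`.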
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (docs)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 7781

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4901.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4901
    
----
commit 4a3f51157180241210160fb73508f406c9192bc2
Author: zentol <ch...@apache.org>
Date:   2017-10-25T07:42:14Z

    [hotfix][javadocs] Fix metric handler example javadocs

commit afabf6fb72c2b2bd94fb0410c985e61e940d138e
Author: zentol <ch...@apache.org>
Date:   2017-10-25T09:21:28Z

    [FLINK-7781][metrics][REST] Support on-demand aggregations

commit f768be1ebe0059b30571c43950f0942a7919bb0c
Author: zentol <ch...@apache.org>
Date:   2017-10-25T09:21:46Z

    [FLINK-7912][metrics][docs] Document metrics REST API

----


---

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147668677
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.legacy.metrics;
    +
    +import org.apache.flink.runtime.jobmaster.JobManagerGateway;
    +import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
    +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * Abstract request handler for querying metrics. Subclasses may either that returns a list of all available metrics or
    + * the values for a set of metrics for a specific entity, or aggregate them across several entities.
    + *
    --- End diff --
    
    This is a leftover from a previous iteration in which both the aggregating and the non-aggregating handlers inherited from the same class. I will rewrite it.


---

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147696972
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandler.java ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.legacy.metrics;
    +
    +import org.apache.flink.util.UnionIterator;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.concurrent.Executor;
    +
    +import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
    +import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
    +
    +/**
    + * Request handler that returns, aggregated across all subtasks, a list of all available metrics or the values
    + * for a set of metrics.
    + *
    + * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
    + * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
    + */
    +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler {
    +	public AggregatingSubtasksMetricsHandler(Executor executor, MetricFetcher fetcher) {
    +		super(executor, fetcher);
    +	}
    +
    +	@Override
    +	protected Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters) {
    +		String jobID = pathParameters.get(PARAMETER_JOB_ID);
    +		String taskID = pathParameters.get(PARAMETER_VERTEX_ID);
    +		if (jobID == null) {
    +			return Collections.emptyList();
    +		}
    +		if (taskID == null) {
    +			return Collections.emptyList();
    +		}
    +		String subtasksList = queryParameters.get("subtasks");
    +		if (subtasksList == null || subtasksList.isEmpty()) {
    +			return store.getTaskMetricStore(jobID, taskID).getAllSubtaskMetricStores();
    +		} else {
    +			Iterable<Integer> subtasks = getIntegerRangeFromString(subtasksList);
    +			Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>();
    +			for (int subtask : subtasks) {
    +				subtaskStores.add(store.getSubtaskMetricStore(jobID, taskID, subtask));
    +			}
    +			return subtaskStores;
    +		}
    +	}
    +
    +	@Override
    +	public String[] getPaths() {
    +		return new String[]{"/jobs/:jobid/vertices/:vertexid/subtasks/metrics"};
    +	}
    +
    +	private Iterable<Integer> getIntegerRangeFromString(String rangeDefinition) throws NumberFormatException {
    --- End diff --
    
    You can remove the `throws NumberFormatException` as you handle the exception.
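
The body of `getIntegerRangeFromString` is not part of the quoted diff, so the following is only a sketch of how a definition such as `0-2,4-5` could be parsed while handling `NumberFormatException` inside the method, which is what makes the `throws` clause unnecessary. The class name and the choice to return an empty list on malformed input are assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not the implementation from this PR.
final class SubtaskRangeSketch {

    // Expands a comma-separated list of integers and inclusive ranges,
    // e.g. "0-2,4-5" becomes [0, 1, 2, 4, 5].
    static List<Integer> parseRanges(String rangeDefinition) {
        List<Integer> indices = new ArrayList<>();
        try {
            for (String part : rangeDefinition.trim().split(",")) {
                String range = part.trim();
                int dash = range.indexOf('-');
                if (dash == -1) {
                    // single index such as "3"
                    indices.add(Integer.parseInt(range));
                } else {
                    // inclusive range such as "0-2"
                    int start = Integer.parseInt(range.substring(0, dash));
                    int end = Integer.parseInt(range.substring(dash + 1));
                    for (int i = start; i <= end; i++) {
                        indices.add(i);
                    }
                }
            }
        } catch (NumberFormatException e) {
            // Assumption: malformed input selects no subtasks instead of failing the request.
            return Collections.emptyList();
        }
        return indices;
    }
}
```

Because the exception is caught and never escapes, the method signature needs no `throws NumberFormatException`.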


---

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147650987
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.legacy.metrics;
    +
    +import org.apache.flink.runtime.jobmaster.JobManagerGateway;
    +import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
    +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * Abstract request handler for querying metrics. Subclasses may either that returns a list of all available metrics or
    + * the values for a set of metrics for a specific entity, or aggregate them across several entities.
    + *
    + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
    + * {@code [ { "id" : "X" } ] }
    + *
    + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
    + * {@code /metrics?get=X,Y}
    + * The handler will then return a list containing the values of the requested metrics.
    + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
    + *
    + * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
    + * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
    + * "sum", "max", "min" and "avg". If the parameter is not specified all aggregations will be returned.
    + * {@code /metrics?get=X,Y&agg=min,max}
    + * The handler will then return an object containing the aggregations for the requested metrics.
    + * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]}
    + */
    +abstract class AbstractAggregatingMetricsHandler extends AbstractJsonRequestHandler {
    +
    +	protected final Logger log = LoggerFactory.getLogger(getClass());
    +
    +	private static final String PARAMETER_AGGREGATION = "agg";
    +
    +	private final MetricFetcher fetcher;
    +
    +	AbstractAggregatingMetricsHandler(Executor executor, MetricFetcher fetcher) {
    +		super(executor);
    +		this.fetcher = Preconditions.checkNotNull(fetcher);
    +	}
    +
    +	protected abstract Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters);
    +
    +	@Override
    +	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
    +		return CompletableFuture.supplyAsync(
    +			() -> {
    +				try {
    +					fetcher.update();
    +					String requestedMetricsList = queryParams.get(AbstractMetricsHandler.PARAMETER_METRICS);
    +					String aggTypeList = queryParams.get(PARAMETER_AGGREGATION);
    +					MetricStore store = fetcher.getMetricStore();
    +
    +					if (requestedMetricsList == null) {
    +						Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams);
    +						if (stores == null) {
    +							return "[]";
    +						}
    +						Collection<String> list = getAvailableMetrics(stores);
    +						return mapMetricListToJson(list);
    +					}
    +
    +					if (requestedMetricsList.isEmpty()) {
    +						/*
    +						 * The WebInterface doesn't check whether the list of available metrics was empty. This can lead to a
    +						 * request for which the "get" parameter is an empty string.
    +						 */
    +						return "[]";
    +					}
    +
    +					String[] requestedMetrics = requestedMetricsList.split(",");
    +
    +					List<DoubleAccumulator.DoubleAccumulatorFactory<?>> requestedAggregationsFactories = new ArrayList<>();
    +					// by default we return all aggregations
    +					if (aggTypeList == null || aggTypeList.isEmpty()) {
    +						requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
    +						requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
    +						requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
    +						requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
    +					} else {
    +						for (String aggregation : aggTypeList.split(",")) {
    +							switch (aggregation.toLowerCase()) {
    +								case DoubleAccumulator.DoubleMinimum.NAME:
    +									requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
    +									break;
    +								case DoubleAccumulator.DoubleMaximum.NAME:
    +									requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
    +									break;
    +								case DoubleAccumulator.DoubleSum.NAME:
    +									requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
    +									break;
    +								case DoubleAccumulator.DoubleAverage.NAME:
    +									requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
    +									break;
    +								default:
    +									log.warn("Invalid aggregation specified: {}", aggregation.toLowerCase());
    +							}
    +						}
    +					}
    +					Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams);
    --- End diff --
    
    Shouldn't the `stores` check come before `requestedAggregationsFactories` is populated? If `stores` is empty, the object creation and all the work done up to this point are in vain.
    Could the check be factored out so that it runs even before `requestedMetricsList` is inspected? That would avoid the code repetition, since it seems that we always `return "[]"` when `stores == null`.
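
A minimal sketch of the suggested control flow follows. It uses plain maps as a stand-in for `MetricStore.ComponentMetricStore` and leaves out the actual aggregation, so the class, the method names, and the simplified JSON output are all assumptions; only the ordering of the checks is the point.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical simplification for illustration; not the handler from this PR.
final class EarlyStoreCheckSketch {

    // Each map stands in for one component metric store (metric name -> value).
    static String handle(Collection<Map<String, String>> stores, String requestedMetricsList) {
        // Single early exit, evaluated before any other work is done.
        if (stores == null || stores.isEmpty()) {
            return "[]";
        }

        if (requestedMetricsList == null) {
            // No "get" parameter: list the names of all available metrics.
            Set<String> available = new TreeSet<>();
            for (Map<String, String> store : stores) {
                available.addAll(store.keySet());
            }
            return toJsonIds(available);
        }

        if (requestedMetricsList.isEmpty()) {
            return "[]";
        }

        // Only at this point would the aggregation factories be created.
        // The aggregate values themselves are omitted in this sketch.
        return toJsonIds(Arrays.asList(requestedMetricsList.split(",")));
    }

    // Writes [{"id":"X"},...]; no JSON escaping, which real code would need.
    private static String toJsonIds(Collection<String> ids) {
        StringBuilder json = new StringBuilder("[");
        String separator = "";
        for (String id : ids) {
            json.append(separator).append("{\"id\":\"").append(id).append("\"}");
            separator = ",";
        }
        return json.append("]").toString();
    }
}
```

Treating an empty collection like `null` is an addition of this sketch; the quoted diff only checks for `null`.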


---

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147649624
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.legacy.metrics;
    +
    +import org.apache.flink.runtime.jobmaster.JobManagerGateway;
    +import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
    +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * Abstract request handler for querying metrics. Subclasses may either that returns a list of all available metrics or
    + * the values for a set of metrics for a specific entity, or aggregate them across several entities.
    + *
    --- End diff --
    
    The above javadoc can be improved. The phrase:
    "Subclasses may either that returns a list of all available metrics or the values for a set of metrics for a specific entity, or aggregate them across several entities."
    
    This seems to be the result of a sloppy copy-paste.


---

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4901


---

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147649786
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.legacy.metrics;
    +
    +import org.apache.flink.runtime.jobmaster.JobManagerGateway;
    +import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
    +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * Abstract request handler for querying metrics. Subclasses may either that returns a list of all available metrics or
    + * the values for a set of metrics for a specific entity, or aggregate them across several entities.
    + *
    + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
    + * {@code [ { "id" : "X" } ] }
    + *
    + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
    + * {@code /metrics?get=X,Y}
    + * The handler will then return a list containing the values of the requested metrics.
    + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
    + *
    + * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
    --- End diff --
    
    "get" parameter a comma-separate -> "get" parameter**,** a comma-separate


---

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147655793
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java ---
    @@ -39,12 +39,15 @@
      * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
      * {@code [ { "id" : "X" } ] }
      *
    - * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
    - * {@code /get?X,Y}
    + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
    + * {@code /metrics?get=X,Y}
      * The handler will then return a list containing the values of the requested metrics.
      * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
      */
     public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler {
    +
    --- End diff --
    
    Why not put all of these (`PARAMETER_METRICS`, `PARAMETER_AGGREGATION`, `SUBTASK_METRICS_REST_PATH`, even `"/jobs/metrics"`, etc.) in a `MetricsURLConstants` class (or something like it), so that there is one central place for them? This could help maintenance.
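
Such a constants holder might look like the sketch below. The class name comes from the comment above, and the path strings and the `"agg"` value appear in the quoted diffs. The value `"get"` for `PARAMETER_METRICS` is inferred from the javadoc, and the names `PARAMETER_SUBTASKS`, `JOBS_METRICS_REST_PATH` and `AGGREGATED_SUBTASK_METRICS_REST_PATH` are made up for illustration.

```java
// Hypothetical constants holder for illustration; not a class from this PR.
final class MetricsURLConstants {

    // Query parameters
    static final String PARAMETER_METRICS = "get";        // value assumed from the javadoc
    static final String PARAMETER_AGGREGATION = "agg";
    static final String PARAMETER_SUBTASKS = "subtasks";  // constant name is an assumption

    // REST paths
    static final String JOBS_METRICS_REST_PATH = "/jobs/metrics";  // constant name is an assumption
    static final String SUBTASK_METRICS_REST_PATH =
        "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/metrics";
    static final String AGGREGATED_SUBTASK_METRICS_REST_PATH =     // constant name is an assumption
        "/jobs/:jobid/vertices/:vertexid/subtasks/metrics";

    private MetricsURLConstants() {
        // constants only, no instances
    }
}
```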


---

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147670970
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java ---
    @@ -0,0 +1,70 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.legacy.metrics;
    +
    +import org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler;
    +
    +import java.util.Map;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * Request handler that returns, aggregated across all subtasks of a single tasks, a list of all available metrics or the
    + * values for a set of metrics.
    + *
    + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
    + * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
    + *
    + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
    + * {@code /metrics?get=X,Y}
    + * The handler will then return a list containing the values of the requested metrics.
    + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
    + *
    + * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
    + * "sum", "max", "min" and "avg".
    + */
    +public class SubtaskMetricsHandler extends AbstractMetricsHandler {
    +	private static final String SUBTASK_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/metrics";
    +
    +	public SubtaskMetricsHandler(Executor executor, MetricFetcher fetcher) {
    +		super(executor, fetcher);
    +	}
    +
    +	@Override
    +	public String[] getPaths() {
    +		return new String[]{SUBTASK_METRICS_REST_PATH};
    +	}
    +
    +	@Override
    +	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
    +		String subtaskNumString = pathParams.get(SubtaskExecutionAttemptDetailsHandler.PARAMETER_SUBTASK_INDEX);
    +		int subtaskNum;
    +		try {
    +			subtaskNum = Integer.valueOf(subtaskNumString);
    +		} catch (NumberFormatException nfe) {
    +			return null;
    +		}
    --- End diff --
    
    An unknown subtask index will cause the method to return null (```metrics.getSubtaskMetricStore(...)```), which is the same behavior as all other metric handlers.
    
    We can't provide an accurate message in this case; we only know that this subtask index is unknown to the store, but not why (it may exceed the parallelism, or its metrics may not have arrived yet).
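
The behavior described above can be sketched as follows, with a plain map standing in for the metric store. The class and method names are hypothetical, and the real `MetricStore` API is not reproduced here. Both a malformed index and an unknown index end in the same `null` result, which is why the caller cannot tell the cases apart.

```java
import java.util.Map;

// Hypothetical simplification for illustration; not the handler from this PR.
final class SubtaskLookupSketch {

    // subtaskStores maps a subtask index to that subtask's metrics (name -> value).
    static Map<String, String> lookup(
            Map<Integer, Map<String, String>> subtaskStores, String subtaskNumString) {
        int subtaskNum;
        try {
            // Integer.parseInt also throws NumberFormatException for a null string.
            subtaskNum = Integer.parseInt(subtaskNumString);
        } catch (NumberFormatException nfe) {
            return null;
        }
        // Returns null when the index is unknown, whatever the reason.
        return subtaskStores.get(subtaskNum);
    }
}
```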


---

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147669836
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java ---
    @@ -39,12 +39,15 @@
      * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
      * {@code [ { "id" : "X" } ] }
      *
    - * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
    - * {@code /get?X,Y}
    + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
    + * {@code /metrics?get=X,Y}
      * The handler will then return a list containing the values of the requested metrics.
      * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
      */
     public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler {
    +
    --- End diff --
    
    This will be resolved once we port it to the new REST stuff anyway, as they will then reside in the respective parameter/header classes.


---

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147651957
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.legacy.metrics;
    +
    +import org.apache.flink.runtime.jobmaster.JobManagerGateway;
    +import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
    +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * Abstract request handler for querying metrics. Subclasses may either that returns a list of all available metrics or
    + * the values for a set of metrics for a specific entity, or aggregate them across several entities.
    + *
    + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
    + * {@code [ { "id" : "X" } ] }
    + *
    + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
    + * {@code /metrics?get=X,Y}
    + * The handler will then return a list containing the values of the requested metrics.
    + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
    + *
    + * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
    + * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
    + * "sum", "max", "min" and "avg". If the parameter is not specified all aggregations will be returned.
    + * {@code /metrics?get=X,Y&agg=min,max}
    + * The handler will then return an object containing the aggregations for the requested metrics.
    + * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]}
    + */
    +abstract class AbstractAggregatingMetricsHandler extends AbstractJsonRequestHandler {
    +
    +	protected final Logger log = LoggerFactory.getLogger(getClass());
    +
    +	private static final String PARAMETER_AGGREGATION = "agg";
    +
    +	private final MetricFetcher fetcher;
    +
    +	AbstractAggregatingMetricsHandler(Executor executor, MetricFetcher fetcher) {
    +		super(executor);
    +		this.fetcher = Preconditions.checkNotNull(fetcher);
    +	}
    +
    +	protected abstract Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters);
    +
    +	@Override
    +	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
    +		return CompletableFuture.supplyAsync(
    +			() -> {
    +				try {
    +					fetcher.update();
    +					String requestedMetricsList = queryParams.get(AbstractMetricsHandler.PARAMETER_METRICS);
    +					String aggTypeList = queryParams.get(PARAMETER_AGGREGATION);
    +					MetricStore store = fetcher.getMetricStore();
    +
    +					if (requestedMetricsList == null) {
    +						Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams);
    +						if (stores == null) {
    +							return "[]";
    +						}
    +						Collection<String> list = getAvailableMetrics(stores);
    +						return mapMetricListToJson(list);
    +					}
    +
    +					if (requestedMetricsList.isEmpty()) {
    +						/*
    +						 * The WebInterface doesn't check whether the list of available metrics was empty. This can lead to a
    +						 * request for which the "get" parameter is an empty string.
    +						 */
    +						return "[]";
    +					}
    +
    +					String[] requestedMetrics = requestedMetricsList.split(",");
    +
    +					List<DoubleAccumulator.DoubleAccumulatorFactory<?>> requestedAggregationsFactories = new ArrayList<>();
    +					// by default we return all aggregations
    +					if (aggTypeList == null || aggTypeList.isEmpty()) {
    +						requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
    +						requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
    +						requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
    +						requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
    +					} else {
    +						for (String aggregation : aggTypeList.split(",")) {
    +							switch (aggregation.toLowerCase()) {
    +								case DoubleAccumulator.DoubleMinimum.NAME:
    +									requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
    +									break;
    +								case DoubleAccumulator.DoubleMaximum.NAME:
    +									requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
    +									break;
    +								case DoubleAccumulator.DoubleSum.NAME:
    +									requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
    +									break;
    +								case DoubleAccumulator.DoubleAverage.NAME:
    +									requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
    +									break;
    +								default:
    +									log.warn("Invalid aggregation specified: {}", aggregation.toLowerCase());
    +							}
    +						}
    +					}
    +					Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams);
    +					if (stores == null){
    +						return "[]";
    +					}
    +					return getAggregatedMetricValues(stores, requestedMetrics, requestedAggregationsFactories);
    +				} catch (Exception e) {
    +					throw new CompletionException(new FlinkException("Could not retrieve metrics.", e));
    +				}
    +			},
    +			executor);
    +	}
    +
    +	/**
    +	 * Returns a JSON string containing a list of all available metrics in the given stores. Effectively this method maps
    +	 * the union of all key-sets to JSON.
    +	 *
    +	 * @param stores metrics
    +	 * @return JSON string containing a list of all available metrics
    +	 */
    +	private static Collection<String> getAvailableMetrics(Collection<? extends MetricStore.ComponentMetricStore> stores) {
    +		Set<String> uniqueMetrics = new HashSet<>();
    +		for (MetricStore.ComponentMetricStore store : stores) {
    +			uniqueMetrics.addAll(store.metrics.keySet());
    +		}
    +		return uniqueMetrics;
    +	}
    +
    +	private static String mapMetricListToJson(Collection<String> metrics) throws IOException {
    +		StringWriter writer = new StringWriter();
    +		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
    +
    +		gen.writeStartArray();
    +		for (String m : metrics) {
    +			gen.writeStartObject();
    +			gen.writeStringField("id", m);
    +			gen.writeEndObject();
    +		}
    +		gen.writeEndArray();
    +
    +		gen.close();
    +		return writer.toString();
    +	}
    +
    +	/**
    +	 * Extracts and aggregates all requested metrics from the given metric stores, and maps the result to a JSON string.
    +	 *
    +	 * @param stores available metrics
    +	 * @param requestedMetrics ids of requested metrics
    +	 * @param requestedAggregationsFactories requested aggregations
    +	 * @return JSON string containing the requested metrics
    +	 * @throws IOException
    +	 */
    +	private String getAggregatedMetricValues(Collection<? extends MetricStore.ComponentMetricStore> stores, String[] requestedMetrics, List<DoubleAccumulator.DoubleAccumulatorFactory<?>> requestedAggregationsFactories) throws IOException {
    +		StringWriter writer = new StringWriter();
    +		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
    +
    +		gen.writeStartArray();
    +		for (String requestedMetric : requestedMetrics) {
    +			final Collection<Double> values = new ArrayList<>();
    +			try {
    +				for (MetricStore.ComponentMetricStore store : stores) {
    +					String stringValue = store.metrics.get(requestedMetric);
    +					if (stringValue != null) {
    +						values.add(Double.valueOf(stringValue));
    +					}
    +				}
    +			} catch (NumberFormatException nfe) {
    +				// metric is not numeric so we can't perform aggregations => ignore it
    +				continue;
    --- End diff --
    
    Not sure when this can happen, but shouldn't we log something when we get this exception,
    as is done in the `catch` block of `AggregatingSubtasksMetricsHandler.getStores()`?
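
The suggestion above could look roughly like the following sketch. It is not the PR's code: `NumericMetricCollector`, its `collect` method, and the use of plain `Map<String, String>` in place of `MetricStore.ComponentMetricStore` are assumptions made to keep the example self-contained, and `java.util.logging` stands in for the slf4j logger the handler actually uses.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Logger;

// Hypothetical helper; each Map stands in for one component's metric store.
final class NumericMetricCollector {
    private static final Logger LOG = Logger.getLogger(NumericMetricCollector.class.getName());

    private NumericMetricCollector() {
    }

    /**
     * Collects the values of one metric across all stores.
     * Returns an empty Optional (and logs a warning) if any value is not numeric.
     */
    static Optional<List<Double>> collect(Collection<Map<String, String>> stores, String metric) {
        List<Double> values = new ArrayList<>();
        try {
            for (Map<String, String> store : stores) {
                String stringValue = store.get(metric);
                if (stringValue != null) {
                    values.add(Double.valueOf(stringValue));
                }
            }
        } catch (NumberFormatException nfe) {
            // Log instead of silently skipping, so a non-numeric metric is visible to operators.
            LOG.warning("Metric " + metric + " is not numeric, skipping aggregation: " + nfe.getMessage());
            return Optional.empty();
        }
        return Optional.of(values);
    }
}
```

Whether this should be logged at warning or debug level is a judgment call; a client that repeatedly requests a non-numeric metric would otherwise fill the log.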


---

[GitHub] flink pull request #4901: [FLINK-7781][metrics][REST] Support on-demand aggr...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147670055
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.legacy.metrics;
    +
    +/**
    + * An interface for accumulating double values.
    + */
    +interface DoubleAccumulator {
    +
    +	/**
    +	 * Adds the given value to this accumulator.
    +	 *
    +	 * @param value value to add
    +	 */
    +	void add(double value);
    +
    +	/**
    +	 * Returns the current value of this accumulator.
    +	 *
    +	 * @return current value of this accumulator
    +	 */
    +	double getValue();
    +
    +	/**
    +	 * Returns the name of this accumulator type. This name is used as a suffix for exposed metrics.
    +	 *
    +	 * @return name of this accumulator type
    +	 */
    +	String getName();
    +
    +	/**
    +	 * A factory for {@link DoubleAccumulator}s. This allows us to regenerate a new set of accumulators for each metrics
    +	 * without re-evaluating the "agg" query parameter or re-using existing accumulators.
    +	 *
    +	 * @param <A> DoubleAccumulator subclass
    +	 */
    +	interface DoubleAccumulatorFactory<A extends DoubleAccumulator> {
    +		/**
    +		 * Creates a new accumulator with the given initial value.
    +		 *
    +		 * @param init initial value
    +		 * @return new accumulator with the given initial value
    +		 */
    +		A get(double init);
    +	}
    +
    +	/**
    +	 * Factory for {@link DoubleMaximum}.
    +	 */
    +	final class DoubleMaximumFactory implements DoubleAccumulatorFactory<DoubleMaximum> {
    +		private static final DoubleMaximumFactory INSTANCE = new DoubleMaximumFactory();
    +
    +		private DoubleMaximumFactory(){
    +		}
    +
    +		@Override
    +		public DoubleMaximum get(double init) {
    +			return new DoubleMaximum(init);
    +		}
    +
    +		public static DoubleMaximumFactory get() {
    +			return INSTANCE;
    +		}
    +	}
    +
    +	/**
    +	 * Factory for {@link DoubleMinimum}.
    +	 */
    +	final class DoubleMinimumFactory implements DoubleAccumulatorFactory<DoubleMinimum> {
    +		private static final DoubleMinimumFactory INSTANCE = new DoubleMinimumFactory();
    +
    +		private DoubleMinimumFactory(){
    +		}
    +
    +		@Override
    +		public DoubleMinimum get(double init) {
    +			return new DoubleMinimum(init);
    +		}
    +
    +		public static DoubleMinimumFactory get() {
    +			return INSTANCE;
    +		}
    +	}
    +
    +	/**
    +	 * Factory for {@link DoubleSum}.
    +	 */
    +	final class DoubleSumFactory implements DoubleAccumulatorFactory<DoubleSum> {
    +		private static final DoubleSumFactory INSTANCE = new DoubleSumFactory();
    +
    +		private DoubleSumFactory(){
    +		}
    +
    +		@Override
    +		public DoubleSum get(double init) {
    +			return new DoubleSum(init);
    +		}
    +
    +		public static DoubleSumFactory get() {
    +			return INSTANCE;
    +		}
    +	}
    +
    +	/**
    +	 * Factory for {@link DoubleAverage}.
    +	 */
    +	final class DoubleAverageFactory implements DoubleAccumulatorFactory<DoubleAverage> {
    +		private static final DoubleAverageFactory INSTANCE = new DoubleAverageFactory();
    +
    +		private DoubleAverageFactory(){
    +		}
    +
    +		@Override
    +		public DoubleAverage get(double init) {
    +			return new DoubleAverage(init);
    +		}
    +
    +		public static DoubleAverageFactory get() {
    +			return INSTANCE;
    +		}
    +	}
    +
    +	/**
    +	 * {@link DoubleAccumulator} that returns the maximum value.
    +	 */
    +	final class DoubleMaximum implements DoubleAccumulator {
    +
    +		public static final String NAME = "max";
    +
    +		private double value;
    +
    +		private DoubleMaximum(double init) {
    +			value = init;
    +		}
    +
    +		@Override
    +		public void add(double value) {
    +			this.value = Math.max(this.value, value);
    +		}
    +
    +		@Override
    +		public double getValue() {
    +			return value;
    +		}
    +
    +		@Override
    +		public String getName() {
    +			return NAME;
    +		}
    +	}
    +
    +	/**
    +	 * {@link DoubleAccumulator} that returns the minimum value.
    +	 */
    +	final class DoubleMinimum implements DoubleAccumulator {
    +
    +		public static final String NAME = "min";
    +
    +		private double value;
    +
    +		private DoubleMinimum(double init) {
    +			value = init;
    +		}
    +
    +		@Override
    +		public void add(double value) {
    +			this.value = Math.min(this.value, value);
    +		}
    +
    +		@Override
    +		public double getValue() {
    +			return value;
    +		}
    +
    +		@Override
    +		public String getName() {
    +			return NAME;
    +		}
    +	}
    +
    +	/**
    +	 * {@link DoubleAccumulator} that returns the sum of all values.
    +	 */
    +	final class DoubleSum implements DoubleAccumulator {
    +
    +		public static final String NAME = "sum";
    +
    +		private double value;
    +
    +		private DoubleSum(double init) {
    +			value = init;
    +		}
    +
    +		@Override
    +		public void add(double value) {
    +			this.value += value;
    +		}
    +
    +		@Override
    +		public double getValue() {
    +			return value;
    +		}
    +
    +		@Override
    +		public String getName() {
    +			return NAME;
    +		}
    +	}
    +
    +	/**
    +	 * {@link DoubleAccumulator} that returns the average over all values.
    +	 */
    +	final class DoubleAverage implements DoubleAccumulator {
    +
    +		public static final String NAME = "avg";
    +
    +		private double sum;
    +		private int count;
    +
    +		private DoubleAverage(double init) {
    +			sum = init;
    +			count = 1;
    +		}
    +
    +		@Override
    +		public void add(double value) {
    +			this.sum += value;
    +			this.count++;
    +		}
    +
    +		@Override
    +		public double getValue() {
    +			if (count == 0) {
    --- End diff --
    
    Yes, this is a relic from a previous version where no initial value was set.
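
To illustrate why the `count == 0` check is dead code: in the quoted diff the constructor seeds the accumulator with an initial value and sets `count` to 1, so the divisor can never be zero. A minimal sketch of the simplified accumulator follows; `AverageSketch` is a hypothetical name, not a class from the PR.

```java
// Hypothetical stand-in for DoubleAverage, without the unreachable count == 0 branch.
final class AverageSketch {
    private double sum;
    private int count;

    AverageSketch(double init) {
        // The initial value counts as the first sample, so count starts at 1.
        sum = init;
        count = 1;
    }

    void add(double value) {
        sum += value;
        count++;
    }

    double getValue() {
        // count >= 1 always holds here, so no zero check is needed.
        return sum / count;
    }
}
```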


---

[GitHub] flink pull request #4901: [FLINK-7781][metrics][REST] Support on-demand aggr...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147651247
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.legacy.metrics;
    +
    +import org.apache.flink.runtime.jobmaster.JobManagerGateway;
    +import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
    +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * Abstract request handler for querying metrics. Subclasses may either that returns a list of all available metrics or
    + * the values for a set of metrics for a specific entity, or aggregate them across several entities.
    + *
    + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
    + * {@code [ { "id" : "X" } ] }
    + *
    + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
    + * {@code /metrics?get=X,Y}
    + * The handler will then return a list containing the values of the requested metrics.
    + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
    + *
    + * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
    + * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
    + * "sum", "max", "min" and "avg". If the parameter is not specified all aggregations will be returned.
    + * {@code /metrics?get=X,Y&agg=min,max}
    + * The handler will then return an object containing the aggregations for the requested metrics.
    + * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]}
    + */
    +abstract class AbstractAggregatingMetricsHandler extends AbstractJsonRequestHandler {
    +
    +	protected final Logger log = LoggerFactory.getLogger(getClass());
    +
    +	private static final String PARAMETER_AGGREGATION = "agg";
    +
    +	private final MetricFetcher fetcher;
    +
    +	AbstractAggregatingMetricsHandler(Executor executor, MetricFetcher fetcher) {
    +		super(executor);
    +		this.fetcher = Preconditions.checkNotNull(fetcher);
    +	}
    +
    +	protected abstract Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters);
    +
    +	@Override
    +	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
    +		return CompletableFuture.supplyAsync(
    +			() -> {
    +				try {
    +					fetcher.update();
    +					String requestedMetricsList = queryParams.get(AbstractMetricsHandler.PARAMETER_METRICS);
    +					String aggTypeList = queryParams.get(PARAMETER_AGGREGATION);
    +					MetricStore store = fetcher.getMetricStore();
    +
    +					if (requestedMetricsList == null) {
    +						Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams);
    +						if (stores == null) {
    +							return "[]";
    +						}
    +						Collection<String> list = getAvailableMetrics(stores);
    +						return mapMetricListToJson(list);
    +					}
    +
    +					if (requestedMetricsList.isEmpty()) {
    +						/*
    +						 * The WebInterface doesn't check whether the list of available metrics was empty. This can lead to a
    +						 * request for which the "get" parameter is an empty string.
    +						 */
    +						return "[]";
    +					}
    +
    +					String[] requestedMetrics = requestedMetricsList.split(",");
    +
    +					List<DoubleAccumulator.DoubleAccumulatorFactory<?>> requestedAggregationsFactories = new ArrayList<>();
    +					// by default we return all aggregations
    +					if (aggTypeList == null || aggTypeList.isEmpty()) {
    +						requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
    +						requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
    +						requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
    +						requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
    +					} else {
    +						for (String aggregation : aggTypeList.split(",")) {
    +							switch (aggregation.toLowerCase()) {
    +								case DoubleAccumulator.DoubleMinimum.NAME:
    +									requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
    +									break;
    +								case DoubleAccumulator.DoubleMaximum.NAME:
    +									requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
    +									break;
    +								case DoubleAccumulator.DoubleSum.NAME:
    +									requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
    +									break;
    +								case DoubleAccumulator.DoubleAverage.NAME:
    +									requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
    +									break;
    +								default:
    +									log.warn("Invalid aggregation specified: {}", aggregation.toLowerCase());
    +							}
    +						}
    +					}
    +					Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams);
    +					if (stores == null){
    +						return "[]";
    +					}
    +					return getAggregatedMetricValues(stores, requestedMetrics, requestedAggregationsFactories);
    +				} catch (Exception e) {
    +					throw new CompletionException(new FlinkException("Could not retrieve metrics.", e));
    +				}
    +			},
    +			executor);
    +	}
    +
    +	/**
    +	 * Returns a JSON string containing a list of all available metrics in the given stores. Effectively this method maps
    +	 * the union of all key-sets to JSON.
    +	 *
    +	 * @param stores metrics
    +	 * @return JSON string containing a list of all available metrics
    +	 */
    +	private static Collection<String> getAvailableMetrics(Collection<? extends MetricStore.ComponentMetricStore> stores) {
    +		Set<String> uniqueMetrics = new HashSet<>();
    +		for (MetricStore.ComponentMetricStore store : stores) {
    +			uniqueMetrics.addAll(store.metrics.keySet());
    +		}
    +		return uniqueMetrics;
    +	}
    +
    +	private static String mapMetricListToJson(Collection<String> metrics) throws IOException {
    +		StringWriter writer = new StringWriter();
    +		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
    +
    +		gen.writeStartArray();
    +		for (String m : metrics) {
    +			gen.writeStartObject();
    +			gen.writeStringField("id", m);
    +			gen.writeEndObject();
    +		}
    +		gen.writeEndArray();
    +
    +		gen.close();
    +		return writer.toString();
    +	}
    +
    +	/**
    +	 * Extracts and aggregates all requested metrics from the given metric stores, and maps the result to a JSON string.
    +	 *
    +	 * @param stores available metrics
    +	 * @param requestedMetrics ids of requested metrics
    +	 * @param requestedAggregationsFactories requested aggregations
    +	 * @return JSON string containing the requested metrics
    +	 * @throws IOException
    +	 */
    +	private String getAggregatedMetricValues(Collection<? extends MetricStore.ComponentMetricStore> stores, String[] requestedMetrics, List<DoubleAccumulator.DoubleAccumulatorFactory<?>> requestedAggregationsFactories) throws IOException {
    --- End diff --
    
    This is a matter of personal style, but I would break the argument list across multiple lines, with one argument per line.
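
As an illustration of the layout being suggested, here is a small sketch with one parameter per line. The class `SignatureStyle`, the method `describe`, and the simplified parameter types are assumptions chosen so that the example compiles on its own; only the formatting is the point.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical example; it only demonstrates the one-argument-per-line layout.
final class SignatureStyle {
    private SignatureStyle() {
    }

    static String describe(
            Collection<String> stores,
            String[] requestedMetrics,
            List<String> requestedAggregations) {
        return stores.size() + " stores, "
            + requestedMetrics.length + " metrics, "
            + requestedAggregations.size() + " aggregations";
    }
}
```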


---

[GitHub] flink pull request #4901: [FLINK-7781][metrics][REST] Support on-demand aggr...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147649898
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.legacy.metrics;
    +
    +import org.apache.flink.runtime.jobmaster.JobManagerGateway;
    +import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
    +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * Abstract request handler for querying metrics. Subclasses may either that returns a list of all available metrics or
    + * the values for a set of metrics for a specific entity, or aggregate them across several entities.
    + *
    + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
    + * {@code [ { "id" : "X" } ] }
    + *
    + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
    + * {@code /metrics?get=X,Y}
    + * The handler will then return a list containing the values of the requested metrics.
    + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
    + *
    + * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
    + * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
    + * "sum", "max", "min" and "avg". If the parameter is not specified all aggregations will be returned.
    --- End diff --
    
    If the parameter is not specified all aggregations will be returned -> If the parameter is not specified**,** all aggregations will be returned


---

[GitHub] flink pull request #4901: [FLINK-7781][metrics][REST] Support on-demand aggr...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147653983
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandler.java ---
    @@ -0,0 +1,70 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.legacy.metrics;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.concurrent.Executor;
    +
    +import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
    +import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
    +
    +/**
    + * Request handler that returns, aggregated across all subtasks, a list of all available metrics or the values
    + * for a set of metrics.
    + */
    +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler {
    +	public AggregatingSubtasksMetricsHandler(Executor executor, MetricFetcher fetcher) {
    +		super(executor, fetcher);
    +	}
    +
    +	@Override
    +	protected Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters) {
    +		String jobID = pathParameters.get(PARAMETER_JOB_ID);
    +		String taskID = pathParameters.get(PARAMETER_VERTEX_ID);
    --- End diff --
    
    What happens here if `jobID == null`? 
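
The rest of `getStores()` is not visible in the quoted diff, so the actual behavior for a missing job ID cannot be read off here. One possible guard is sketched below under stated assumptions: `SubtaskStoreLookup`, the `"jobid"` key, and the nested-map representation of the metric store are all hypothetical. Note that the abstract handler in the diff already maps a `null` result from `getStores()` to `"[]"`, so returning an empty collection is only one of several reasonable choices.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

// Hypothetical lookup; the outer map stands in for the MetricStore, keyed by job ID.
final class SubtaskStoreLookup {
    private SubtaskStoreLookup() {
    }

    static Collection<String> getStores(
            Map<String, Collection<String>> storesByJob,
            Map<String, String> pathParameters) {
        // "jobid" is an assumed parameter name; the real constant is PARAMETER_JOB_ID.
        String jobID = pathParameters.get("jobid");
        if (jobID == null) {
            // Fail soft: without a job ID there is nothing to aggregate.
            return Collections.<String>emptyList();
        }
        Collection<String> stores = storesByJob.get(jobID);
        if (stores == null) {
            return Collections.<String>emptyList();
        }
        return stores;
    }
}
```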


---

[GitHub] flink issue #4901: [FLINK-7781][metrics][REST] Support on-demand aggregation...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4901
  
    @kl0u I've addressed your comments.


---

[GitHub] flink pull request #4901: [FLINK-7781][metrics][REST] Support on-demand aggr...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147657820
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.legacy.metrics;
    +
    +/**
    + * An interface for accumulating double values.
    + */
    +interface DoubleAccumulator {
    +
    +	/**
    +	 * Adds the given value to this accumulator.
    +	 *
    +	 * @param value value to add
    +	 */
    +	void add(double value);
    +
    +	/**
    +	 * Returns the current value of this accumulator.
    +	 *
    +	 * @return current value of this accumulator
    +	 */
    +	double getValue();
    +
    +	/**
    +	 * Returns the name of this accumulator type. This name is used as a suffix for exposed metrics.
    +	 *
    +	 * @return name of this accumulator type
    +	 */
    +	String getName();
    +
    +	/**
    +	 * A factory for {@link DoubleAccumulator}s. This allows us to regenerate a new set of accumulators for each metrics
    +	 * without re-evaluating the "agg" query parameter or re-using existing accumulators.
    +	 *
    +	 * @param <A> DoubleAccumulator subclass
    +	 */
    +	interface DoubleAccumulatorFactory<A extends DoubleAccumulator> {
    +		/**
    +		 * Creates a new accumulator with the given initial value.
    +		 *
    +		 * @param init initial value
    +		 * @return new accumulator with the given initial value
    +		 */
    +		A get(double init);
    +	}
    +
    +	/**
    +	 * Factory for {@link DoubleMaximum}.
    +	 */
    +	final class DoubleMaximumFactory implements DoubleAccumulatorFactory<DoubleMaximum> {
    +		private static final DoubleMaximumFactory INSTANCE = new DoubleMaximumFactory();
    +
    +		private DoubleMaximumFactory(){
    +		}
    +
    +		@Override
    +		public DoubleMaximum get(double init) {
    +			return new DoubleMaximum(init);
    +		}
    +
    +		public static DoubleMaximumFactory get() {
    +			return INSTANCE;
    +		}
    +	}
    +
    +	/**
    +	 * Factory for {@link DoubleMinimum}.
    +	 */
    +	final class DoubleMinimumFactory implements DoubleAccumulatorFactory<DoubleMinimum> {
    +		private static final DoubleMinimumFactory INSTANCE = new DoubleMinimumFactory();
    +
    +		private DoubleMinimumFactory(){
    +		}
    +
    +		@Override
    +		public DoubleMinimum get(double init) {
    +			return new DoubleMinimum(init);
    +		}
    +
    +		public static DoubleMinimumFactory get() {
    +			return INSTANCE;
    +		}
    +	}
    +
    +	/**
    +	 * Factory for {@link DoubleSum}.
    +	 */
    +	final class DoubleSumFactory implements DoubleAccumulatorFactory<DoubleSum> {
    +		private static final DoubleSumFactory INSTANCE = new DoubleSumFactory();
    +
    +		private DoubleSumFactory(){
    +		}
    +
    +		@Override
    +		public DoubleSum get(double init) {
    +			return new DoubleSum(init);
    +		}
    +
    +		public static DoubleSumFactory get() {
    +			return INSTANCE;
    +		}
    +	}
    +
    +	/**
    +	 * Factory for {@link DoubleAverage}.
    +	 */
    +	final class DoubleAverageFactory implements DoubleAccumulatorFactory<DoubleAverage> {
    +		private static final DoubleAverageFactory INSTANCE = new DoubleAverageFactory();
    +
    +		private DoubleAverageFactory(){
    +		}
    +
    +		@Override
    +		public DoubleAverage get(double init) {
    +			return new DoubleAverage(init);
    +		}
    +
    +		public static DoubleAverageFactory get() {
    +			return INSTANCE;
    +		}
    +	}
    +
    +	/**
    +	 * {@link DoubleAccumulator} that returns the maximum value.
    +	 */
    +	final class DoubleMaximum implements DoubleAccumulator {
    +
    +		public static final String NAME = "max";
    +
    +		private double value;
    +
    +		private DoubleMaximum(double init) {
    +			value = init;
    +		}
    +
    +		@Override
    +		public void add(double value) {
    +			this.value = Math.max(this.value, value);
    +		}
    +
    +		@Override
    +		public double getValue() {
    +			return value;
    +		}
    +
    +		@Override
    +		public String getName() {
    +			return NAME;
    +		}
    +	}
    +
    +	/**
    +	 * {@link DoubleAccumulator} that returns the minimum value.
    +	 */
    +	final class DoubleMinimum implements DoubleAccumulator {
    +
    +		public static final String NAME = "min";
    +
    +		private double value;
    +
    +		private DoubleMinimum(double init) {
    +			value = init;
    +		}
    +
    +		@Override
    +		public void add(double value) {
    +			this.value = Math.min(this.value, value);
    +		}
    +
    +		@Override
    +		public double getValue() {
    +			return value;
    +		}
    +
    +		@Override
    +		public String getName() {
    +			return NAME;
    +		}
    +	}
    +
    +	/**
    +	 * {@link DoubleAccumulator} that returns the sum of all values.
    +	 */
    +	final class DoubleSum implements DoubleAccumulator {
    +
    +		public static final String NAME = "sum";
    +
    +		private double value;
    +
    +		private DoubleSum(double init) {
    +			value = init;
    +		}
    +
    +		@Override
    +		public void add(double value) {
    +			this.value += value;
    +		}
    +
    +		@Override
    +		public double getValue() {
    +			return value;
    +		}
    +
    +		@Override
    +		public String getName() {
    +			return NAME;
    +		}
    +	}
    +
    +	/**
    +	 * {@link DoubleAccumulator} that returns the average over all values.
    +	 */
    +	final class DoubleAverage implements DoubleAccumulator {
    +
    +		public static final String NAME = "avg";
    +
    +		private double sum;
    +		private int count;
    +
    +		private DoubleAverage(double init) {
    +			sum = init;
    +			count = 1;
    +		}
    +
    +		@Override
    +		public void add(double value) {
    +			this.sum += value;
    +			this.count++;
    +		}
    +
    +		@Override
    +		public double getValue() {
    +			if (count == 0) {
    --- End diff --
    
    `count` cannot be `0` here, right? We set it to `1` in the constructor and only ever increment it.
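
    To illustrate the point, here is a minimal sketch of an average accumulator that is seeded with its first value. `AverageSketch` is a hypothetical, simplified stand-in for the `DoubleAverage` class in the diff, not the PR's code; it only shows why the `count == 0` branch would be unreachable under that constructor.

```java
/** Hypothetical, simplified average accumulator seeded with its first value. */
final class AverageSketch {
	private double sum;
	private int count;

	AverageSketch(double init) {
		sum = init;
		// The seed counts as the first sample, so count >= 1 from here on.
		count = 1;
	}

	void add(double value) {
		sum += value;
		count++;
	}

	double getValue() {
		// No count == 0 guard: the constructor guarantees at least one sample.
		return sum / count;
	}
}
```

    If an accumulator could ever be created without an initial value, the guard would become necessary again.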


---

[GitHub] flink pull request #4901: [FLINK-7781][metrics][REST] Support on-demand aggr...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147697678
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AggregatingSubtasksMetricsHandler.java ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.legacy.metrics;
    +
    +import org.apache.flink.util.UnionIterator;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.concurrent.Executor;
    +
    +import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
    +import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
    +
    +/**
    + * Request handler that returns, aggregated across all subtasks, a list of all available metrics or the values
    + * for a set of metrics.
    + *
    + * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
    + * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
    + */
    +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler {
    +	public AggregatingSubtasksMetricsHandler(Executor executor, MetricFetcher fetcher) {
    +		super(executor, fetcher);
    +	}
    +
    +	@Override
    +	protected Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters) {
    +		String jobID = pathParameters.get(PARAMETER_JOB_ID);
    +		String taskID = pathParameters.get(PARAMETER_VERTEX_ID);
    +		if (jobID == null) {
    +			return Collections.emptyList();
    +		}
    +		if (taskID == null) {
    +			return Collections.emptyList();
    +		}
    +		String subtasksList = queryParameters.get("subtasks");
    +		if (subtasksList == null || subtasksList.isEmpty()) {
    +			return store.getTaskMetricStore(jobID, taskID).getAllSubtaskMetricStores();
    +		} else {
    +			Iterable<Integer> subtasks = getIntegerRangeFromString(subtasksList);
    +			Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>();
    +			for (int subtask : subtasks) {
    +				subtaskStores.add(store.getSubtaskMetricStore(jobID, taskID, subtask));
    +			}
    +			return subtaskStores;
    +		}
    +	}
    +
    +	@Override
    +	public String[] getPaths() {
    +		return new String[]{"/jobs/:jobid/vertices/:vertexid/subtasks/metrics"};
    +	}
    +
    +	private Iterable<Integer> getIntegerRangeFromString(String rangeDefinition) throws NumberFormatException {
    +		final String[] ranges = rangeDefinition.trim().split(",");
    +
    +		UnionIterator<Integer> iterators = new UnionIterator<>();
    +
    +		for (String rawRange : ranges) {
    +			try {
    +				Iterator<Integer> rangeIterator;
    +				String range = rawRange.trim();
    +				int dashIdx = range.indexOf('-');
    +				if (dashIdx == -1) {
    +					// only one value in range:
    +					rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
    +				} else {
    +					// evaluate range
    +					final int start = Integer.valueOf(range.substring(0, dashIdx));
    +					final int end = Integer.valueOf(range.substring(dashIdx + 1, range.length()));
    +					rangeIterator = new Iterator<Integer>() {
    +						int i = start;
    +
    +						@Override
    +						public boolean hasNext() {
    +							return i <= end;
    +						}
    +
    +						@Override
    +						public Integer next() {
    +							return i++;
    --- End diff --
    
    Here you should throw a `NoSuchElementException` if `next()` is called after the range has been exhausted (it is part of the iterator contract).
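
    A minimal sketch of what this could look like, assuming the anonymous iterator is pulled out into a small class. `IntRangeIterator` is a hypothetical name used only for illustration; the bounds are inclusive, as in the diff.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical stand-alone version of the inclusive range iterator. */
final class IntRangeIterator implements Iterator<Integer> {
	private final int end; // inclusive upper bound
	private int next;

	IntRangeIterator(int start, int end) {
		this.next = start;
		this.end = end;
	}

	@Override
	public boolean hasNext() {
		return next <= end;
	}

	@Override
	public Integer next() {
		// Iterator contract: signal exhaustion instead of returning values past the end.
		if (!hasNext()) {
			throw new NoSuchElementException("Range exhausted at " + end);
		}
		return next++;
	}
}
```

    With this check, a caller that ignores `hasNext()` fails fast instead of silently receiving subtask indices beyond the requested range.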


---

[GitHub] flink pull request #4901: [FLINK-7781][metrics][REST] Support on-demand aggr...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147657052
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java ---
    @@ -0,0 +1,70 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.legacy.metrics;
    +
    +import org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler;
    +
    +import java.util.Map;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * Request handler that returns, aggregated across all subtasks of a single task, a list of all available metrics or the
    + * values for a set of metrics.
    + *
    + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
    + * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
    + *
    + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
    + * {@code /metrics?get=X,Y}
    + * The handler will then return a list containing the values of the requested metrics.
    + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
    + *
    + * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
    + * "sum", "max", "min" and "avg".
    + */
    +public class SubtaskMetricsHandler extends AbstractMetricsHandler {
    +	private static final String SUBTASK_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/metrics";
    +
    +	public SubtaskMetricsHandler(Executor executor, MetricFetcher fetcher) {
    +		super(executor, fetcher);
    +	}
    +
    +	@Override
    +	public String[] getPaths() {
    +		return new String[]{SUBTASK_METRICS_REST_PATH};
    +	}
    +
    +	@Override
    +	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
    +		String subtaskNumString = pathParams.get(SubtaskExecutionAttemptDetailsHandler.PARAMETER_SUBTASK_INDEX);
    +		int subtaskNum;
    +		try {
    +			subtaskNum = Integer.valueOf(subtaskNumString);
    +		} catch (NumberFormatException nfe) {
    +			return null;
    +		}
    --- End diff --
    
    Can this also be an invalid subtask number? E.g., we have 5 of them and we request subtask ID 11? In this case there should be a check that logs a warning and potentially sets the `subtaskNum` to `null`, so that we go down the same path as for a non-integer value.
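
    A rough sketch of such a check, under the assumption that the number of known subtasks is available at this point. `SubtaskIndexParser`, `parse` and `numSubtasks` are hypothetical names, and `java.util.logging` stands in for the handler's own logger to keep the example self-contained. An empty `OptionalInt` plays the role of the `null` mentioned above.

```java
import java.util.OptionalInt;
import java.util.logging.Logger;

/** Hypothetical helper: parses a subtask index and rejects out-of-range values. */
final class SubtaskIndexParser {
	private static final Logger LOG = Logger.getLogger(SubtaskIndexParser.class.getName());

	static OptionalInt parse(String raw, int numSubtasks) {
		int index;
		try {
			// parseInt also throws NumberFormatException for a null input
			index = Integer.parseInt(raw);
		} catch (NumberFormatException nfe) {
			return OptionalInt.empty();
		}
		if (index < 0 || index >= numSubtasks) {
			LOG.warning("Requested subtask " + index + ", but only " + numSubtasks + " subtasks are known.");
			// Same outcome as for a non-integer value.
			return OptionalInt.empty();
		}
		return OptionalInt.of(index);
	}
}
```

    Both failure modes end up on one code path, so the handler only has to deal with a single "no valid subtask" case.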


---

[GitHub] flink pull request #4901: [FLINK-7781][metrics][REST] Support on-demand aggr...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4901#discussion_r147669457
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractAggregatingMetricsHandler.java ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.legacy.metrics;
    +
    +import org.apache.flink.runtime.jobmaster.JobManagerGateway;
    +import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
    +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * Abstract request handler for querying metrics. Subclasses may either return a list of all available metrics or
    + * the values for a set of metrics for a specific entity, or aggregate them across several entities.
    + *
    + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
    + * {@code [ { "id" : "X" } ] }
    + *
    + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
    + * {@code /metrics?get=X,Y}
    + * The handler will then return a list containing the values of the requested metrics.
    + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
    + *
    + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
    + * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
    + * "sum", "max", "min" and "avg". If the parameter is not specified all aggregations will be returned.
    + * {@code /metrics?get=X,Y&agg=min,max}
    + * The handler will then return an object containing the aggregations for the requested metrics.
    + * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]}
    + */
    +abstract class AbstractAggregatingMetricsHandler extends AbstractJsonRequestHandler {
    +
    +	protected final Logger log = LoggerFactory.getLogger(getClass());
    +
    +	private static final String PARAMETER_AGGREGATION = "agg";
    +
    +	private final MetricFetcher fetcher;
    +
    +	AbstractAggregatingMetricsHandler(Executor executor, MetricFetcher fetcher) {
    +		super(executor);
    +		this.fetcher = Preconditions.checkNotNull(fetcher);
    +	}
    +
    +	protected abstract Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, Map<String, String> pathParameters, Map<String, String> queryParameters);
    +
    +	@Override
    +	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
    +		return CompletableFuture.supplyAsync(
    +			() -> {
    +				try {
    +					fetcher.update();
    +					String requestedMetricsList = queryParams.get(AbstractMetricsHandler.PARAMETER_METRICS);
    +					String aggTypeList = queryParams.get(PARAMETER_AGGREGATION);
    +					MetricStore store = fetcher.getMetricStore();
    +
    +					if (requestedMetricsList == null) {
    +						Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams);
    +						if (stores == null) {
    +							return "[]";
    +						}
    +						Collection<String> list = getAvailableMetrics(stores);
    +						return mapMetricListToJson(list);
    +					}
    +
    +					if (requestedMetricsList.isEmpty()) {
    +						/*
    +						 * The WebInterface doesn't check whether the list of available metrics was empty. This can lead to a
    +						 * request for which the "get" parameter is an empty string.
    +						 */
    +						return "[]";
    +					}
    +
    +					String[] requestedMetrics = requestedMetricsList.split(",");
    +
    +					List<DoubleAccumulator.DoubleAccumulatorFactory<?>> requestedAggregationsFactories = new ArrayList<>();
    +					// by default we return all aggregations
    +					if (aggTypeList == null || aggTypeList.isEmpty()) {
    +						requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
    +						requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
    +						requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
    +						requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
    +					} else {
    +						for (String aggregation : aggTypeList.split(",")) {
    +							switch (aggregation.toLowerCase()) {
    +								case DoubleAccumulator.DoubleMinimum.NAME:
    +									requestedAggregationsFactories.add(DoubleAccumulator.DoubleMinimumFactory.get());
    +									break;
    +								case DoubleAccumulator.DoubleMaximum.NAME:
    +									requestedAggregationsFactories.add(DoubleAccumulator.DoubleMaximumFactory.get());
    +									break;
    +								case DoubleAccumulator.DoubleSum.NAME:
    +									requestedAggregationsFactories.add(DoubleAccumulator.DoubleSumFactory.get());
    +									break;
    +								case DoubleAccumulator.DoubleAverage.NAME:
    +									requestedAggregationsFactories.add(DoubleAccumulator.DoubleAverageFactory.get());
    +									break;
    +								default:
    +									log.warn("Invalid aggregation specified: {}", aggregation.toLowerCase());
    +							}
    +						}
    +					}
    +					Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, pathParams, queryParams);
    +					if (stores == null){
    +						return "[]";
    +					}
    +					return getAggregatedMetricValues(stores, requestedMetrics, requestedAggregationsFactories);
    +				} catch (Exception e) {
    +					throw new CompletionException(new FlinkException("Could not retrieve metrics.", e));
    +				}
    +			},
    +			executor);
    +	}
    +
    +	/**
    +	 * Returns a JSON string containing a list of all available metrics in the given stores. Effectively this method maps
    +	 * the union of all key-sets to JSON.
    +	 *
    +	 * @param stores metrics
    +	 * @return JSON string containing a list of all available metrics
    +	 */
    +	private static Collection<String> getAvailableMetrics(Collection<? extends MetricStore.ComponentMetricStore> stores) {
    +		Set<String> uniqueMetrics = new HashSet<>();
    +		for (MetricStore.ComponentMetricStore store : stores) {
    +			uniqueMetrics.addAll(store.metrics.keySet());
    +		}
    +		return uniqueMetrics;
    +	}
    +
    +	private static String mapMetricListToJson(Collection<String> metrics) throws IOException {
    +		StringWriter writer = new StringWriter();
    +		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
    +
    +		gen.writeStartArray();
    +		for (String m : metrics) {
    +			gen.writeStartObject();
    +			gen.writeStringField("id", m);
    +			gen.writeEndObject();
    +		}
    +		gen.writeEndArray();
    +
    +		gen.close();
    +		return writer.toString();
    +	}
    +
    +	/**
    +	 * Extracts and aggregates all requested metrics from the given metric stores, and maps the result to a JSON string.
    +	 *
    +	 * @param stores available metrics
    +	 * @param requestedMetrics ids of requested metrics
    +	 * @param requestedAggregationsFactories requested aggregations
    +	 * @return JSON string containing the requested metrics
    +	 * @throws IOException
    +	 */
    +	private String getAggregatedMetricValues(Collection<? extends MetricStore.ComponentMetricStore> stores, String[] requestedMetrics, List<DoubleAccumulator.DoubleAccumulatorFactory<?>> requestedAggregationsFactories) throws IOException {
    +		StringWriter writer = new StringWriter();
    +		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
    +
    +		gen.writeStartArray();
    +		for (String requestedMetric : requestedMetrics) {
    +			final Collection<Double> values = new ArrayList<>();
    +			try {
    +				for (MetricStore.ComponentMetricStore store : stores) {
    +					String stringValue = store.metrics.get(requestedMetric);
    +					if (stringValue != null) {
    +						values.add(Double.valueOf(stringValue));
    +					}
    +				}
    +			} catch (NumberFormatException nfe) {
    +				// metric is not numeric so we can't perform aggregations => ignore it
    +				continue;
    --- End diff --
    
    This can happen for non-numeric metrics, for example the path to the last externalized checkpoint. I'll add a warning message.
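
    A minimal sketch of the intended behavior, assuming each store can be viewed as a plain `Map<String, String>`. `NumericMetricCollector` and `collect` are hypothetical names, and `java.util.logging` stands in for the handler's logger; this is not the code that ended up in the PR.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Logger;

/** Hypothetical helper: gathers the numeric values of one metric across stores. */
final class NumericMetricCollector {
	private static final Logger LOG = Logger.getLogger(NumericMetricCollector.class.getName());

	static Optional<List<Double>> collect(Collection<Map<String, String>> stores, String metric) {
		List<Double> values = new ArrayList<>();
		for (Map<String, String> store : stores) {
			String raw = store.get(metric);
			if (raw == null) {
				continue; // this store does not report the metric
			}
			try {
				values.add(Double.valueOf(raw));
			} catch (NumberFormatException nfe) {
				// Non-numeric metric (e.g. a path): it cannot be aggregated, so skip it entirely.
				LOG.warning("Metric " + metric + " is not numeric and cannot be aggregated.");
				return Optional.empty();
			}
		}
		return Optional.of(values);
	}
}
```

    Returning an empty `Optional` lets the caller skip the metric in the response, while the warning makes the omission visible in the logs.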


---