You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/16 19:18:49 UTC
[08/11] flink git commit: [FLINK-8370][REST] Port
AggregatingMetricsHandler to flip6
http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
new file mode 100644
index 0000000..4453ee2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Test base for handlers that extend {@link AbstractAggregatingMetricsHandler}.
+ */
+public abstract class AggregatingMetricsHandlerTestBase<
+ H extends AbstractAggregatingMetricsHandler<P>,
+ P extends AbstractAggregatedMetricsParameters<?>>
+ extends TestLogger {
+
+ private static final CompletableFuture<String> TEST_REST_ADDRESS;
+ private static final DispatcherGateway MOCK_DISPATCHER_GATEWAY;
+ private static final GatewayRetriever<DispatcherGateway> LEADER_RETRIEVER;
+ private static final Time TIMEOUT = Time.milliseconds(50);
+ private static final Map<String, String> TEST_HEADERS = Collections.emptyMap();
+ private static final Executor EXECUTOR = TestingUtils.defaultExecutor();
+
+ static {
+ TEST_REST_ADDRESS = CompletableFuture.completedFuture("localhost:12345");
+
+ MOCK_DISPATCHER_GATEWAY = mock(DispatcherGateway.class);
+
+ LEADER_RETRIEVER = new GatewayRetriever<DispatcherGateway>() {
+ @Override
+ public CompletableFuture<DispatcherGateway> getFuture() {
+ return CompletableFuture.completedFuture(MOCK_DISPATCHER_GATEWAY);
+ }
+ };
+ }
+
+ private H handler;
+ private MetricStore store;
+ private Map<String, String> pathParameters;
+
+ @Before
+ public void setUp() throws Exception {
+ MetricFetcher<RestfulGateway> fetcher = new MetricFetcher<RestfulGateway>(
+ mock(GatewayRetriever.class),
+ mock(MetricQueryServiceRetriever.class),
+ Executors.directExecutor(),
+ TestingUtils.TIMEOUT());
+ store = fetcher.getMetricStore();
+
+ Collection<MetricDump> metricDumps = getMetricDumps();
+ for (MetricDump dump : metricDumps) {
+ store.add(dump);
+ }
+
+ handler = getHandler(
+ TEST_REST_ADDRESS,
+ LEADER_RETRIEVER,
+ TIMEOUT,
+ TEST_HEADERS,
+ EXECUTOR,
+ fetcher);
+ pathParameters = getPathParameters();
+ }
+
+ protected Map<String, String> getPathParameters() {
+ return Collections.emptyMap();
+ }
+
+ protected abstract Tuple2<String, List<String>> getFilter();
+
+ protected abstract Collection<MetricDump> getMetricDumps();
+
+ protected abstract H getHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ Executor executor,
+ MetricFetcher<?> fetcher
+ );
+
+ @Test
+ public void getStores() throws Exception {
+ { // test without filter
+ HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ handler.getMessageHeaders().getUnresolvedMessageParameters(),
+ pathParameters,
+ Collections.emptyMap()
+ );
+ Collection<? extends MetricStore.ComponentMetricStore> subStores = handler.getStores(store, request);
+
+ assertEquals(3, subStores.size());
+
+ List<String> sortedMetrics1 = subStores.stream()
+ .map(subStore -> subStore.getMetric("abc.metric1"))
+ .filter(Objects::nonNull)
+ .sorted()
+ .collect(Collectors.toList());
+
+ assertEquals(2, sortedMetrics1.size());
+
+ assertEquals("1", sortedMetrics1.get(0));
+ assertEquals("3", sortedMetrics1.get(1));
+
+ List<String> sortedMetrics2 = subStores.stream()
+ .map(subStore -> subStore.getMetric("abc.metric2"))
+ .filter(Objects::nonNull)
+ .sorted()
+ .collect(Collectors.toList());
+
+ assertEquals(1, sortedMetrics2.size());
+
+ assertEquals("5", sortedMetrics2.get(0));
+ }
+
+ { // test with filter
+ Tuple2<String, List<String>> filter = getFilter();
+ Map<String, List<String>> queryParameters = new HashMap<>(4);
+ queryParameters.put(filter.f0, filter.f1);
+ HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ handler.getMessageHeaders().getUnresolvedMessageParameters(),
+ pathParameters,
+ queryParameters
+ );
+ Collection<? extends MetricStore.ComponentMetricStore> subStores = handler.getStores(store, request);
+
+ assertEquals(2, subStores.size());
+
+ List<String> sortedMetrics1 = subStores.stream()
+ .map(subStore -> subStore.getMetric("abc.metric1"))
+ .filter(Objects::nonNull)
+ .sorted()
+ .collect(Collectors.toList());
+
+ assertEquals(1, sortedMetrics1.size());
+
+ assertEquals("1", sortedMetrics1.get(0));
+
+ List<String> sortedMetrics2 = subStores.stream()
+ .map(subStore -> subStore.getMetric("abc.metric2"))
+ .filter(Objects::nonNull)
+ .sorted()
+ .collect(Collectors.toList());
+
+ assertEquals(1, sortedMetrics2.size());
+
+ assertEquals("5", sortedMetrics2.get(0));
+ }
+ }
+
+ @Test
+ public void testListMetrics() throws Exception {
+ HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ handler.getMessageHeaders().getUnresolvedMessageParameters(),
+ pathParameters,
+ Collections.emptyMap()
+ );
+
+ AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+ .get();
+
+ List<String> availableMetrics = response.getMetrics().stream()
+ .map(AggregatedMetric::getId)
+ .sorted()
+ .collect(Collectors.toList());
+
+ assertEquals(2, availableMetrics.size());
+ assertEquals("abc.metric1", availableMetrics.get(0));
+ assertEquals("abc.metric2", availableMetrics.get(1));
+ }
+
+ @Test
+ public void testMinAggregation() throws Exception {
+ Map<String, List<String>> queryParams = new HashMap<>(4);
+ queryParams.put("get", Collections.singletonList("abc.metric1"));
+ queryParams.put("agg", Collections.singletonList("min"));
+
+ HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ handler.getMessageHeaders().getUnresolvedMessageParameters(),
+ pathParameters,
+ queryParams
+ );
+
+ AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+ .get();
+
+ Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+ assertEquals(1, aggregatedMetrics.size());
+ AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+ assertEquals("abc.metric1", aggregatedMetric.getId());
+ assertEquals(1.0, aggregatedMetric.getMin(), 0.1);
+ assertNull(aggregatedMetric.getMax());
+ assertNull(aggregatedMetric.getSum());
+ assertNull(aggregatedMetric.getAvg());
+ }
+
+ @Test
+ public void testMaxAggregation() throws Exception {
+ Map<String, List<String>> queryParams = new HashMap<>(4);
+ queryParams.put("get", Collections.singletonList("abc.metric1"));
+ queryParams.put("agg", Collections.singletonList("max"));
+
+ HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ handler.getMessageHeaders().getUnresolvedMessageParameters(),
+ pathParameters,
+ queryParams
+ );
+
+ AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+ .get();
+
+ Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+ assertEquals(1, aggregatedMetrics.size());
+ AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+ assertEquals("abc.metric1", aggregatedMetric.getId());
+ assertEquals(3.0, aggregatedMetric.getMax(), 0.1);
+ assertNull(aggregatedMetric.getMin());
+ assertNull(aggregatedMetric.getSum());
+ assertNull(aggregatedMetric.getAvg());
+ }
+
+ @Test
+ public void testSumAggregation() throws Exception {
+ Map<String, List<String>> queryParams = new HashMap<>(4);
+ queryParams.put("get", Collections.singletonList("abc.metric1"));
+ queryParams.put("agg", Collections.singletonList("sum"));
+
+ HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ handler.getMessageHeaders().getUnresolvedMessageParameters(),
+ pathParameters,
+ queryParams
+ );
+
+ AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+ .get();
+
+ Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+ assertEquals(1, aggregatedMetrics.size());
+ AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+ assertEquals("abc.metric1", aggregatedMetric.getId());
+ assertEquals(4.0, aggregatedMetric.getSum(), 0.1);
+ assertNull(aggregatedMetric.getMin());
+ assertNull(aggregatedMetric.getMax());
+ assertNull(aggregatedMetric.getAvg());
+ }
+
+ @Test
+ public void testAvgAggregation() throws Exception {
+ Map<String, List<String>> queryParams = new HashMap<>(4);
+ queryParams.put("get", Collections.singletonList("abc.metric1"));
+ queryParams.put("agg", Collections.singletonList("avg"));
+
+ HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ handler.getMessageHeaders().getUnresolvedMessageParameters(),
+ pathParameters,
+ queryParams
+ );
+
+ AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+ .get();
+
+ Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+ assertEquals(1, aggregatedMetrics.size());
+ AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+ assertEquals("abc.metric1", aggregatedMetric.getId());
+ assertEquals(2.0, aggregatedMetric.getAvg(), 0.1);
+ assertNull(aggregatedMetric.getMin());
+ assertNull(aggregatedMetric.getMax());
+ assertNull(aggregatedMetric.getSum());
+ }
+
+ @Test
+ public void testMultipleAggregation() throws Exception {
+ Map<String, List<String>> queryParams = new HashMap<>(4);
+ queryParams.put("get", Collections.singletonList("abc.metric1"));
+ queryParams.put("agg", Arrays.asList("min", "max", "avg"));
+
+ HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ handler.getMessageHeaders().getUnresolvedMessageParameters(),
+ pathParameters,
+ queryParams
+ );
+
+ AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+ .get();
+
+ Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+ assertEquals(1, aggregatedMetrics.size());
+ AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+ assertEquals("abc.metric1", aggregatedMetric.getId());
+ assertEquals(1.0, aggregatedMetric.getMin(), 0.1);
+ assertEquals(3.0, aggregatedMetric.getMax(), 0.1);
+ assertEquals(2.0, aggregatedMetric.getAvg(), 0.1);
+ assertNull(aggregatedMetric.getSum());
+ }
+
+ @Test
+ public void testDefaultAggregation() throws Exception {
+ Map<String, List<String>> queryParams = new HashMap<>(4);
+ queryParams.put("get", Collections.singletonList("abc.metric1"));
+
+ HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ handler.getMessageHeaders().getUnresolvedMessageParameters(),
+ pathParameters,
+ queryParams
+ );
+
+ AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+ .get();
+
+ Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+ assertEquals(1, aggregatedMetrics.size());
+ AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+ assertEquals("abc.metric1", aggregatedMetric.getId());
+ assertEquals(1.0, aggregatedMetric.getMin(), 0.1);
+ assertEquals(3.0, aggregatedMetric.getMax(), 0.1);
+ assertEquals(2.0, aggregatedMetric.getAvg(), 0.1);
+ assertEquals(4.0, aggregatedMetric.getSum(), 0.1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
new file mode 100644
index 0000000..902570c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Tests for the {@link AggregatingSubtasksMetricsHandler}.
+ */
+public class AggregatingSubtasksMetricsHandlerTest extends AggregatingMetricsHandlerTestBase<AggregatingSubtasksMetricsHandler, AggregatedSubtaskMetricsParameters> {
+
+ private static final JobID JOB_ID = JobID.generate();
+ private static final JobVertexID TASK_ID = new JobVertexID();
+
+ @Override
+ protected Tuple2<String, List<String>> getFilter() {
+ return Tuple2.of("subtasks", Arrays.asList("1", "3"));
+ }
+
+ @Override
+ protected Map<String, String> getPathParameters() {
+ Map<String, String> pathParameters = new HashMap<>(4);
+ pathParameters.put(JobIDPathParameter.KEY, JOB_ID.toString());
+ pathParameters.put(JobVertexIdPathParameter.KEY, TASK_ID.toString());
+ return pathParameters;
+ }
+
+ @Override
+ protected Collection<MetricDump> getMetricDumps() {
+ Collection<MetricDump> dumps = new ArrayList<>(3);
+ QueryScopeInfo.TaskQueryScopeInfo task1 = new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(), TASK_ID.toString(), 1, "abc");
+ MetricDump.CounterDump cd1 = new MetricDump.CounterDump(task1, "metric1", 1);
+ dumps.add(cd1);
+
+ QueryScopeInfo.TaskQueryScopeInfo task2 = new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(), TASK_ID.toString(), 2, "abc");
+ MetricDump.CounterDump cd2 = new MetricDump.CounterDump(task2, "metric1", 3);
+ dumps.add(cd2);
+
+ QueryScopeInfo.TaskQueryScopeInfo task3 = new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(), TASK_ID.toString(), 3, "abc");
+ MetricDump.CounterDump cd3 = new MetricDump.CounterDump(task3, "metric2", 5);
+ dumps.add(cd3);
+
+ return dumps;
+ }
+
+ @Override
+ protected AggregatingSubtasksMetricsHandler getHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+ return new AggregatingSubtasksMetricsHandler(
+ localRestAddress,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ executor,
+ fetcher
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
new file mode 100644
index 0000000..fb7a51b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregateTaskManagerMetricsParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Tests for the {@link AggregatingTaskManagersMetricsHandler}.
+ */
+public class AggregatingTaskManagersMetricsHandlerTest extends AggregatingMetricsHandlerTestBase<AggregatingTaskManagersMetricsHandler, AggregateTaskManagerMetricsParameters> {
+
+ private static final ResourceID TM_ID_1 = ResourceID.generate();
+ private static final ResourceID TM_ID_2 = ResourceID.generate();
+ private static final ResourceID TM_ID_3 = ResourceID.generate();
+
+ @Override
+ protected Tuple2<String, List<String>> getFilter() {
+ return Tuple2.of("taskmanagers", Arrays.asList(TM_ID_1.toString(), TM_ID_3.toString()));
+ }
+
+ @Override
+ protected Collection<MetricDump> getMetricDumps() {
+ Collection<MetricDump> dumps = new ArrayList<>(3);
+ QueryScopeInfo.TaskManagerQueryScopeInfo tm1 = new QueryScopeInfo.TaskManagerQueryScopeInfo(TM_ID_1.toString(), "abc");
+ MetricDump.CounterDump cd1 = new MetricDump.CounterDump(tm1, "metric1", 1);
+ dumps.add(cd1);
+
+ QueryScopeInfo.TaskManagerQueryScopeInfo tm2 = new QueryScopeInfo.TaskManagerQueryScopeInfo(TM_ID_2.toString(), "abc");
+ MetricDump.CounterDump cd2 = new MetricDump.CounterDump(tm2, "metric1", 3);
+ dumps.add(cd2);
+
+ QueryScopeInfo.TaskManagerQueryScopeInfo tm3 = new QueryScopeInfo.TaskManagerQueryScopeInfo(TM_ID_3.toString(), "abc");
+ MetricDump.CounterDump cd3 = new MetricDump.CounterDump(tm3, "metric2", 5);
+ dumps.add(cd3);
+
+ return dumps;
+ }
+
+ @Override
+ protected AggregatingTaskManagersMetricsHandler getHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+ return new AggregatingTaskManagersMetricsHandler(
+ localRestAddress,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ executor,
+ fetcher
+ );
+ }
+}