You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/16 19:18:49 UTC

[08/11] flink git commit: [FLINK-8370][REST] Port AggregatingMetricsHandler to flip6

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
new file mode 100644
index 0000000..4453ee2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Test base for handlers that extend {@link AbstractAggregatingMetricsHandler}.
+ */
+public abstract class AggregatingMetricsHandlerTestBase<
+	H extends AbstractAggregatingMetricsHandler<P>,
+	P extends AbstractAggregatedMetricsParameters<?>>
+	extends TestLogger {
+
+	private static final CompletableFuture<String> TEST_REST_ADDRESS;
+	private static final DispatcherGateway MOCK_DISPATCHER_GATEWAY;
+	private static final GatewayRetriever<DispatcherGateway> LEADER_RETRIEVER;
+	private static final Time TIMEOUT = Time.milliseconds(50);
+	private static final Map<String, String> TEST_HEADERS = Collections.emptyMap();
+	private static final Executor EXECUTOR = TestingUtils.defaultExecutor();
+
+	static {
+		TEST_REST_ADDRESS = CompletableFuture.completedFuture("localhost:12345");
+
+		MOCK_DISPATCHER_GATEWAY = mock(DispatcherGateway.class);
+
+		LEADER_RETRIEVER = new GatewayRetriever<DispatcherGateway>() {
+			@Override
+			public CompletableFuture<DispatcherGateway> getFuture() {
+				return CompletableFuture.completedFuture(MOCK_DISPATCHER_GATEWAY);
+			}
+		};
+	}
+
+	private H handler;
+	private MetricStore store;
+	private Map<String, String> pathParameters;
+
+	@Before
+	public void setUp() throws Exception {
+		MetricFetcher<RestfulGateway> fetcher = new MetricFetcher<RestfulGateway>(
+			mock(GatewayRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
+		store = fetcher.getMetricStore();
+
+		Collection<MetricDump> metricDumps = getMetricDumps();
+		for (MetricDump dump : metricDumps) {
+			store.add(dump);
+		}
+
+		handler = getHandler(
+			TEST_REST_ADDRESS,
+			LEADER_RETRIEVER,
+			TIMEOUT,
+			TEST_HEADERS,
+			EXECUTOR,
+			fetcher);
+		pathParameters = getPathParameters();
+	}
+
+	protected Map<String, String> getPathParameters() {
+		return Collections.emptyMap();
+	}
+
+	protected abstract Tuple2<String, List<String>> getFilter();
+
+	protected abstract Collection<MetricDump> getMetricDumps();
+
+	protected abstract H getHandler(
+		CompletableFuture<String> localRestAddress,
+		GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+		Time timeout,
+		Map<String, String> responseHeaders,
+		Executor executor,
+		MetricFetcher<?> fetcher
+	);
+
+	@Test
+	public void getStores() throws Exception {
+		{ // test without filter
+			HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+				EmptyRequestBody.getInstance(),
+				handler.getMessageHeaders().getUnresolvedMessageParameters(),
+				pathParameters,
+				Collections.emptyMap()
+			);
+			Collection<? extends MetricStore.ComponentMetricStore> subStores = handler.getStores(store, request);
+
+			assertEquals(3, subStores.size());
+
+			List<String> sortedMetrics1 = subStores.stream()
+				.map(subStore -> subStore.getMetric("abc.metric1"))
+				.filter(Objects::nonNull)
+				.sorted()
+				.collect(Collectors.toList());
+
+			assertEquals(2, sortedMetrics1.size());
+
+			assertEquals("1", sortedMetrics1.get(0));
+			assertEquals("3", sortedMetrics1.get(1));
+
+			List<String> sortedMetrics2 = subStores.stream()
+				.map(subStore -> subStore.getMetric("abc.metric2"))
+				.filter(Objects::nonNull)
+				.sorted()
+				.collect(Collectors.toList());
+
+			assertEquals(1, sortedMetrics2.size());
+
+			assertEquals("5", sortedMetrics2.get(0));
+		}
+
+		{ // test with filter
+			Tuple2<String, List<String>> filter = getFilter();
+			Map<String, List<String>> queryParameters = new HashMap<>(4);
+			queryParameters.put(filter.f0, filter.f1);
+			HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+				EmptyRequestBody.getInstance(),
+				handler.getMessageHeaders().getUnresolvedMessageParameters(),
+				pathParameters,
+				queryParameters
+			);
+			Collection<? extends MetricStore.ComponentMetricStore> subStores = handler.getStores(store, request);
+
+			assertEquals(2, subStores.size());
+
+			List<String> sortedMetrics1 = subStores.stream()
+				.map(subStore -> subStore.getMetric("abc.metric1"))
+				.filter(Objects::nonNull)
+				.sorted()
+				.collect(Collectors.toList());
+
+			assertEquals(1, sortedMetrics1.size());
+
+			assertEquals("1", sortedMetrics1.get(0));
+
+			List<String> sortedMetrics2 = subStores.stream()
+				.map(subStore -> subStore.getMetric("abc.metric2"))
+				.filter(Objects::nonNull)
+				.sorted()
+				.collect(Collectors.toList());
+
+			assertEquals(1, sortedMetrics2.size());
+
+			assertEquals("5", sortedMetrics2.get(0));
+		}
+	}
+
+	@Test
+	public void testListMetrics() throws Exception {
+		HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			handler.getMessageHeaders().getUnresolvedMessageParameters(),
+			pathParameters,
+			Collections.emptyMap()
+		);
+
+		AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+			.get();
+
+		List<String> availableMetrics = response.getMetrics().stream()
+			.map(AggregatedMetric::getId)
+			.sorted()
+			.collect(Collectors.toList());
+
+		assertEquals(2, availableMetrics.size());
+		assertEquals("abc.metric1", availableMetrics.get(0));
+		assertEquals("abc.metric2", availableMetrics.get(1));
+	}
+
+	@Test
+	public void testMinAggregation() throws Exception {
+		Map<String, List<String>> queryParams = new HashMap<>(4);
+		queryParams.put("get", Collections.singletonList("abc.metric1"));
+		queryParams.put("agg", Collections.singletonList("min"));
+
+		HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			handler.getMessageHeaders().getUnresolvedMessageParameters(),
+			pathParameters,
+			queryParams
+		);
+
+		AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+			.get();
+
+		Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+		assertEquals(1, aggregatedMetrics.size());
+		AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+		assertEquals("abc.metric1", aggregatedMetric.getId());
+		assertEquals(1.0, aggregatedMetric.getMin(), 0.1);
+		assertNull(aggregatedMetric.getMax());
+		assertNull(aggregatedMetric.getSum());
+		assertNull(aggregatedMetric.getAvg());
+	}
+
+	@Test
+	public void testMaxAggregation() throws Exception {
+		Map<String, List<String>> queryParams = new HashMap<>(4);
+		queryParams.put("get", Collections.singletonList("abc.metric1"));
+		queryParams.put("agg", Collections.singletonList("max"));
+
+		HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			handler.getMessageHeaders().getUnresolvedMessageParameters(),
+			pathParameters,
+			queryParams
+		);
+
+		AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+			.get();
+
+		Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+		assertEquals(1, aggregatedMetrics.size());
+		AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+		assertEquals("abc.metric1", aggregatedMetric.getId());
+		assertEquals(3.0, aggregatedMetric.getMax(), 0.1);
+		assertNull(aggregatedMetric.getMin());
+		assertNull(aggregatedMetric.getSum());
+		assertNull(aggregatedMetric.getAvg());
+	}
+
+	@Test
+	public void testSumAggregation() throws Exception {
+		Map<String, List<String>> queryParams = new HashMap<>(4);
+		queryParams.put("get", Collections.singletonList("abc.metric1"));
+		queryParams.put("agg", Collections.singletonList("sum"));
+
+		HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			handler.getMessageHeaders().getUnresolvedMessageParameters(),
+			pathParameters,
+			queryParams
+		);
+
+		AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+			.get();
+
+		Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+		assertEquals(1, aggregatedMetrics.size());
+		AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+		assertEquals("abc.metric1", aggregatedMetric.getId());
+		assertEquals(4.0, aggregatedMetric.getSum(), 0.1);
+		assertNull(aggregatedMetric.getMin());
+		assertNull(aggregatedMetric.getMax());
+		assertNull(aggregatedMetric.getAvg());
+	}
+
+	@Test
+	public void testAvgAggregation() throws Exception {
+		Map<String, List<String>> queryParams = new HashMap<>(4);
+		queryParams.put("get", Collections.singletonList("abc.metric1"));
+		queryParams.put("agg", Collections.singletonList("avg"));
+
+		HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			handler.getMessageHeaders().getUnresolvedMessageParameters(),
+			pathParameters,
+			queryParams
+		);
+
+		AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+			.get();
+
+		Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+		assertEquals(1, aggregatedMetrics.size());
+		AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+		assertEquals("abc.metric1", aggregatedMetric.getId());
+		assertEquals(2.0, aggregatedMetric.getAvg(), 0.1);
+		assertNull(aggregatedMetric.getMin());
+		assertNull(aggregatedMetric.getMax());
+		assertNull(aggregatedMetric.getSum());
+	}
+
+	@Test
+	public void testMultipleAggregation() throws Exception {
+		Map<String, List<String>> queryParams = new HashMap<>(4);
+		queryParams.put("get", Collections.singletonList("abc.metric1"));
+		queryParams.put("agg", Arrays.asList("min", "max", "avg"));
+
+		HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			handler.getMessageHeaders().getUnresolvedMessageParameters(),
+			pathParameters,
+			queryParams
+		);
+
+		AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+			.get();
+
+		Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+		assertEquals(1, aggregatedMetrics.size());
+		AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+		assertEquals("abc.metric1", aggregatedMetric.getId());
+		assertEquals(1.0, aggregatedMetric.getMin(), 0.1);
+		assertEquals(3.0, aggregatedMetric.getMax(), 0.1);
+		assertEquals(2.0, aggregatedMetric.getAvg(), 0.1);
+		assertNull(aggregatedMetric.getSum());
+	}
+
+	@Test
+	public void testDefaultAggregation() throws Exception {
+		Map<String, List<String>> queryParams = new HashMap<>(4);
+		queryParams.put("get", Collections.singletonList("abc.metric1"));
+
+		HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			handler.getMessageHeaders().getUnresolvedMessageParameters(),
+			pathParameters,
+			queryParams
+		);
+
+		AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+			.get();
+
+		Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+		assertEquals(1, aggregatedMetrics.size());
+		AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+		assertEquals("abc.metric1", aggregatedMetric.getId());
+		assertEquals(1.0, aggregatedMetric.getMin(), 0.1);
+		assertEquals(3.0, aggregatedMetric.getMax(), 0.1);
+		assertEquals(2.0, aggregatedMetric.getAvg(), 0.1);
+		assertEquals(4.0, aggregatedMetric.getSum(), 0.1);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
new file mode 100644
index 0000000..902570c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Tests for the {@link AggregatingSubtasksMetricsHandler}.
+ */
+public class AggregatingSubtasksMetricsHandlerTest extends AggregatingMetricsHandlerTestBase<AggregatingSubtasksMetricsHandler, AggregatedSubtaskMetricsParameters> {
+
+	private static final JobID JOB_ID = JobID.generate();
+	private static final JobVertexID TASK_ID = new JobVertexID();
+
+	@Override
+	protected Tuple2<String, List<String>> getFilter() {
+		return Tuple2.of("subtasks", Arrays.asList("1", "3"));
+	}
+
+	@Override
+	protected Map<String, String> getPathParameters() {
+		Map<String, String> pathParameters = new HashMap<>(4);
+		pathParameters.put(JobIDPathParameter.KEY, JOB_ID.toString());
+		pathParameters.put(JobVertexIdPathParameter.KEY, TASK_ID.toString());
+		return pathParameters;
+	}
+
+	@Override
+	protected Collection<MetricDump> getMetricDumps() {
+		Collection<MetricDump> dumps = new ArrayList<>(3);
+		QueryScopeInfo.TaskQueryScopeInfo task1 = new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(), TASK_ID.toString(), 1, "abc");
+		MetricDump.CounterDump cd1 = new MetricDump.CounterDump(task1, "metric1", 1);
+		dumps.add(cd1);
+
+		QueryScopeInfo.TaskQueryScopeInfo task2 = new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(), TASK_ID.toString(), 2, "abc");
+		MetricDump.CounterDump cd2 = new MetricDump.CounterDump(task2, "metric1", 3);
+		dumps.add(cd2);
+
+		QueryScopeInfo.TaskQueryScopeInfo task3 = new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(), TASK_ID.toString(), 3, "abc");
+		MetricDump.CounterDump cd3 = new MetricDump.CounterDump(task3, "metric2", 5);
+		dumps.add(cd3);
+
+		return dumps;
+	}
+
+	@Override
+	protected AggregatingSubtasksMetricsHandler getHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+		return new AggregatingSubtasksMetricsHandler(
+			localRestAddress,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			executor,
+			fetcher
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
new file mode 100644
index 0000000..fb7a51b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregateTaskManagerMetricsParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Tests for the {@link AggregatingTaskManagersMetricsHandler}.
+ */
+public class AggregatingTaskManagersMetricsHandlerTest extends AggregatingMetricsHandlerTestBase<AggregatingTaskManagersMetricsHandler, AggregateTaskManagerMetricsParameters> {
+
+	private static final ResourceID TM_ID_1 = ResourceID.generate();
+	private static final ResourceID TM_ID_2 = ResourceID.generate();
+	private static final ResourceID TM_ID_3 = ResourceID.generate();
+
+	@Override
+	protected Tuple2<String, List<String>> getFilter() {
+		return Tuple2.of("taskmanagers", Arrays.asList(TM_ID_1.toString(), TM_ID_3.toString()));
+	}
+
+	@Override
+	protected Collection<MetricDump> getMetricDumps() {
+		Collection<MetricDump> dumps = new ArrayList<>(3);
+		QueryScopeInfo.TaskManagerQueryScopeInfo tm1 = new QueryScopeInfo.TaskManagerQueryScopeInfo(TM_ID_1.toString(), "abc");
+		MetricDump.CounterDump cd1 = new MetricDump.CounterDump(tm1, "metric1", 1);
+		dumps.add(cd1);
+
+		QueryScopeInfo.TaskManagerQueryScopeInfo tm2 = new QueryScopeInfo.TaskManagerQueryScopeInfo(TM_ID_2.toString(), "abc");
+		MetricDump.CounterDump cd2 = new MetricDump.CounterDump(tm2, "metric1", 3);
+		dumps.add(cd2);
+
+		QueryScopeInfo.TaskManagerQueryScopeInfo tm3 = new QueryScopeInfo.TaskManagerQueryScopeInfo(TM_ID_3.toString(), "abc");
+		MetricDump.CounterDump cd3 = new MetricDump.CounterDump(tm3, "metric2", 5);
+		dumps.add(cd3);
+
+		return dumps;
+	}
+
+	@Override
+	protected AggregatingTaskManagersMetricsHandler getHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+		return new AggregatingTaskManagersMetricsHandler(
+			localRestAddress,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			executor,
+			fetcher
+		);
+	}
+}