You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/16 21:49:59 UTC

[flink] 01/03: [hotfix] Introduce MetricFetcher interface

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 992b65af7420ebacf8aa02b26269190919ce3ccd
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 11 15:30:01 2019 +0100

    [hotfix] Introduce MetricFetcher interface
    
    Rename MetricFetcher into MetricFetcherImpl and introduce MetricFetcher interface.
    This allows for better testing and hides the type parameter of MetricFetcherImpl.
---
 .../flink/docs/rest/RestAPIDocGenerator.java       |   1 +
 .../runtime/webmonitor/WebRuntimeMonitor.java      |   8 +-
 .../runtime/dispatcher/DispatcherRestEndpoint.java |   1 +
 ...tDispatcherResourceManagerComponentFactory.java |  11 +-
 .../flink/runtime/minicluster/MiniCluster.java     |  17 +-
 .../flink/runtime/rest/JobRestEndpointFactory.java |   1 +
 .../flink/runtime/rest/RestEndpointFactory.java    |   1 +
 .../runtime/rest/SessionRestEndpointFactory.java   |   1 +
 .../rest/handler/job/JobDetailsHandler.java        |   8 +-
 .../rest/handler/job/JobVertexDetailsHandler.java  |   6 +-
 .../handler/job/JobVertexTaskManagersHandler.java  |   6 +-
 .../job/SubtaskCurrentAttemptDetailsHandler.java   |   4 +-
 .../job/SubtaskExecutionAttemptDetailsHandler.java |   6 +-
 .../metrics/AbstractAggregatingMetricsHandler.java |   4 +-
 .../job/metrics/AggregatingJobsMetricsHandler.java |   2 +-
 .../metrics/AggregatingSubtasksMetricsHandler.java |   2 +-
 .../AggregatingTaskManagersMetricsHandler.java     |   2 +-
 .../rest/handler/legacy/metrics/MetricFetcher.java | 182 +--------------------
 .../{MetricFetcher.java => MetricFetcherImpl.java} |  35 +++-
 .../handler/legacy/metrics/VoidMetricFetcher.java  |  38 +++++
 .../rest/handler/util/MutableIOMetrics.java        |   1 +
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  11 +-
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |   3 +-
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |   3 +-
 .../metrics/AggregatingJobsMetricsHandlerTest.java |   2 +-
 .../metrics/AggregatingMetricsHandlerTestBase.java |   5 +-
 .../AggregatingSubtasksMetricsHandlerTest.java     |   2 +-
 .../AggregatingTaskManagersMetricsHandlerTest.java |   2 +-
 .../handler/legacy/metrics/MetricFetcherTest.java  |   2 +-
 29 files changed, 138 insertions(+), 229 deletions(-)

diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 98eaf39..891ae08 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index d0499ef..826b93a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
@@ -101,7 +102,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	private AtomicBoolean cleanedUp = new AtomicBoolean();
 
-
 	private MetricFetcher metricFetcher;
 
 	public WebRuntimeMonitor(
@@ -193,7 +193,11 @@ public class WebRuntimeMonitor implements WebMonitor {
 		} else {
 			sslFactory = null;
 		}
-		metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, scheduledExecutor, timeout);
+		metricFetcher = new MetricFetcherImpl<>(
+			retriever,
+			queryServiceRetriever,
+			scheduledExecutor,
+			timeout);
 
 		Router router = new Router();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index c6c060d..b36fd6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.WebMonitorExtension;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 5059476..8b593fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -61,6 +61,7 @@ import javax.annotation.Nonnull;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Abstract class which implements the creation of the {@link DispatcherResourceManagerComponent} components.
@@ -128,15 +129,17 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory<T extend
 				10,
 				Time.milliseconds(50L));
 
+			final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
+				configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
+				configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
+				"DispatcherRestEndpoint");
+
 			webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
 				configuration,
 				dispatcherGatewayRetriever,
 				resourceManagerGatewayRetriever,
 				blobServer,
-				WebMonitorEndpoint.createExecutorService(
-					configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
-					configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
-					"DispatcherRestEndpoint"),
+				executor,
 				metricQueryServiceRetriever,
 				highAvailabilityServices.getWebMonitorLeaderElectionService(),
 				fatalErrorHandler);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1da7827..92738ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -97,6 +97,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -347,6 +348,11 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					20,
 					Time.milliseconds(20L));
 
+				final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
+					configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
+					configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
+					"DispatcherRestEndpoint");
+
 				this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
 					RestServerEndpointConfiguration.fromConfiguration(configuration),
 					dispatcherGatewayRetriever,
@@ -354,24 +360,21 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					RestHandlerConfiguration.fromConfiguration(configuration),
 					resourceManagerGatewayRetriever,
 					blobServer.getTransientBlobService(),
-					WebMonitorEndpoint.createExecutorService(
-						configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
-						configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
-						"DispatcherRestEndpoint"),
+					executor,
 					new AkkaQueryServiceRetriever(
 						metricQueryServiceActorSystem,
 						Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
 					haServices.getWebMonitorLeaderElectionService(),
 					new ShutDownFatalErrorHandler());
 
-				dispatcherRestEndpoint.start();
+				this.dispatcherRestEndpoint.start();
 
-				restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl());
+				restAddressURI = new URI(this.dispatcherRestEndpoint.getRestBaseUrl());
 
 				// bring up the dispatcher that launches JobManagers when jobs submitted
 				LOG.info("Starting job dispatcher(s) for JobManger");
 
-				final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);
+				final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, this.dispatcherRestEndpoint);
 
 				dispatcher = new StandaloneDispatcher(
 					jobManagerRpcService,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
index 9bfc9ac..dec7768 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
index 64750e7..ff10be6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
index 4669745..2dcf6de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 2a0e1fc..aefe724 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -59,7 +59,7 @@ import java.util.concurrent.Executor;
  */
 public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> implements JsonArchivist {
 
-	private final MetricFetcher<?> metricFetcher;
+	private final MetricFetcher metricFetcher;
 
 	public JobDetailsHandler(
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -68,7 +68,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI
 			MessageHeaders<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> messageHeaders,
 			ExecutionGraphCache executionGraphCache,
 			Executor executor,
-			MetricFetcher<?> metricFetcher) {
+			MetricFetcher metricFetcher) {
 		super(
 			leaderRetriever,
 			timeout,
@@ -95,7 +95,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI
 		return Collections.singleton(new ArchivedJson(path, json));
 	}
 
-	private static JobDetailsInfo createJobDetailsInfo(AccessExecutionGraph executionGraph, @Nullable MetricFetcher<?> metricFetcher) {
+	private static JobDetailsInfo createJobDetailsInfo(AccessExecutionGraph executionGraph, @Nullable MetricFetcher metricFetcher) {
 		final long now = System.currentTimeMillis();
 		final long startTime = executionGraph.getStatusTimestamp(JobStatus.CREATED);
 		final long endTime = executionGraph.getState().isGloballyTerminalState() ?
@@ -147,7 +147,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI
 			AccessExecutionJobVertex ejv,
 			long now,
 			JobID jobId,
-			MetricFetcher<?> metricFetcher) {
+			MetricFetcher metricFetcher) {
 		int[] tasksPerState = new int[ExecutionState.values().length];
 		long startTime = Long.MAX_VALUE;
 		long endTime = 0;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
index 71a919a..0cab589 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
@@ -57,7 +57,7 @@ import java.util.concurrent.Executor;
  * Request handler for the job vertex details.
  */
 public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVertexDetailsInfo, JobVertexMessageParameters> implements JsonArchivist {
-	private final MetricFetcher<? extends RestfulGateway> metricFetcher;
+	private final MetricFetcher metricFetcher;
 
 	public JobVertexDetailsHandler(
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -66,7 +66,7 @@ public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVe
 			MessageHeaders<EmptyRequestBody, JobVertexDetailsInfo, JobVertexMessageParameters> messageHeaders,
 			ExecutionGraphCache executionGraphCache,
 			Executor executor,
-			MetricFetcher<? extends RestfulGateway> metricFetcher) {
+			MetricFetcher metricFetcher) {
 		super(
 			leaderRetriever,
 			timeout,
@@ -106,7 +106,7 @@ public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVe
 		return archive;
 	}
 
-	private static JobVertexDetailsInfo createJobVertexDetailsInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher<?> metricFetcher) {
+	private static JobVertexDetailsInfo createJobVertexDetailsInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher metricFetcher) {
 		List<JobVertexDetailsInfo.VertexTaskDetail> subtasks = new ArrayList<>();
 		final long now = System.currentTimeMillis();
 		int num = 0;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 8984009..fdd9cff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -62,7 +62,7 @@ import java.util.concurrent.Executor;
  * runtime and metrics of all its subtasks aggregated by TaskManager.
  */
 public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> implements JsonArchivist {
-	private MetricFetcher<?> metricFetcher;
+	private MetricFetcher metricFetcher;
 
 	public JobVertexTaskManagersHandler(
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -71,7 +71,7 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
 			MessageHeaders<EmptyRequestBody, JobVertexTaskManagersInfo, JobVertexMessageParameters> messageHeaders,
 			ExecutionGraphCache executionGraphCache,
 			Executor executor,
-			MetricFetcher<?> metricFetcher) {
+			MetricFetcher metricFetcher) {
 		super(leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
 		this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
 	}
@@ -105,7 +105,7 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
 		return archive;
 	}
 
-	private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher<?> metricFetcher) {
+	private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher metricFetcher) {
 		// Build a map that groups tasks by TaskManager
 		Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
 		for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
index f632669..3a5a5d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
@@ -46,7 +46,7 @@ import java.util.concurrent.Executor;
  */
 public class SubtaskCurrentAttemptDetailsHandler extends AbstractSubtaskHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> {
 
-	private final MetricFetcher<?> metricFetcher;
+	private final MetricFetcher metricFetcher;
 
 	public SubtaskCurrentAttemptDetailsHandler(
 		GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -55,7 +55,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends AbstractSubtaskHandler<
 		MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> messageHeaders,
 		ExecutionGraphCache executionGraphCache,
 		Executor executor,
-		MetricFetcher<?> metricFetcher) {
+		MetricFetcher metricFetcher) {
 
 		super(leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
index 1caa917..c538606 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
@@ -61,7 +61,7 @@ public class SubtaskExecutionAttemptDetailsHandler
 	extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters>
 	implements JsonArchivist {
 
-	private final MetricFetcher<?> metricFetcher;
+	private final MetricFetcher metricFetcher;
 
 	/**
 	 * Instantiates a new subtask execution attempt details handler.
@@ -80,7 +80,7 @@ public class SubtaskExecutionAttemptDetailsHandler
 			MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters> messageHeaders,
 			ExecutionGraphCache executionGraphCache,
 			Executor executor,
-			MetricFetcher<?> metricFetcher) {
+			MetricFetcher metricFetcher) {
 
 		super(leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
 
@@ -131,7 +131,7 @@ public class SubtaskExecutionAttemptDetailsHandler
 			AccessExecution execution,
 			JobID jobID,
 			JobVertexID jobVertexID,
-			@Nullable MetricFetcher<?> metricFetcher) {
+			@Nullable MetricFetcher metricFetcher) {
 		final MutableIOMetrics ioMetrics = new MutableIOMetrics();
 
 		ioMetrics.addIOMetrics(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
index be48d89..b482dfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
@@ -74,7 +74,7 @@ import java.util.stream.Collectors;
 public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
 
 	private final Executor executor;
-	private final MetricFetcher<?> fetcher;
+	private final MetricFetcher fetcher;
 
 	protected AbstractAggregatingMetricsHandler(
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -82,7 +82,7 @@ public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggreg
 			Map<String, String> responseHeaders,
 			AbstractAggregatedMetricsHeaders<P> messageHeaders,
 			Executor executor,
-			MetricFetcher<?> fetcher) {
+			MetricFetcher fetcher) {
 		super(leaderRetriever, timeout, responseHeaders, messageHeaders);
 		this.executor = Preconditions.checkNotNull(executor);
 		this.fetcher = Preconditions.checkNotNull(fetcher);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
index df4483c..ead1b3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
@@ -51,7 +51,7 @@ public class AggregatingJobsMetricsHandler extends AbstractAggregatingMetricsHan
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout, Map<String, String> responseHeaders,
 			Executor executor,
-			MetricFetcher<?> fetcher) {
+			MetricFetcher fetcher) {
 		super(leaderRetriever, timeout, responseHeaders, AggregatedJobMetricsHeaders.getInstance(), executor, fetcher);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
index bd5e83d..a2eed43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
@@ -58,7 +58,7 @@ public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetric
 			Time timeout,
 			Map<String, String> responseHeaders,
 			Executor executor,
-			MetricFetcher<?> fetcher) {
+			MetricFetcher fetcher) {
 		super(leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
index e943e2b..a638132 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
@@ -51,7 +51,7 @@ public class AggregatingTaskManagersMetricsHandler extends AbstractAggregatingMe
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout, Map<String, String> responseHeaders,
 			Executor executor,
-			MetricFetcher<?> fetcher) {
+			MetricFetcher fetcher) {
 		super(leaderRetriever, timeout, responseHeaders, AggregatedTaskManagerMetricsHeaders.getInstance(), executor, fetcher);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index 44a6169..3afdd25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -18,193 +18,23 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
-
 /**
  * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
  *
  * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since
  * the last call has passed.
  */
-public class MetricFetcher<T extends RestfulGateway> {
-	private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
-
-	private final GatewayRetriever<T> retriever;
-	private final MetricQueryServiceRetriever queryServiceRetriever;
-	private final Executor executor;
-	private final Time timeout;
-
-	private final MetricStore metrics = new MetricStore();
-	private final MetricDumpDeserializer deserializer = new MetricDumpDeserializer();
-
-	private long lastUpdateTime;
-
-	public MetricFetcher(
-			GatewayRetriever<T> retriever,
-			MetricQueryServiceRetriever queryServiceRetriever,
-			Executor executor,
-			Time timeout) {
-		this.retriever = Preconditions.checkNotNull(retriever);
-		this.queryServiceRetriever = Preconditions.checkNotNull(queryServiceRetriever);
-		this.executor = Preconditions.checkNotNull(executor);
-		this.timeout = Preconditions.checkNotNull(timeout);
-	}
-
-	/**
-	 * Returns the MetricStore containing all stored metrics.
-	 *
-	 * @return MetricStore containing all stored metrics;
-	 */
-	public MetricStore getMetricStore() {
-		return metrics;
-	}
-
-	/**
-	 * This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated.
-	 */
-	public void update() {
-		synchronized (this) {
-			long currentTime = System.currentTimeMillis();
-			if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update
-				lastUpdateTime = currentTime;
-				fetchMetrics();
-			}
-		}
-	}
-
-	private void fetchMetrics() {
-		LOG.debug("Start fetching metrics.");
-
-		try {
-			Optional<T> optionalLeaderGateway = retriever.getNow();
-			if (optionalLeaderGateway.isPresent()) {
-				final T leaderGateway = optionalLeaderGateway.get();
-
-				/*
-				 * Remove all metrics that belong to a job that is not running and no longer archived.
-				 */
-				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = leaderGateway.requestMultipleJobDetails(timeout);
-
-				jobDetailsFuture.whenCompleteAsync(
-					(MultipleJobsDetails jobDetails, Throwable throwable) -> {
-						if (throwable != null) {
-							LOG.debug("Fetching of JobDetails failed.", throwable);
-						} else {
-							ArrayList<String> toRetain = new ArrayList<>(jobDetails.getJobs().size());
-							for (JobDetails job : jobDetails.getJobs()) {
-								toRetain.add(job.getJobId().toString());
-							}
-							metrics.retainJobs(toRetain);
-						}
-					},
-					executor);
-
-				CompletableFuture<Collection<String>> queryServicePathsFuture = leaderGateway.requestMetricQueryServicePaths(timeout);
-
-				queryServicePathsFuture.whenCompleteAsync(
-					(Collection<String> queryServicePaths, Throwable throwable) -> {
-						if (throwable != null) {
-							LOG.warn("Requesting paths for query services failed.", throwable);
-						} else {
-							for (String queryServicePath : queryServicePaths) {
-								retrieveAndQueryMetrics(queryServicePath);
-							}
-						}
-					},
-					executor);
-
-				// TODO: Once the old code has been ditched, remove the explicit TaskManager query service discovery
-				// TODO: and return it as part of requestQueryServicePaths. Moreover, change the MetricStore such that
-				// TODO: we don't have to explicitly retain the valid TaskManagers, e.g. letting it be a cache with expiry time
-				CompletableFuture<Collection<Tuple2<ResourceID, String>>> taskManagerQueryServicePathsFuture = leaderGateway
-					.requestTaskManagerMetricQueryServicePaths(timeout);
-
-				taskManagerQueryServicePathsFuture.whenCompleteAsync(
-					(Collection<Tuple2<ResourceID, String>> queryServicePaths, Throwable throwable) -> {
-						if (throwable != null) {
-							LOG.warn("Requesting TaskManager's path for query services failed.", throwable);
-						} else {
-							List<String> taskManagersToRetain = queryServicePaths
-								.stream()
-								.map(
-									(Tuple2<ResourceID, String> tuple) -> {
-										retrieveAndQueryMetrics(tuple.f1);
-										return tuple.f0.getResourceIdString();
-									}
-								).collect(Collectors.toList());
-
-							metrics.retainTaskManagers(taskManagersToRetain);
-						}
-					},
-					executor);
-			}
-		} catch (Exception e) {
-			LOG.warn("Exception while fetching metrics.", e);
-		}
-	}
+public interface MetricFetcher {
 
 	/**
-	 * Retrieves and queries the specified QueryServiceGateway.
+	 * Get {@link MetricStore} which contains all currently fetched metrics.
 	 *
-	 * @param queryServicePath specifying the QueryServiceGateway
+	 * @return {@link MetricStore} with all fetched metrics
 	 */
-	private void retrieveAndQueryMetrics(String queryServicePath) {
-		LOG.debug("Retrieve metric query service gateway for {}", queryServicePath);
-
-		final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath);
-
-		queryServiceGatewayFuture.whenCompleteAsync(
-			(MetricQueryServiceGateway queryServiceGateway, Throwable t) -> {
-				if (t != null) {
-					LOG.debug("Could not retrieve QueryServiceGateway.", t);
-				} else {
-					queryMetrics(queryServiceGateway);
-				}
-			},
-			executor);
-	}
+	MetricStore getMetricStore();
 
 	/**
-	 * Query the metrics from the given QueryServiceGateway.
-	 *
-	 * @param queryServiceGateway to query for metrics
+	 * Trigger fetching of metrics.
 	 */
-	private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
-		LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
-
-		queryServiceGateway
-			.queryMetrics(timeout)
-			.whenCompleteAsync(
-				(MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
-					if (t != null) {
-						LOG.debug("Fetching metrics failed.", t);
-					} else {
-						metrics.addAll(deserializer.deserialize(result));
-					}
-				},
-				executor);
-	}
+	void update();
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
similarity index 86%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
index 44a6169..861f512 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -33,24 +35,26 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
 
 /**
- * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
+ * Implementation of {@link MetricFetcher} which fetches metrics from the {@link MetricQueryServiceGateway}.
  *
- * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since
- * the last call has passed.
+ * @param <T> type of the {@link RestfulGateway} from which to retrieve the metric query service path.
  */
-public class MetricFetcher<T extends RestfulGateway> {
-	private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
+public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetcher {
+	private static final Logger LOG = LoggerFactory.getLogger(MetricFetcherImpl.class);
 
 	private final GatewayRetriever<T> retriever;
 	private final MetricQueryServiceRetriever queryServiceRetriever;
@@ -62,7 +66,7 @@ public class MetricFetcher<T extends RestfulGateway> {
 
 	private long lastUpdateTime;
 
-	public MetricFetcher(
+	public MetricFetcherImpl(
 			GatewayRetriever<T> retriever,
 			MetricQueryServiceRetriever queryServiceRetriever,
 			Executor executor,
@@ -78,6 +82,7 @@ public class MetricFetcher<T extends RestfulGateway> {
 	 *
 	 * @return MetricStore containing all stored metrics;
 	 */
+	@Override
 	public MetricStore getMetricStore() {
 		return metrics;
 	}
@@ -85,10 +90,11 @@ public class MetricFetcher<T extends RestfulGateway> {
 	/**
 	 * This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated.
 	 */
+	@Override
 	public void update() {
 		synchronized (this) {
 			long currentTime = System.currentTimeMillis();
-			if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update
+			if (currentTime - lastUpdateTime > 10000L) {
 				lastUpdateTime = currentTime;
 				fetchMetrics();
 			}
@@ -207,4 +213,19 @@ public class MetricFetcher<T extends RestfulGateway> {
 				},
 				executor);
 	}
+
+	@Nonnull
+	public static <T extends RestfulGateway> MetricFetcherImpl<T> fromConfiguration(
+			final Configuration configuration,
+			final MetricQueryServiceRetriever metricQueryServiceRetriever,
+			final GatewayRetriever<T> dispatcherGatewayRetriever,
+			final ExecutorService executor) {
+		final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+
+		return new MetricFetcherImpl<>(
+			dispatcherGatewayRetriever,
+			metricQueryServiceRetriever,
+			executor,
+			timeout);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java
new file mode 100644
index 0000000..28d5450
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+/**
+ * No-op implementation of the {@link MetricFetcher}.
+ */
+public enum VoidMetricFetcher implements MetricFetcher {
+	INSTANCE;
+
+	private static final MetricStore METRIC_STORE = new MetricStore();
+
+	@Override
+	public MetricStore getMetricStore() {
+		return METRIC_STORE;
+	}
+
+	@Override
+	public void update() {
+		// noop
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index 6822417..948ec93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.rest.handler.job.JobVertexDetailsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index ec14aa3..f215ab8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -77,6 +77,7 @@ import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandle
 import org.apache.flink.runtime.rest.handler.legacy.files.StdoutFileHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler;
@@ -157,7 +158,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 	private final ExecutionGraphCache executionGraphCache;
 	private final CheckpointStatsCache checkpointStatsCache;
 
-	private final MetricFetcher<? extends T> metricFetcher;
+	private final MetricFetcher metricFetcher;
 
 	private final LeaderElectionService leaderElectionService;
 
@@ -193,11 +194,11 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		this.checkpointStatsCache = new CheckpointStatsCache(
 			restConfiguration.getMaxCheckpointStatisticCacheEntries());
 
-		this.metricFetcher = new MetricFetcher<>(
-			leaderRetriever,
+		this.metricFetcher = MetricFetcherImpl.fromConfiguration(
+			clusterConfiguration,
 			metricQueryServiceRetriever,
-			executor,
-			restConfiguration.getTimeout());
+			leaderRetriever,
+			executor);
 
 		this.leaderElectionService = Preconditions.checkNotNull(leaderElectionService);
 		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index f00e49f..10d670e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
@@ -118,7 +119,7 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
 		// Instance the handler.
 		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(new Configuration());
 
-		final MetricFetcher<?> metricFetcher = new MetricFetcher<>(
+		final MetricFetcher metricFetcher = new MetricFetcherImpl<>(
 			() -> null,
 			path -> null,
 			TestingUtils.defaultExecutor(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 59c6b38..6160d4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
@@ -116,7 +117,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
 			emptyAccumulators);
 
 		// Change some fields so we can make it different from other sub tasks.
-		final MetricFetcher<?> metricFetcher = new MetricFetcher<>(
+		final MetricFetcher metricFetcher = new MetricFetcherImpl<>(
 			() -> null,
 			path -> null,
 			TestingUtils.defaultExecutor(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
index 76f81dc..a01dd78 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
@@ -67,7 +67,7 @@ public class AggregatingJobsMetricsHandlerTest extends AggregatingMetricsHandler
 	}
 
 	@Override
-	protected AggregatingJobsMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+	protected AggregatingJobsMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher fetcher) {
 		return new AggregatingJobsMetricsHandler(
 			leaderRetriever,
 			timeout,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
index 7db669a..41e5a06 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
@@ -88,7 +89,7 @@ public abstract class AggregatingMetricsHandlerTestBase<
 
 	@Before
 	public void setUp() throws Exception {
-		MetricFetcher<RestfulGateway> fetcher = new MetricFetcher<RestfulGateway>(
+		MetricFetcher fetcher = new MetricFetcherImpl<RestfulGateway>(
 			mock(GatewayRetriever.class),
 			mock(MetricQueryServiceRetriever.class),
 			Executors.directExecutor(),
@@ -122,7 +123,7 @@ public abstract class AggregatingMetricsHandlerTestBase<
 		Time timeout,
 		Map<String, String> responseHeaders,
 		Executor executor,
-		MetricFetcher<?> fetcher
+		MetricFetcher fetcher
 	);
 
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
index 3c6c9f1..06b5b71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
@@ -79,7 +79,7 @@ public class AggregatingSubtasksMetricsHandlerTest extends AggregatingMetricsHan
 	}
 
 	@Override
-	protected AggregatingSubtasksMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+	protected AggregatingSubtasksMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher fetcher) {
 		return new AggregatingSubtasksMetricsHandler(
 			leaderRetriever,
 			timeout,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
index 14d3777..4e2cf24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
@@ -68,7 +68,7 @@ public class AggregatingTaskManagersMetricsHandlerTest extends AggregatingMetric
 	}
 
 	@Override
-	protected AggregatingTaskManagersMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+	protected AggregatingTaskManagersMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher fetcher) {
 		return new AggregatingTaskManagersMetricsHandler(
 			leaderRetriever,
 			timeout,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index 61c028f..e1ec719 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -108,7 +108,7 @@ public class MetricFetcherTest extends TestLogger {
 		when(queryServiceRetriever.retrieveService(eq(tmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
 
 		// ========= start MetricFetcher testing =======================================================================
-		MetricFetcher fetcher = new MetricFetcher<>(
+		MetricFetcher fetcher = new MetricFetcherImpl<>(
 			retriever,
 			queryServiceRetriever,
 			Executors.directExecutor(),