You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/16 21:49:58 UTC

[flink] branch master updated (86e06bc -> 7d98940)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 86e06bc  [hotfix][docs] Remove stray character
     new 992b65a  [hotfix] Introduce MetricFetcher interface
     new 474fe8e  [FLINK-10822] Make MetricFetcher update interval configurable
     new 7d98940  [hotfix] Remove Mockito from MetricFetcherTest

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/_includes/generated/metric_configuration.html |   5 +
 .../apache/flink/configuration/MetricOptions.java  |   8 +
 .../flink/docs/rest/RestAPIDocGenerator.java       |   6 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java      |  10 +-
 .../runtime/dispatcher/DispatcherRestEndpoint.java |   6 +-
 ...tDispatcherResourceManagerComponentFactory.java |  18 +-
 .../jobmaster/MiniDispatcherRestEndpoint.java      |   6 +-
 .../flink/runtime/minicluster/MiniCluster.java     |  28 ++--
 .../flink/runtime/rest/JobRestEndpointFactory.java |   6 +-
 .../flink/runtime/rest/RestEndpointFactory.java    |   4 +-
 .../runtime/rest/SessionRestEndpointFactory.java   |   6 +-
 .../rest/handler/job/JobDetailsHandler.java        |   8 +-
 .../rest/handler/job/JobVertexDetailsHandler.java  |   6 +-
 .../handler/job/JobVertexTaskManagersHandler.java  |   6 +-
 .../job/SubtaskCurrentAttemptDetailsHandler.java   |   4 +-
 .../job/SubtaskExecutionAttemptDetailsHandler.java |   6 +-
 .../metrics/AbstractAggregatingMetricsHandler.java |   4 +-
 .../job/metrics/AggregatingJobsMetricsHandler.java |   2 +-
 .../metrics/AggregatingSubtasksMetricsHandler.java |   2 +-
 .../AggregatingTaskManagersMetricsHandler.java     |   2 +-
 .../rest/handler/legacy/metrics/MetricFetcher.java | 182 +--------------------
 .../{MetricFetcher.java => MetricFetcherImpl.java} |  45 ++++-
 .../handler/legacy/metrics/VoidMetricFetcher.java  |  26 +--
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  11 +-
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |   7 +-
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |   7 +-
 .../metrics/AggregatingJobsMetricsHandlerTest.java |   2 +-
 .../metrics/AggregatingMetricsHandlerTestBase.java |   9 +-
 .../AggregatingSubtasksMetricsHandlerTest.java     |   2 +-
 .../AggregatingTaskManagersMetricsHandlerTest.java |   2 +-
 .../handler/legacy/metrics/MetricFetcherTest.java  | 131 ++++++++++-----
 .../TestingMetricQueryServiceGateway.java          |  76 +++++++++
 32 files changed, 336 insertions(+), 307 deletions(-)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/{MetricFetcher.java => MetricFetcherImpl.java} (82%)
 copy flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala => flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java (71%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/TestingMetricQueryServiceGateway.java


[flink] 02/03: [FLINK-10822] Make MetricFetcher update interval configurable

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 474fe8e8ae063e250f7f8f4eddcd799b15f8b69b
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 10 11:17:54 2019 +0100

    [FLINK-10822] Make MetricFetcher update interval configurable
    
    Introduce MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL configuration option to configure
    the update interval of the MetricFetcher.
    
    This closes #7459.
---
 docs/_includes/generated/metric_configuration.html |  5 ++
 .../apache/flink/configuration/MetricOptions.java  |  8 +++
 .../flink/docs/rest/RestAPIDocGenerator.java       |  5 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java      |  4 +-
 .../runtime/dispatcher/DispatcherRestEndpoint.java |  5 +-
 ...tDispatcherResourceManagerComponentFactory.java |  7 ++-
 .../jobmaster/MiniDispatcherRestEndpoint.java      |  6 +-
 .../flink/runtime/minicluster/MiniCluster.java     | 11 +++-
 .../flink/runtime/rest/JobRestEndpointFactory.java |  5 +-
 .../flink/runtime/rest/RestEndpointFactory.java    |  3 +-
 .../runtime/rest/SessionRestEndpointFactory.java   |  5 +-
 .../handler/legacy/metrics/MetricFetcherImpl.java  | 14 ++++-
 .../rest/handler/util/MutableIOMetrics.java        |  1 -
 .../runtime/webmonitor/WebMonitorEndpoint.java     | 10 +---
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |  4 +-
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |  4 +-
 .../metrics/AggregatingMetricsHandlerTestBase.java |  4 +-
 .../handler/legacy/metrics/MetricFetcherTest.java  | 67 +++++++++++++++++++++-
 18 files changed, 129 insertions(+), 39 deletions(-)

diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html
index 39a76ce..bb4b2a8 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -8,6 +8,11 @@
     </thead>
     <tbody>
         <tr>
+            <td><h5>metrics.fetcher.update-interval</h5></td>
+            <td style="word-wrap: break-word;">10000</td>
+            <td>Update interval for the metric fetcher used by the web UI in milliseconds. Decrease this value for faster updating metrics. Increase this value if the metric fetcher causes too much load.</td>
+        </tr>
+        <tr>
             <td><h5>metrics.internal.query-service.port</h5></td>
             <td style="word-wrap: break-word;">"0"</td>
             <td>The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. Per default Flink will pick a random port.</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 2c22d92..3deb766 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -175,6 +175,14 @@ public class MetricOptions {
 			" by Akka's thread pool executor. " +
 			"The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). " +
 			"Warning, increasing this value may bring the main Flink components down.");
+	/**
+	 * The config parameter defining the update interval for the metric fetcher used by the web UI in milliseconds.
+	 */
+	public static final ConfigOption<Long> METRIC_FETCHER_UPDATE_INTERVAL =
+		key("metrics.fetcher.update-interval")
+			.defaultValue(10000L)
+			.withDescription("Update interval for the metric fetcher used by the web UI in milliseconds. Decrease this value for " +
+				"faster updating metrics. Increase this value if the metric fetcher causes too much load.");
 
 	private MetricOptions() {
 	}
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 891ae08..a0ec804 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.ConfigurationException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -325,7 +324,6 @@ public class RestAPIDocGenerator {
 		private static final RestHandlerConfiguration handlerConfig;
 		private static final GatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever;
 		private static final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
-		private static final MetricQueryServiceRetriever metricQueryServiceRetriever;
 
 		static {
 			config = new Configuration();
@@ -341,7 +339,6 @@ public class RestAPIDocGenerator {
 
 			dispatcherGatewayRetriever = () -> null;
 			resourceManagerGatewayRetriever = () -> null;
-			metricQueryServiceRetriever = path -> null;
 		}
 
 		private DocumentingDispatcherRestEndpoint() throws IOException {
@@ -353,7 +350,7 @@ public class RestAPIDocGenerator {
 				resourceManagerGatewayRetriever,
 				NoOpTransientBlobService.INSTANCE,
 				Executors.newFixedThreadPool(1),
-				metricQueryServiceRetriever,
+				VoidMetricFetcher.INSTANCE,
 				NoOpElectionService.INSTANCE,
 				NoOpFatalErrorHandler.INSTANCE);
 		}
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 826b93a..10db5a7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
@@ -197,7 +198,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 			retriever,
 			queryServiceRetriever,
 			scheduledExecutor,
-			timeout);
+			timeout,
+			MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue());
 
 		Router router = new Router();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index b36fd6f..3d52331 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.WebMonitorExtension;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
@@ -61,7 +60,7 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
 			GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
 			TransientBlobService transientBlobService,
 			ExecutorService executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			MetricFetcher metricFetcher,
 			LeaderElectionService leaderElectionService,
 			FatalErrorHandler fatalErrorHandler) throws IOException {
 
@@ -73,7 +72,7 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
 			resourceManagerRetriever,
 			transientBlobService,
 			executor,
-			metricQueryServiceRetriever,
+			metricFetcher,
 			leaderElectionService,
 			fatalErrorHandler);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 8b593fc..9ab348d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rest.RestEndpointFactory;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -140,7 +141,11 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory<T extend
 				resourceManagerGatewayRetriever,
 				blobServer,
 				executor,
-				metricQueryServiceRetriever,
+				MetricFetcherImpl.fromConfiguration(
+					configuration,
+					metricQueryServiceRetriever,
+					dispatcherGatewayRetriever,
+					executor),
 				highAvailabilityServices.getWebMonitorLeaderElectionService(),
 				fatalErrorHandler);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
index dae9d8e..f11b669 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
@@ -25,11 +25,11 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
@@ -47,7 +47,7 @@ public class MiniDispatcherRestEndpoint extends WebMonitorEndpoint<RestfulGatewa
 			GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
 			TransientBlobService transientBlobService,
 			ExecutorService executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			MetricFetcher metricFetcher,
 			LeaderElectionService leaderElectionService,
 			FatalErrorHandler fatalErrorHandler) throws IOException {
 		super(
@@ -58,7 +58,7 @@ public class MiniDispatcherRestEndpoint extends WebMonitorEndpoint<RestfulGatewa
 			resourceManagerRetriever,
 			transientBlobService,
 			executor,
-			metricQueryServiceRetriever,
+			metricFetcher,
 			leaderElectionService,
 			fatalErrorHandler);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 92738ca..80f8c1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -65,6 +65,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
@@ -361,9 +362,13 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					resourceManagerGatewayRetriever,
 					blobServer.getTransientBlobService(),
 					executor,
-					new AkkaQueryServiceRetriever(
-						metricQueryServiceActorSystem,
-						Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
+					MetricFetcherImpl.fromConfiguration(
+						configuration,
+						new AkkaQueryServiceRetriever(
+							metricQueryServiceActorSystem,
+							Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
+						dispatcherGatewayRetriever,
+						executor),
 					haServices.getWebMonitorLeaderElectionService(),
 					new ShutDownFatalErrorHandler());
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
index dec7768..21df821 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
 import java.util.concurrent.ExecutorService;
 
@@ -47,7 +46,7 @@ public enum JobRestEndpointFactory implements RestEndpointFactory<RestfulGateway
 			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
 			TransientBlobService transientBlobService,
 			ExecutorService executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			MetricFetcher metricFetcher,
 			LeaderElectionService leaderElectionService,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
 		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
@@ -60,7 +59,7 @@ public enum JobRestEndpointFactory implements RestEndpointFactory<RestfulGateway
 			resourceManagerGatewayRetriever,
 			transientBlobService,
 			executor,
-			metricQueryServiceRetriever,
+			metricFetcher,
 			leaderElectionService,
 			fatalErrorHandler);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
index ff10be6..840a4be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
 import java.util.concurrent.ExecutorService;
 
@@ -45,7 +44,7 @@ public interface RestEndpointFactory<T extends RestfulGateway> {
 		LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
 		TransientBlobService transientBlobService,
 		ExecutorService executor,
-		MetricQueryServiceRetriever metricQueryServiceRetriever,
+		MetricFetcher metricFetcher,
 		LeaderElectionService leaderElectionService,
 		FatalErrorHandler fatalErrorHandler) throws Exception;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
index 2dcf6de..15d5fe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
 import java.util.concurrent.ExecutorService;
 
@@ -46,7 +45,7 @@ public enum SessionRestEndpointFactory implements RestEndpointFactory<Dispatcher
 			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
 			TransientBlobService transientBlobService,
 			ExecutorService executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			MetricFetcher metricFetcher,
 			LeaderElectionService leaderElectionService,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
 		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
@@ -59,7 +58,7 @@ public enum SessionRestEndpointFactory implements RestEndpointFactory<Dispatcher
 			resourceManagerGatewayRetriever,
 			transientBlobService,
 			executor,
-			metricQueryServiceRetriever,
+			metricFetcher,
 			leaderElectionService,
 			fatalErrorHandler);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
index 861f512..7e904e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -63,6 +64,7 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche
 
 	private final MetricStore metrics = new MetricStore();
 	private final MetricDumpDeserializer deserializer = new MetricDumpDeserializer();
+	private final long updateInterval;
 
 	private long lastUpdateTime;
 
@@ -70,11 +72,15 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche
 			GatewayRetriever<T> retriever,
 			MetricQueryServiceRetriever queryServiceRetriever,
 			Executor executor,
-			Time timeout) {
+			Time timeout,
+			long updateInterval) {
 		this.retriever = Preconditions.checkNotNull(retriever);
 		this.queryServiceRetriever = Preconditions.checkNotNull(queryServiceRetriever);
 		this.executor = Preconditions.checkNotNull(executor);
 		this.timeout = Preconditions.checkNotNull(timeout);
+
+		Preconditions.checkArgument(updateInterval > 0, "The update interval must be larger than 0.");
+		this.updateInterval = updateInterval;
 	}
 
 	/**
@@ -94,7 +100,7 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche
 	public void update() {
 		synchronized (this) {
 			long currentTime = System.currentTimeMillis();
-			if (currentTime - lastUpdateTime > 10000L) {
+			if (currentTime - lastUpdateTime > updateInterval) {
 				lastUpdateTime = currentTime;
 				fetchMetrics();
 			}
@@ -221,11 +227,13 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche
 			final GatewayRetriever<T> dispatcherGatewayRetriever,
 			final ExecutorService executor) {
 		final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+		final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
 
 		return new MetricFetcherImpl<>(
 			dispatcherGatewayRetriever,
 			metricQueryServiceRetriever,
 			executor,
-			timeout);
+			timeout,
+			updateInterval);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index 948ec93..6822417 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.rest.handler.job.JobVertexDetailsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
-import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index f215ab8..8efbe7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -77,7 +77,6 @@ import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandle
 import org.apache.flink.runtime.rest.handler.legacy.files.StdoutFileHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
-import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler;
@@ -119,7 +118,6 @@ import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FileUtils;
@@ -176,7 +174,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 			GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
 			TransientBlobService transientBlobService,
 			ExecutorService executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			MetricFetcher metricFetcher,
 			LeaderElectionService leaderElectionService,
 			FatalErrorHandler fatalErrorHandler) throws IOException {
 		super(endpointConfiguration);
@@ -194,11 +192,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		this.checkpointStatsCache = new CheckpointStatsCache(
 			restConfiguration.getMaxCheckpointStatisticCacheEntries());
 
-		this.metricFetcher = MetricFetcherImpl.fromConfiguration(
-			clusterConfiguration,
-			metricQueryServiceRetriever,
-			leaderRetriever,
-			executor);
+		this.metricFetcher = metricFetcher;
 
 		this.leaderElectionService = Preconditions.checkNotNull(leaderElectionService);
 		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 10d670e..5c688fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -123,7 +124,8 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
 			() -> null,
 			path -> null,
 			TestingUtils.defaultExecutor(),
-			Time.milliseconds(1000L));
+			Time.milliseconds(1000L),
+			MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue());
 
 		final SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(
 			() -> null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 6160d4a..d0a78d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecution;
@@ -121,7 +122,8 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
 			() -> null,
 			path -> null,
 			TestingUtils.defaultExecutor(),
-			Time.milliseconds(1000L));
+			Time.milliseconds(1000L),
+			MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue());
 
 		// Instance the handler.
 		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(new Configuration());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
index 41e5a06..a982ac1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.job.metrics;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
@@ -93,7 +94,8 @@ public abstract class AggregatingMetricsHandlerTestBase<
 			mock(GatewayRetriever.class),
 			mock(MetricQueryServiceRetriever.class),
 			Executors.directExecutor(),
-			TestingUtils.TIMEOUT());
+			TestingUtils.TIMEOUT(),
+			MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue());
 		store = fetcher.getMetricStore();
 
 		Collection<MetricDump> metricDumps = getMetricDumps();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index e1ec719..b478893 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
@@ -35,6 +36,8 @@ import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -46,13 +49,18 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import javax.annotation.Nonnull;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.powermock.api.mockito.PowerMockito.mock;
@@ -112,7 +120,8 @@ public class MetricFetcherTest extends TestLogger {
 			retriever,
 			queryServiceRetriever,
 			Executors.directExecutor(),
-			timeout);
+			timeout,
+			MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue());
 
 		// verify that update fetches metrics and updates the store
 		fetcher.update();
@@ -184,4 +193,60 @@ public class MetricFetcherTest extends TestLogger {
 
 		return dump;
 	}
+
+	@Test
+	public void testLongUpdateInterval() {
+		final long updateInterval = 1000L;
+		final AtomicInteger requestMetricQueryServicePathsCounter = new AtomicInteger(0);
+		final RestfulGateway restfulGateway = createRestfulGateway(requestMetricQueryServicePathsCounter);
+
+		final MetricFetcher fetcher = createMetricFetcher(updateInterval, restfulGateway);
+
+		fetcher.update();
+		fetcher.update();
+
+		assertThat(requestMetricQueryServicePathsCounter.get(), is(1));
+	}
+
+	@Test
+	public void testShortUpdateInterval() throws InterruptedException {
+		final long updateInterval = 1L;
+		final AtomicInteger requestMetricQueryServicePathsCounter = new AtomicInteger(0);
+		final RestfulGateway restfulGateway = createRestfulGateway(requestMetricQueryServicePathsCounter);
+
+		final MetricFetcher fetcher = createMetricFetcher(updateInterval, restfulGateway);
+
+		fetcher.update();
+
+		final long start = System.currentTimeMillis();
+		long difference = 0L;
+
+		while (difference <= updateInterval) {
+			Thread.sleep(2L * updateInterval);
+			difference = System.currentTimeMillis() - start;
+		}
+
+		fetcher.update();
+
+		assertThat(requestMetricQueryServicePathsCounter.get(), is(2));
+	}
+
+	@Nonnull
+	private MetricFetcher createMetricFetcher(long updateInterval, RestfulGateway restfulGateway) {
+		return new MetricFetcherImpl<>(
+			() -> CompletableFuture.completedFuture(restfulGateway),
+			path -> new CompletableFuture<>(),
+			Executors.directExecutor(),
+			Time.seconds(10L),
+			updateInterval);
+	}
+
+	private RestfulGateway createRestfulGateway(AtomicInteger requestMetricQueryServicePathsCounter) {
+		return new TestingRestfulGateway.Builder()
+			.setRequestMetricQueryServicePathsSupplier(() -> {
+				requestMetricQueryServicePathsCounter.incrementAndGet();
+				return new CompletableFuture<>();
+			})
+			.build();
+	}
 }


[flink] 01/03: [hotfix] Introduce MetricFetcher interface

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 992b65af7420ebacf8aa02b26269190919ce3ccd
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 11 15:30:01 2019 +0100

    [hotfix] Introduce MetricFetcher interface
    
    Rename the MetricFetcher class to MetricFetcherImpl and introduce a MetricFetcher interface.
    This allows for better testing and hides the type parameter of MetricFetcherImpl from callers.
---
 .../flink/docs/rest/RestAPIDocGenerator.java       |   1 +
 .../runtime/webmonitor/WebRuntimeMonitor.java      |   8 +-
 .../runtime/dispatcher/DispatcherRestEndpoint.java |   1 +
 ...tDispatcherResourceManagerComponentFactory.java |  11 +-
 .../flink/runtime/minicluster/MiniCluster.java     |  17 +-
 .../flink/runtime/rest/JobRestEndpointFactory.java |   1 +
 .../flink/runtime/rest/RestEndpointFactory.java    |   1 +
 .../runtime/rest/SessionRestEndpointFactory.java   |   1 +
 .../rest/handler/job/JobDetailsHandler.java        |   8 +-
 .../rest/handler/job/JobVertexDetailsHandler.java  |   6 +-
 .../handler/job/JobVertexTaskManagersHandler.java  |   6 +-
 .../job/SubtaskCurrentAttemptDetailsHandler.java   |   4 +-
 .../job/SubtaskExecutionAttemptDetailsHandler.java |   6 +-
 .../metrics/AbstractAggregatingMetricsHandler.java |   4 +-
 .../job/metrics/AggregatingJobsMetricsHandler.java |   2 +-
 .../metrics/AggregatingSubtasksMetricsHandler.java |   2 +-
 .../AggregatingTaskManagersMetricsHandler.java     |   2 +-
 .../rest/handler/legacy/metrics/MetricFetcher.java | 182 +--------------------
 .../{MetricFetcher.java => MetricFetcherImpl.java} |  35 +++-
 .../handler/legacy/metrics/VoidMetricFetcher.java  |  38 +++++
 .../rest/handler/util/MutableIOMetrics.java        |   1 +
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  11 +-
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |   3 +-
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |   3 +-
 .../metrics/AggregatingJobsMetricsHandlerTest.java |   2 +-
 .../metrics/AggregatingMetricsHandlerTestBase.java |   5 +-
 .../AggregatingSubtasksMetricsHandlerTest.java     |   2 +-
 .../AggregatingTaskManagersMetricsHandlerTest.java |   2 +-
 .../handler/legacy/metrics/MetricFetcherTest.java  |   2 +-
 29 files changed, 138 insertions(+), 229 deletions(-)

diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 98eaf39..891ae08 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index d0499ef..826b93a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
@@ -101,7 +102,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	private AtomicBoolean cleanedUp = new AtomicBoolean();
 
-
 	private MetricFetcher metricFetcher;
 
 	public WebRuntimeMonitor(
@@ -193,7 +193,11 @@ public class WebRuntimeMonitor implements WebMonitor {
 		} else {
 			sslFactory = null;
 		}
-		metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, scheduledExecutor, timeout);
+		metricFetcher = new MetricFetcherImpl<>(
+			retriever,
+			queryServiceRetriever,
+			scheduledExecutor,
+			timeout);
 
 		Router router = new Router();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index c6c060d..b36fd6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.WebMonitorExtension;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 5059476..8b593fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -61,6 +61,7 @@ import javax.annotation.Nonnull;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Abstract class which implements the creation of the {@link DispatcherResourceManagerComponent} components.
@@ -128,15 +129,17 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory<T extend
 				10,
 				Time.milliseconds(50L));
 
+			final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
+				configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
+				configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
+				"DispatcherRestEndpoint");
+
 			webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
 				configuration,
 				dispatcherGatewayRetriever,
 				resourceManagerGatewayRetriever,
 				blobServer,
-				WebMonitorEndpoint.createExecutorService(
-					configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
-					configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
-					"DispatcherRestEndpoint"),
+				executor,
 				metricQueryServiceRetriever,
 				highAvailabilityServices.getWebMonitorLeaderElectionService(),
 				fatalErrorHandler);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1da7827..92738ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -97,6 +97,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -347,6 +348,11 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					20,
 					Time.milliseconds(20L));
 
+				final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
+					configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
+					configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
+					"DispatcherRestEndpoint");
+
 				this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
 					RestServerEndpointConfiguration.fromConfiguration(configuration),
 					dispatcherGatewayRetriever,
@@ -354,24 +360,21 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					RestHandlerConfiguration.fromConfiguration(configuration),
 					resourceManagerGatewayRetriever,
 					blobServer.getTransientBlobService(),
-					WebMonitorEndpoint.createExecutorService(
-						configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
-						configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
-						"DispatcherRestEndpoint"),
+					executor,
 					new AkkaQueryServiceRetriever(
 						metricQueryServiceActorSystem,
 						Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
 					haServices.getWebMonitorLeaderElectionService(),
 					new ShutDownFatalErrorHandler());
 
-				dispatcherRestEndpoint.start();
+				this.dispatcherRestEndpoint.start();
 
-				restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl());
+				restAddressURI = new URI(this.dispatcherRestEndpoint.getRestBaseUrl());
 
 				// bring up the dispatcher that launches JobManagers when jobs submitted
 				LOG.info("Starting job dispatcher(s) for JobManger");
 
-				final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);
+				final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, this.dispatcherRestEndpoint);
 
 				dispatcher = new StandaloneDispatcher(
 					jobManagerRpcService,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
index 9bfc9ac..dec7768 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
index 64750e7..ff10be6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
index 4669745..2dcf6de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 2a0e1fc..aefe724 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -59,7 +59,7 @@ import java.util.concurrent.Executor;
  */
 public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> implements JsonArchivist {
 
-	private final MetricFetcher<?> metricFetcher;
+	private final MetricFetcher metricFetcher;
 
 	public JobDetailsHandler(
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -68,7 +68,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI
 			MessageHeaders<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> messageHeaders,
 			ExecutionGraphCache executionGraphCache,
 			Executor executor,
-			MetricFetcher<?> metricFetcher) {
+			MetricFetcher metricFetcher) {
 		super(
 			leaderRetriever,
 			timeout,
@@ -95,7 +95,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI
 		return Collections.singleton(new ArchivedJson(path, json));
 	}
 
-	private static JobDetailsInfo createJobDetailsInfo(AccessExecutionGraph executionGraph, @Nullable MetricFetcher<?> metricFetcher) {
+	private static JobDetailsInfo createJobDetailsInfo(AccessExecutionGraph executionGraph, @Nullable MetricFetcher metricFetcher) {
 		final long now = System.currentTimeMillis();
 		final long startTime = executionGraph.getStatusTimestamp(JobStatus.CREATED);
 		final long endTime = executionGraph.getState().isGloballyTerminalState() ?
@@ -147,7 +147,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI
 			AccessExecutionJobVertex ejv,
 			long now,
 			JobID jobId,
-			MetricFetcher<?> metricFetcher) {
+			MetricFetcher metricFetcher) {
 		int[] tasksPerState = new int[ExecutionState.values().length];
 		long startTime = Long.MAX_VALUE;
 		long endTime = 0;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
index 71a919a..0cab589 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
@@ -57,7 +57,7 @@ import java.util.concurrent.Executor;
  * Request handler for the job vertex details.
  */
 public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVertexDetailsInfo, JobVertexMessageParameters> implements JsonArchivist {
-	private final MetricFetcher<? extends RestfulGateway> metricFetcher;
+	private final MetricFetcher metricFetcher;
 
 	public JobVertexDetailsHandler(
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -66,7 +66,7 @@ public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVe
 			MessageHeaders<EmptyRequestBody, JobVertexDetailsInfo, JobVertexMessageParameters> messageHeaders,
 			ExecutionGraphCache executionGraphCache,
 			Executor executor,
-			MetricFetcher<? extends RestfulGateway> metricFetcher) {
+			MetricFetcher metricFetcher) {
 		super(
 			leaderRetriever,
 			timeout,
@@ -106,7 +106,7 @@ public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVe
 		return archive;
 	}
 
-	private static JobVertexDetailsInfo createJobVertexDetailsInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher<?> metricFetcher) {
+	private static JobVertexDetailsInfo createJobVertexDetailsInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher metricFetcher) {
 		List<JobVertexDetailsInfo.VertexTaskDetail> subtasks = new ArrayList<>();
 		final long now = System.currentTimeMillis();
 		int num = 0;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 8984009..fdd9cff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -62,7 +62,7 @@ import java.util.concurrent.Executor;
  * runtime and metrics of all its subtasks aggregated by TaskManager.
  */
 public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> implements JsonArchivist {
-	private MetricFetcher<?> metricFetcher;
+	private MetricFetcher metricFetcher;
 
 	public JobVertexTaskManagersHandler(
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -71,7 +71,7 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
 			MessageHeaders<EmptyRequestBody, JobVertexTaskManagersInfo, JobVertexMessageParameters> messageHeaders,
 			ExecutionGraphCache executionGraphCache,
 			Executor executor,
-			MetricFetcher<?> metricFetcher) {
+			MetricFetcher metricFetcher) {
 		super(leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
 		this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
 	}
@@ -105,7 +105,7 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
 		return archive;
 	}
 
-	private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher<?> metricFetcher) {
+	private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher metricFetcher) {
 		// Build a map that groups tasks by TaskManager
 		Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
 		for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
index f632669..3a5a5d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
@@ -46,7 +46,7 @@ import java.util.concurrent.Executor;
  */
 public class SubtaskCurrentAttemptDetailsHandler extends AbstractSubtaskHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> {
 
-	private final MetricFetcher<?> metricFetcher;
+	private final MetricFetcher metricFetcher;
 
 	public SubtaskCurrentAttemptDetailsHandler(
 		GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -55,7 +55,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends AbstractSubtaskHandler<
 		MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> messageHeaders,
 		ExecutionGraphCache executionGraphCache,
 		Executor executor,
-		MetricFetcher<?> metricFetcher) {
+		MetricFetcher metricFetcher) {
 
 		super(leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
index 1caa917..c538606 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
@@ -61,7 +61,7 @@ public class SubtaskExecutionAttemptDetailsHandler
 	extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters>
 	implements JsonArchivist {
 
-	private final MetricFetcher<?> metricFetcher;
+	private final MetricFetcher metricFetcher;
 
 	/**
 	 * Instantiates a new subtask execution attempt details handler.
@@ -80,7 +80,7 @@ public class SubtaskExecutionAttemptDetailsHandler
 			MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters> messageHeaders,
 			ExecutionGraphCache executionGraphCache,
 			Executor executor,
-			MetricFetcher<?> metricFetcher) {
+			MetricFetcher metricFetcher) {
 
 		super(leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
 
@@ -131,7 +131,7 @@ public class SubtaskExecutionAttemptDetailsHandler
 			AccessExecution execution,
 			JobID jobID,
 			JobVertexID jobVertexID,
-			@Nullable MetricFetcher<?> metricFetcher) {
+			@Nullable MetricFetcher metricFetcher) {
 		final MutableIOMetrics ioMetrics = new MutableIOMetrics();
 
 		ioMetrics.addIOMetrics(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
index be48d89..b482dfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
@@ -74,7 +74,7 @@ import java.util.stream.Collectors;
 public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
 
 	private final Executor executor;
-	private final MetricFetcher<?> fetcher;
+	private final MetricFetcher fetcher;
 
 	protected AbstractAggregatingMetricsHandler(
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -82,7 +82,7 @@ public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggreg
 			Map<String, String> responseHeaders,
 			AbstractAggregatedMetricsHeaders<P> messageHeaders,
 			Executor executor,
-			MetricFetcher<?> fetcher) {
+			MetricFetcher fetcher) {
 		super(leaderRetriever, timeout, responseHeaders, messageHeaders);
 		this.executor = Preconditions.checkNotNull(executor);
 		this.fetcher = Preconditions.checkNotNull(fetcher);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
index df4483c..ead1b3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
@@ -51,7 +51,7 @@ public class AggregatingJobsMetricsHandler extends AbstractAggregatingMetricsHan
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout, Map<String, String> responseHeaders,
 			Executor executor,
-			MetricFetcher<?> fetcher) {
+			MetricFetcher fetcher) {
 		super(leaderRetriever, timeout, responseHeaders, AggregatedJobMetricsHeaders.getInstance(), executor, fetcher);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
index bd5e83d..a2eed43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
@@ -58,7 +58,7 @@ public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetric
 			Time timeout,
 			Map<String, String> responseHeaders,
 			Executor executor,
-			MetricFetcher<?> fetcher) {
+			MetricFetcher fetcher) {
 		super(leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
index e943e2b..a638132 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
@@ -51,7 +51,7 @@ public class AggregatingTaskManagersMetricsHandler extends AbstractAggregatingMe
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout, Map<String, String> responseHeaders,
 			Executor executor,
-			MetricFetcher<?> fetcher) {
+			MetricFetcher fetcher) {
 		super(leaderRetriever, timeout, responseHeaders, AggregatedTaskManagerMetricsHeaders.getInstance(), executor, fetcher);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index 44a6169..3afdd25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -18,193 +18,23 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
-
 /**
  * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
  *
  * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since
  * the last call has passed.
  */
-public class MetricFetcher<T extends RestfulGateway> {
-	private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
-
-	private final GatewayRetriever<T> retriever;
-	private final MetricQueryServiceRetriever queryServiceRetriever;
-	private final Executor executor;
-	private final Time timeout;
-
-	private final MetricStore metrics = new MetricStore();
-	private final MetricDumpDeserializer deserializer = new MetricDumpDeserializer();
-
-	private long lastUpdateTime;
-
-	public MetricFetcher(
-			GatewayRetriever<T> retriever,
-			MetricQueryServiceRetriever queryServiceRetriever,
-			Executor executor,
-			Time timeout) {
-		this.retriever = Preconditions.checkNotNull(retriever);
-		this.queryServiceRetriever = Preconditions.checkNotNull(queryServiceRetriever);
-		this.executor = Preconditions.checkNotNull(executor);
-		this.timeout = Preconditions.checkNotNull(timeout);
-	}
-
-	/**
-	 * Returns the MetricStore containing all stored metrics.
-	 *
-	 * @return MetricStore containing all stored metrics;
-	 */
-	public MetricStore getMetricStore() {
-		return metrics;
-	}
-
-	/**
-	 * This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated.
-	 */
-	public void update() {
-		synchronized (this) {
-			long currentTime = System.currentTimeMillis();
-			if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update
-				lastUpdateTime = currentTime;
-				fetchMetrics();
-			}
-		}
-	}
-
-	private void fetchMetrics() {
-		LOG.debug("Start fetching metrics.");
-
-		try {
-			Optional<T> optionalLeaderGateway = retriever.getNow();
-			if (optionalLeaderGateway.isPresent()) {
-				final T leaderGateway = optionalLeaderGateway.get();
-
-				/*
-				 * Remove all metrics that belong to a job that is not running and no longer archived.
-				 */
-				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = leaderGateway.requestMultipleJobDetails(timeout);
-
-				jobDetailsFuture.whenCompleteAsync(
-					(MultipleJobsDetails jobDetails, Throwable throwable) -> {
-						if (throwable != null) {
-							LOG.debug("Fetching of JobDetails failed.", throwable);
-						} else {
-							ArrayList<String> toRetain = new ArrayList<>(jobDetails.getJobs().size());
-							for (JobDetails job : jobDetails.getJobs()) {
-								toRetain.add(job.getJobId().toString());
-							}
-							metrics.retainJobs(toRetain);
-						}
-					},
-					executor);
-
-				CompletableFuture<Collection<String>> queryServicePathsFuture = leaderGateway.requestMetricQueryServicePaths(timeout);
-
-				queryServicePathsFuture.whenCompleteAsync(
-					(Collection<String> queryServicePaths, Throwable throwable) -> {
-						if (throwable != null) {
-							LOG.warn("Requesting paths for query services failed.", throwable);
-						} else {
-							for (String queryServicePath : queryServicePaths) {
-								retrieveAndQueryMetrics(queryServicePath);
-							}
-						}
-					},
-					executor);
-
-				// TODO: Once the old code has been ditched, remove the explicit TaskManager query service discovery
-				// TODO: and return it as part of requestQueryServicePaths. Moreover, change the MetricStore such that
-				// TODO: we don't have to explicitly retain the valid TaskManagers, e.g. letting it be a cache with expiry time
-				CompletableFuture<Collection<Tuple2<ResourceID, String>>> taskManagerQueryServicePathsFuture = leaderGateway
-					.requestTaskManagerMetricQueryServicePaths(timeout);
-
-				taskManagerQueryServicePathsFuture.whenCompleteAsync(
-					(Collection<Tuple2<ResourceID, String>> queryServicePaths, Throwable throwable) -> {
-						if (throwable != null) {
-							LOG.warn("Requesting TaskManager's path for query services failed.", throwable);
-						} else {
-							List<String> taskManagersToRetain = queryServicePaths
-								.stream()
-								.map(
-									(Tuple2<ResourceID, String> tuple) -> {
-										retrieveAndQueryMetrics(tuple.f1);
-										return tuple.f0.getResourceIdString();
-									}
-								).collect(Collectors.toList());
-
-							metrics.retainTaskManagers(taskManagersToRetain);
-						}
-					},
-					executor);
-			}
-		} catch (Exception e) {
-			LOG.warn("Exception while fetching metrics.", e);
-		}
-	}
+public interface MetricFetcher {
 
 	/**
-	 * Retrieves and queries the specified QueryServiceGateway.
+	 * Get {@link MetricStore} which contains all currently fetched metrics.
 	 *
-	 * @param queryServicePath specifying the QueryServiceGateway
+	 * @return {@link MetricStore} with all fetched metrics
 	 */
-	private void retrieveAndQueryMetrics(String queryServicePath) {
-		LOG.debug("Retrieve metric query service gateway for {}", queryServicePath);
-
-		final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath);
-
-		queryServiceGatewayFuture.whenCompleteAsync(
-			(MetricQueryServiceGateway queryServiceGateway, Throwable t) -> {
-				if (t != null) {
-					LOG.debug("Could not retrieve QueryServiceGateway.", t);
-				} else {
-					queryMetrics(queryServiceGateway);
-				}
-			},
-			executor);
-	}
+	MetricStore getMetricStore();
 
 	/**
-	 * Query the metrics from the given QueryServiceGateway.
-	 *
-	 * @param queryServiceGateway to query for metrics
+	 * Trigger fetching of metrics.
 	 */
-	private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
-		LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
-
-		queryServiceGateway
-			.queryMetrics(timeout)
-			.whenCompleteAsync(
-				(MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
-					if (t != null) {
-						LOG.debug("Fetching metrics failed.", t);
-					} else {
-						metrics.addAll(deserializer.deserialize(result));
-					}
-				},
-				executor);
-	}
+	void update();
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
similarity index 86%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
index 44a6169..861f512 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -33,24 +35,26 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
 
 /**
- * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
+ * Implementation of {@link MetricFetcher} which fetches metrics from the {@link MetricQueryServiceGateway}.
  *
- * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since
- * the last call has passed.
+ * @param <T> type of the {@link RestfulGateway} from which to retrieve the metric query service path.
  */
-public class MetricFetcher<T extends RestfulGateway> {
-	private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
+public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetcher {
+	private static final Logger LOG = LoggerFactory.getLogger(MetricFetcherImpl.class);
 
 	private final GatewayRetriever<T> retriever;
 	private final MetricQueryServiceRetriever queryServiceRetriever;
@@ -62,7 +66,7 @@ public class MetricFetcher<T extends RestfulGateway> {
 
 	private long lastUpdateTime;
 
-	public MetricFetcher(
+	public MetricFetcherImpl(
 			GatewayRetriever<T> retriever,
 			MetricQueryServiceRetriever queryServiceRetriever,
 			Executor executor,
@@ -78,6 +82,7 @@ public class MetricFetcher<T extends RestfulGateway> {
 	 *
 	 * @return MetricStore containing all stored metrics;
 	 */
+	@Override
 	public MetricStore getMetricStore() {
 		return metrics;
 	}
@@ -85,10 +90,11 @@ public class MetricFetcher<T extends RestfulGateway> {
 	/**
 	 * This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated.
 	 */
+	@Override
 	public void update() {
 		synchronized (this) {
 			long currentTime = System.currentTimeMillis();
-			if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update
+			if (currentTime - lastUpdateTime > 10000L) {
 				lastUpdateTime = currentTime;
 				fetchMetrics();
 			}
@@ -207,4 +213,19 @@ public class MetricFetcher<T extends RestfulGateway> {
 				},
 				executor);
 	}
+
+	@Nonnull
+	public static <T extends RestfulGateway> MetricFetcherImpl<T> fromConfiguration(
+			final Configuration configuration,
+			final MetricQueryServiceRetriever metricQueryServiceRetriever,
+			final GatewayRetriever<T> dispatcherGatewayRetriever,
+			final ExecutorService executor) {
+		final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+
+		return new MetricFetcherImpl<>(
+			dispatcherGatewayRetriever,
+			metricQueryServiceRetriever,
+			executor,
+			timeout);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java
new file mode 100644
index 0000000..28d5450
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+/**
+ * No-op implementation of the {@link MetricFetcher}.
+ */
+public enum VoidMetricFetcher implements MetricFetcher {
+	INSTANCE;
+
+	private static final MetricStore METRIC_STORE = new MetricStore();
+
+	@Override
+	public MetricStore getMetricStore() {
+		return METRIC_STORE;
+	}
+
+	@Override
+	public void update() {
+		// noop
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index 6822417..948ec93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.rest.handler.job.JobVertexDetailsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index ec14aa3..f215ab8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -77,6 +77,7 @@ import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandle
 import org.apache.flink.runtime.rest.handler.legacy.files.StdoutFileHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler;
@@ -157,7 +158,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 	private final ExecutionGraphCache executionGraphCache;
 	private final CheckpointStatsCache checkpointStatsCache;
 
-	private final MetricFetcher<? extends T> metricFetcher;
+	private final MetricFetcher metricFetcher;
 
 	private final LeaderElectionService leaderElectionService;
 
@@ -193,11 +194,11 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		this.checkpointStatsCache = new CheckpointStatsCache(
 			restConfiguration.getMaxCheckpointStatisticCacheEntries());
 
-		this.metricFetcher = new MetricFetcher<>(
-			leaderRetriever,
+		this.metricFetcher = MetricFetcherImpl.fromConfiguration(
+			clusterConfiguration,
 			metricQueryServiceRetriever,
-			executor,
-			restConfiguration.getTimeout());
+			leaderRetriever,
+			executor);
 
 		this.leaderElectionService = Preconditions.checkNotNull(leaderElectionService);
 		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index f00e49f..10d670e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
@@ -118,7 +119,7 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
 		// Instance the handler.
 		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(new Configuration());
 
-		final MetricFetcher<?> metricFetcher = new MetricFetcher<>(
+		final MetricFetcher metricFetcher = new MetricFetcherImpl<>(
 			() -> null,
 			path -> null,
 			TestingUtils.defaultExecutor(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 59c6b38..6160d4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
@@ -116,7 +117,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
 			emptyAccumulators);
 
 		// Change some fields so we can make it different from other sub tasks.
-		final MetricFetcher<?> metricFetcher = new MetricFetcher<>(
+		final MetricFetcher metricFetcher = new MetricFetcherImpl<>(
 			() -> null,
 			path -> null,
 			TestingUtils.defaultExecutor(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
index 76f81dc..a01dd78 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
@@ -67,7 +67,7 @@ public class AggregatingJobsMetricsHandlerTest extends AggregatingMetricsHandler
 	}
 
 	@Override
-	protected AggregatingJobsMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+	protected AggregatingJobsMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher fetcher) {
 		return new AggregatingJobsMetricsHandler(
 			leaderRetriever,
 			timeout,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
index 7db669a..41e5a06 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
@@ -88,7 +89,7 @@ public abstract class AggregatingMetricsHandlerTestBase<
 
 	@Before
 	public void setUp() throws Exception {
-		MetricFetcher<RestfulGateway> fetcher = new MetricFetcher<RestfulGateway>(
+		MetricFetcher fetcher = new MetricFetcherImpl<RestfulGateway>(
 			mock(GatewayRetriever.class),
 			mock(MetricQueryServiceRetriever.class),
 			Executors.directExecutor(),
@@ -122,7 +123,7 @@ public abstract class AggregatingMetricsHandlerTestBase<
 		Time timeout,
 		Map<String, String> responseHeaders,
 		Executor executor,
-		MetricFetcher<?> fetcher
+		MetricFetcher fetcher
 	);
 
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
index 3c6c9f1..06b5b71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
@@ -79,7 +79,7 @@ public class AggregatingSubtasksMetricsHandlerTest extends AggregatingMetricsHan
 	}
 
 	@Override
-	protected AggregatingSubtasksMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+	protected AggregatingSubtasksMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher fetcher) {
 		return new AggregatingSubtasksMetricsHandler(
 			leaderRetriever,
 			timeout,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
index 14d3777..4e2cf24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
@@ -68,7 +68,7 @@ public class AggregatingTaskManagersMetricsHandlerTest extends AggregatingMetric
 	}
 
 	@Override
-	protected AggregatingTaskManagersMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+	protected AggregatingTaskManagersMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher fetcher) {
 		return new AggregatingTaskManagersMetricsHandler(
 			leaderRetriever,
 			timeout,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index 61c028f..e1ec719 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -108,7 +108,7 @@ public class MetricFetcherTest extends TestLogger {
 		when(queryServiceRetriever.retrieveService(eq(tmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
 
 		// ========= start MetricFetcher testing =======================================================================
-		MetricFetcher fetcher = new MetricFetcher<>(
+		MetricFetcher fetcher = new MetricFetcherImpl<>(
 			retriever,
 			queryServiceRetriever,
 			Executors.directExecutor(),


[flink] 03/03: [hotfix] Remove Mockito from MetricFetcherTest

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7d98940408d4d55cc40ef3622386036cf76d78e7
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 10 18:09:19 2019 +0100

    [hotfix] Remove Mockito from MetricFetcherTest
---
 .../handler/legacy/metrics/MetricFetcherTest.java  | 60 +++++++----------
 .../TestingMetricQueryServiceGateway.java          | 76 ++++++++++++++++++++++
 2 files changed, 99 insertions(+), 37 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index b478893..16ed248 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -30,8 +30,6 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.util.TestHistogram;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
@@ -41,39 +39,29 @@ import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.TestingMetricQueryServiceGateway;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import javax.annotation.Nonnull;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
 
 /**
  * Tests for the MetricFetcher.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(MetricFetcher.class)
 public class MetricFetcherTest extends TestLogger {
 	@Test
-	public void testUpdate() throws Exception {
+	public void testUpdate() {
 		final Time timeout = Time.seconds(10L);
 
 		// ========= setup TaskManager =================================================================================
@@ -81,39 +69,37 @@ public class MetricFetcherTest extends TestLogger {
 		ResourceID tmRID = ResourceID.generate();
 
 		// ========= setup JobManager ==================================================================================
-		JobDetails details = mock(JobDetails.class);
-		when(details.getJobId()).thenReturn(jobID);
 
 		final String jmMetricQueryServicePath = "/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
 		final String tmMetricQueryServicePath = "/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString();
 
-		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
-
-		when(jobManagerGateway.requestMultipleJobDetails(any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList())));
-		when(jobManagerGateway.requestMetricQueryServicePaths(any(Time.class))).thenReturn(
-			CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath)));
-		when(jobManagerGateway.requestTaskManagerMetricQueryServicePaths(any(Time.class))).thenReturn(
-			CompletableFuture.completedFuture(Collections.singleton(Tuple2.of(tmRID, tmMetricQueryServicePath))));
+		final TestingRestfulGateway restfulGateway = new TestingRestfulGateway.Builder()
+			.setRequestMultipleJobDetailsSupplier(() -> CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList())))
+			.setRequestMetricQueryServicePathsSupplier(() -> CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath)))
+			.setRequestTaskManagerMetricQueryServicePathsSupplier(() -> CompletableFuture.completedFuture(Collections.singleton(Tuple2.of(tmRID, tmMetricQueryServicePath))))
+			.build();
 
-		GatewayRetriever<JobManagerGateway> retriever = mock(AkkaJobManagerRetriever.class);
-		when(retriever.getNow())
-			.thenReturn(Optional.of(jobManagerGateway));
+		final GatewayRetriever<RestfulGateway> retriever = () -> CompletableFuture.completedFuture(restfulGateway);
 
 		// ========= setup QueryServices ================================================================================
-		MetricQueryServiceGateway jmQueryService = mock(MetricQueryServiceGateway.class);
-		MetricQueryServiceGateway tmQueryService = mock(MetricQueryServiceGateway.class);
+		final MetricQueryServiceGateway jmQueryService = new TestingMetricQueryServiceGateway.Builder()
+			.setQueryMetricsSupplier(() -> CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], new byte[0], new byte[0], new byte[0], 0, 0, 0, 0)))
+			.build();
 
 		MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmRID, jobID);
+		final MetricQueryServiceGateway tmQueryService = new TestingMetricQueryServiceGateway.Builder()
+			.setQueryMetricsSupplier(() -> CompletableFuture.completedFuture(requestMetricsAnswer))
+			.build();
 
-		when(jmQueryService.queryMetrics(any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], new byte[0], new byte[0], new byte[0], 0, 0, 0, 0)));
-		when(tmQueryService.queryMetrics(any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));
-
-		MetricQueryServiceRetriever queryServiceRetriever = mock(MetricQueryServiceRetriever.class);
-		when(queryServiceRetriever.retrieveService(eq(jmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(jmQueryService));
-		when(queryServiceRetriever.retrieveService(eq(tmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
+		final MetricQueryServiceRetriever queryServiceRetriever = (path) -> {
+			if (path.equals(jmMetricQueryServicePath))  {
+				return CompletableFuture.completedFuture(jmQueryService);
+			} else if (path.equals(tmMetricQueryServicePath)) {
+				return CompletableFuture.completedFuture(tmQueryService);
+			} else {
+				throw new IllegalArgumentException("Unexpected argument.");
+			}
+		};
 
 		// ========= start MetricFetcher testing =======================================================================
 		MetricFetcher fetcher = new MetricFetcherImpl<>(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/TestingMetricQueryServiceGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/TestingMetricQueryServiceGateway.java
new file mode 100644
index 0000000..0ef1921
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/TestingMetricQueryServiceGateway.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+/**
+ * Testing implementation of the {@link MetricQueryServiceGateway}: {@link #queryMetrics(Time)} returns whatever the configured supplier yields, and {@link #getAddress()} returns the configured address.
+ */
+public class TestingMetricQueryServiceGateway implements MetricQueryServiceGateway {
+
+	@Nonnull
+	private final Supplier<CompletableFuture<MetricDumpSerialization.MetricSerializationResult>> queryMetricsSupplier;
+
+	@Nonnull
+	private final String address;
+
+	public TestingMetricQueryServiceGateway(@Nonnull Supplier<CompletableFuture<MetricDumpSerialization.MetricSerializationResult>> queryMetricsSupplier, @Nonnull String address) {
+		this.queryMetricsSupplier = queryMetricsSupplier;
+		this.address = address;
+	}
+
+	@Override
+	public CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout) {
+		return queryMetricsSupplier.get(); // the timeout argument is not used; the supplier alone decides the result
+	}
+
+	@Override
+	public String getAddress() {
+		return address;
+	}
+
+	/**
+	 * Builder for the {@link TestingMetricQueryServiceGateway}; unless overridden, the supplier hands out a new, not-yet-completed future on every call and the address is "localhost".
+	 */
+	public static class Builder {
+		private Supplier<CompletableFuture<MetricDumpSerialization.MetricSerializationResult>> queryMetricsSupplier = CompletableFuture::new;
+		private String address = "localhost";
+
+		public Builder setQueryMetricsSupplier(Supplier<CompletableFuture<MetricDumpSerialization.MetricSerializationResult>> queryMetricsSupplier) {
+			this.queryMetricsSupplier = queryMetricsSupplier;
+			return this;
+		}
+
+		public Builder setAddress(String address) {
+			this.address = address;
+			return this;
+		}
+
+		public TestingMetricQueryServiceGateway build() {
+			return new TestingMetricQueryServiceGateway(queryMetricsSupplier, address);
+		}
+	}
+}