You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/16 21:50:00 UTC

[flink] 02/03: [FLINK-10822] Make MetricFetcher update interval configurable

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 474fe8e8ae063e250f7f8f4eddcd799b15f8b69b
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 10 11:17:54 2019 +0100

    [FLINK-10822] Make MetricFetcher update interval configurable
    
    Introduce MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL configuration option to configure
    the update interval of the MetricFetcher.
    
    This closes #7459.
---
 docs/_includes/generated/metric_configuration.html |  5 ++
 .../apache/flink/configuration/MetricOptions.java  |  8 +++
 .../flink/docs/rest/RestAPIDocGenerator.java       |  5 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java      |  4 +-
 .../runtime/dispatcher/DispatcherRestEndpoint.java |  5 +-
 ...tDispatcherResourceManagerComponentFactory.java |  7 ++-
 .../jobmaster/MiniDispatcherRestEndpoint.java      |  6 +-
 .../flink/runtime/minicluster/MiniCluster.java     | 11 +++-
 .../flink/runtime/rest/JobRestEndpointFactory.java |  5 +-
 .../flink/runtime/rest/RestEndpointFactory.java    |  3 +-
 .../runtime/rest/SessionRestEndpointFactory.java   |  5 +-
 .../handler/legacy/metrics/MetricFetcherImpl.java  | 14 ++++-
 .../rest/handler/util/MutableIOMetrics.java        |  1 -
 .../runtime/webmonitor/WebMonitorEndpoint.java     | 10 +---
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |  4 +-
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |  4 +-
 .../metrics/AggregatingMetricsHandlerTestBase.java |  4 +-
 .../handler/legacy/metrics/MetricFetcherTest.java  | 67 +++++++++++++++++++++-
 18 files changed, 129 insertions(+), 39 deletions(-)

diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html
index 39a76ce..bb4b2a8 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -8,6 +8,11 @@
     </thead>
     <tbody>
         <tr>
+            <td><h5>metrics.fetcher.update-interval</h5></td>
+            <td style="word-wrap: break-word;">10000</td>
+            <td>Update interval for the metric fetcher used by the web UI in milliseconds. Decrease this value for faster updating metrics. Increase this value if the metric fetcher causes too much load.</td>
+        </tr>
+        <tr>
             <td><h5>metrics.internal.query-service.port</h5></td>
             <td style="word-wrap: break-word;">"0"</td>
             <td>The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. Per default Flink will pick a random port.</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 2c22d92..3deb766 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -175,6 +175,14 @@ public class MetricOptions {
 			" by Akka's thread pool executor. " +
 			"The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). " +
 			"Warning, increasing this value may bring the main Flink components down.");
+	/**
+	 * The config parameter defining the update interval for the metric fetcher used by the web UI in milliseconds.
+	 */
+	public static final ConfigOption<Long> METRIC_FETCHER_UPDATE_INTERVAL =
+		key("metrics.fetcher.update-interval")
+			.defaultValue(10000L)
+			.withDescription("Update interval for the metric fetcher used by the web UI in milliseconds. Decrease this value for " +
+				"faster updating metrics. Increase this value if the metric fetcher causes too much load.");
 
 	private MetricOptions() {
 	}
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 891ae08..a0ec804 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.ConfigurationException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -325,7 +324,6 @@ public class RestAPIDocGenerator {
 		private static final RestHandlerConfiguration handlerConfig;
 		private static final GatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever;
 		private static final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
-		private static final MetricQueryServiceRetriever metricQueryServiceRetriever;
 
 		static {
 			config = new Configuration();
@@ -341,7 +339,6 @@ public class RestAPIDocGenerator {
 
 			dispatcherGatewayRetriever = () -> null;
 			resourceManagerGatewayRetriever = () -> null;
-			metricQueryServiceRetriever = path -> null;
 		}
 
 		private DocumentingDispatcherRestEndpoint() throws IOException {
@@ -353,7 +350,7 @@ public class RestAPIDocGenerator {
 				resourceManagerGatewayRetriever,
 				NoOpTransientBlobService.INSTANCE,
 				Executors.newFixedThreadPool(1),
-				metricQueryServiceRetriever,
+				VoidMetricFetcher.INSTANCE,
 				NoOpElectionService.INSTANCE,
 				NoOpFatalErrorHandler.INSTANCE);
 		}
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 826b93a..10db5a7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
@@ -197,7 +198,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 			retriever,
 			queryServiceRetriever,
 			scheduledExecutor,
-			timeout);
+			timeout,
+			MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue());
 
 		Router router = new Router();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index b36fd6f..3d52331 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.WebMonitorExtension;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
@@ -61,7 +60,7 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
 			GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
 			TransientBlobService transientBlobService,
 			ExecutorService executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			MetricFetcher metricFetcher,
 			LeaderElectionService leaderElectionService,
 			FatalErrorHandler fatalErrorHandler) throws IOException {
 
@@ -73,7 +72,7 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
 			resourceManagerRetriever,
 			transientBlobService,
 			executor,
-			metricQueryServiceRetriever,
+			metricFetcher,
 			leaderElectionService,
 			fatalErrorHandler);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 8b593fc..9ab348d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rest.RestEndpointFactory;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -140,7 +141,11 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory<T extend
 				resourceManagerGatewayRetriever,
 				blobServer,
 				executor,
-				metricQueryServiceRetriever,
+				MetricFetcherImpl.fromConfiguration(
+					configuration,
+					metricQueryServiceRetriever,
+					dispatcherGatewayRetriever,
+					executor),
 				highAvailabilityServices.getWebMonitorLeaderElectionService(),
 				fatalErrorHandler);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
index dae9d8e..f11b669 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
@@ -25,11 +25,11 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
@@ -47,7 +47,7 @@ public class MiniDispatcherRestEndpoint extends WebMonitorEndpoint<RestfulGatewa
 			GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
 			TransientBlobService transientBlobService,
 			ExecutorService executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			MetricFetcher metricFetcher,
 			LeaderElectionService leaderElectionService,
 			FatalErrorHandler fatalErrorHandler) throws IOException {
 		super(
@@ -58,7 +58,7 @@ public class MiniDispatcherRestEndpoint extends WebMonitorEndpoint<RestfulGatewa
 			resourceManagerRetriever,
 			transientBlobService,
 			executor,
-			metricQueryServiceRetriever,
+			metricFetcher,
 			leaderElectionService,
 			fatalErrorHandler);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 92738ca..80f8c1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -65,6 +65,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
@@ -361,9 +362,13 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					resourceManagerGatewayRetriever,
 					blobServer.getTransientBlobService(),
 					executor,
-					new AkkaQueryServiceRetriever(
-						metricQueryServiceActorSystem,
-						Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
+					MetricFetcherImpl.fromConfiguration(
+						configuration,
+						new AkkaQueryServiceRetriever(
+							metricQueryServiceActorSystem,
+							Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
+						dispatcherGatewayRetriever,
+						executor),
 					haServices.getWebMonitorLeaderElectionService(),
 					new ShutDownFatalErrorHandler());
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
index dec7768..21df821 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
 import java.util.concurrent.ExecutorService;
 
@@ -47,7 +46,7 @@ public enum JobRestEndpointFactory implements RestEndpointFactory<RestfulGateway
 			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
 			TransientBlobService transientBlobService,
 			ExecutorService executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			MetricFetcher metricFetcher,
 			LeaderElectionService leaderElectionService,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
 		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
@@ -60,7 +59,7 @@ public enum JobRestEndpointFactory implements RestEndpointFactory<RestfulGateway
 			resourceManagerGatewayRetriever,
 			transientBlobService,
 			executor,
-			metricQueryServiceRetriever,
+			metricFetcher,
 			leaderElectionService,
 			fatalErrorHandler);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
index ff10be6..840a4be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
 import java.util.concurrent.ExecutorService;
 
@@ -45,7 +44,7 @@ public interface RestEndpointFactory<T extends RestfulGateway> {
 		LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
 		TransientBlobService transientBlobService,
 		ExecutorService executor,
-		MetricQueryServiceRetriever metricQueryServiceRetriever,
+		MetricFetcher metricFetcher,
 		LeaderElectionService leaderElectionService,
 		FatalErrorHandler fatalErrorHandler) throws Exception;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
index 2dcf6de..15d5fe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
 import java.util.concurrent.ExecutorService;
 
@@ -46,7 +45,7 @@ public enum SessionRestEndpointFactory implements RestEndpointFactory<Dispatcher
 			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
 			TransientBlobService transientBlobService,
 			ExecutorService executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			MetricFetcher metricFetcher,
 			LeaderElectionService leaderElectionService,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
 		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
@@ -59,7 +58,7 @@ public enum SessionRestEndpointFactory implements RestEndpointFactory<Dispatcher
 			resourceManagerGatewayRetriever,
 			transientBlobService,
 			executor,
-			metricQueryServiceRetriever,
+			metricFetcher,
 			leaderElectionService,
 			fatalErrorHandler);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
index 861f512..7e904e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -63,6 +64,7 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche
 
 	private final MetricStore metrics = new MetricStore();
 	private final MetricDumpDeserializer deserializer = new MetricDumpDeserializer();
+	private final long updateInterval;
 
 	private long lastUpdateTime;
 
@@ -70,11 +72,15 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche
 			GatewayRetriever<T> retriever,
 			MetricQueryServiceRetriever queryServiceRetriever,
 			Executor executor,
-			Time timeout) {
+			Time timeout,
+			long updateInterval) {
 		this.retriever = Preconditions.checkNotNull(retriever);
 		this.queryServiceRetriever = Preconditions.checkNotNull(queryServiceRetriever);
 		this.executor = Preconditions.checkNotNull(executor);
 		this.timeout = Preconditions.checkNotNull(timeout);
+
+		Preconditions.checkArgument(updateInterval > 0, "The update interval must be larger than 0.");
+		this.updateInterval = updateInterval;
 	}
 
 	/**
@@ -94,7 +100,7 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche
 	public void update() {
 		synchronized (this) {
 			long currentTime = System.currentTimeMillis();
-			if (currentTime - lastUpdateTime > 10000L) {
+			if (currentTime - lastUpdateTime > updateInterval) {
 				lastUpdateTime = currentTime;
 				fetchMetrics();
 			}
@@ -221,11 +227,13 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche
 			final GatewayRetriever<T> dispatcherGatewayRetriever,
 			final ExecutorService executor) {
 		final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+		final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
 
 		return new MetricFetcherImpl<>(
 			dispatcherGatewayRetriever,
 			metricQueryServiceRetriever,
 			executor,
-			timeout);
+			timeout,
+			updateInterval);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index 948ec93..6822417 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.rest.handler.job.JobVertexDetailsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
-import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index f215ab8..8efbe7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -77,7 +77,6 @@ import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandle
 import org.apache.flink.runtime.rest.handler.legacy.files.StdoutFileHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
-import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler;
@@ -119,7 +118,6 @@ import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FileUtils;
@@ -176,7 +174,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 			GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
 			TransientBlobService transientBlobService,
 			ExecutorService executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			MetricFetcher metricFetcher,
 			LeaderElectionService leaderElectionService,
 			FatalErrorHandler fatalErrorHandler) throws IOException {
 		super(endpointConfiguration);
@@ -194,11 +192,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		this.checkpointStatsCache = new CheckpointStatsCache(
 			restConfiguration.getMaxCheckpointStatisticCacheEntries());
 
-		this.metricFetcher = MetricFetcherImpl.fromConfiguration(
-			clusterConfiguration,
-			metricQueryServiceRetriever,
-			leaderRetriever,
-			executor);
+		this.metricFetcher = metricFetcher;
 
 		this.leaderElectionService = Preconditions.checkNotNull(leaderElectionService);
 		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 10d670e..5c688fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -123,7 +124,8 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
 			() -> null,
 			path -> null,
 			TestingUtils.defaultExecutor(),
-			Time.milliseconds(1000L));
+			Time.milliseconds(1000L),
+			MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue());
 
 		final SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(
 			() -> null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 6160d4a..d0a78d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecution;
@@ -121,7 +122,8 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
 			() -> null,
 			path -> null,
 			TestingUtils.defaultExecutor(),
-			Time.milliseconds(1000L));
+			Time.milliseconds(1000L),
+			MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue());
 
 		// Instance the handler.
 		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(new Configuration());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
index 41e5a06..a982ac1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.job.metrics;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
@@ -93,7 +94,8 @@ public abstract class AggregatingMetricsHandlerTestBase<
 			mock(GatewayRetriever.class),
 			mock(MetricQueryServiceRetriever.class),
 			Executors.directExecutor(),
-			TestingUtils.TIMEOUT());
+			TestingUtils.TIMEOUT(),
+			MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue());
 		store = fetcher.getMetricStore();
 
 		Collection<MetricDump> metricDumps = getMetricDumps();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index e1ec719..b478893 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
@@ -35,6 +36,8 @@ import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -46,13 +49,18 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import javax.annotation.Nonnull;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.powermock.api.mockito.PowerMockito.mock;
@@ -112,7 +120,8 @@ public class MetricFetcherTest extends TestLogger {
 			retriever,
 			queryServiceRetriever,
 			Executors.directExecutor(),
-			timeout);
+			timeout,
+			MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue());
 
 		// verify that update fetches metrics and updates the store
 		fetcher.update();
@@ -184,4 +193,60 @@ public class MetricFetcherTest extends TestLogger {
 
 		return dump;
 	}
+
+	@Test
+	public void testLongUpdateInterval() {
+		final long updateInterval = 1000L;
+		final AtomicInteger requestMetricQueryServicePathsCounter = new AtomicInteger(0);
+		final RestfulGateway restfulGateway = createRestfulGateway(requestMetricQueryServicePathsCounter);
+
+		final MetricFetcher fetcher = createMetricFetcher(updateInterval, restfulGateway);
+
+		fetcher.update();
+		fetcher.update();
+
+		assertThat(requestMetricQueryServicePathsCounter.get(), is(1));
+	}
+
+	@Test
+	public void testShortUpdateInterval() throws InterruptedException {
+		final long updateInterval = 1L;
+		final AtomicInteger requestMetricQueryServicePathsCounter = new AtomicInteger(0);
+		final RestfulGateway restfulGateway = createRestfulGateway(requestMetricQueryServicePathsCounter);
+
+		final MetricFetcher fetcher = createMetricFetcher(updateInterval, restfulGateway);
+
+		fetcher.update();
+
+		final long start = System.currentTimeMillis();
+		long difference = 0L;
+
+		while (difference <= updateInterval) {
+			Thread.sleep(2L * updateInterval);
+			difference = System.currentTimeMillis() - start;
+		}
+
+		fetcher.update();
+
+		assertThat(requestMetricQueryServicePathsCounter.get(), is(2));
+	}
+
+	@Nonnull
+	private MetricFetcher createMetricFetcher(long updateInterval, RestfulGateway restfulGateway) {
+		return new MetricFetcherImpl<>(
+			() -> CompletableFuture.completedFuture(restfulGateway),
+			path -> new CompletableFuture<>(),
+			Executors.directExecutor(),
+			Time.seconds(10L),
+			updateInterval);
+	}
+
+	private RestfulGateway createRestfulGateway(AtomicInteger requestMetricQueryServicePathsCounter) {
+		return new TestingRestfulGateway.Builder()
+			.setRequestMetricQueryServicePathsSupplier(() -> {
+				requestMetricQueryServicePathsCounter.incrementAndGet();
+				return new CompletableFuture<>();
+			})
+			.build();
+	}
 }