You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/31 20:26:40 UTC

[flink] branch master updated: [FLINK-11382][metrics] Disable MetricFetcher if interval is configured to 0

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 11b9a2b  [FLINK-11382][metrics] Disable MetricFetcher if interval is configured to 0
11b9a2b is described below

commit 11b9a2bba425667c35325373f489378bcbf93741
Author: leesf <49...@qq.com>
AuthorDate: Fri Feb 1 04:26:27 2019 +0800

    [FLINK-11382][metrics] Disable MetricFetcher if interval is configured to 0
---
 .../flink/runtime/webmonitor/WebRuntimeMonitor.java | 18 ++++++++++++------
 ...ctDispatcherResourceManagerComponentFactory.java | 18 +++++++++++++-----
 .../flink/runtime/minicluster/MiniCluster.java      | 21 ++++++++++++++-------
 3 files changed, 39 insertions(+), 18 deletions(-)

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 10db5a7..9207f05 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureSta
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
 import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
@@ -194,12 +195,17 @@ public class WebRuntimeMonitor implements WebMonitor {
 		} else {
 			sslFactory = null;
 		}
-		metricFetcher = new MetricFetcherImpl<>(
-			retriever,
-			queryServiceRetriever,
-			scheduledExecutor,
-			timeout,
-			MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue());
+
+		final long updateInterval =
+			config.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
+		metricFetcher = updateInterval == 0
+			? VoidMetricFetcher.INSTANCE
+			: new MetricFetcherImpl<>(
+				retriever,
+				queryServiceRetriever,
+				scheduledExecutor,
+				timeout,
+				updateInterval);
 
 		Router router = new Router();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 9ab348d..fb514a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.entrypoint.component;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -43,7 +44,9 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rest.RestEndpointFactory;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -135,17 +138,22 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory<T extend
 				configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
 				"DispatcherRestEndpoint");
 
+			final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
+			final MetricFetcher metricFetcher = updateInterval == 0
+				? VoidMetricFetcher.INSTANCE
+				: MetricFetcherImpl.fromConfiguration(
+					configuration,
+					metricQueryServiceRetriever,
+					dispatcherGatewayRetriever,
+					executor);
+
 			webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
 				configuration,
 				dispatcherGatewayRetriever,
 				resourceManagerGatewayRetriever,
 				blobServer,
 				executor,
-				MetricFetcherImpl.fromConfiguration(
-					configuration,
-					metricQueryServiceRetriever,
-					dispatcherGatewayRetriever,
-					executor),
+				metricFetcher,
 				highAvailabilityServices.getWebMonitorLeaderElectionService(),
 				fatalErrorHandler);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 72182ad..7488852 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -66,7 +67,9 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
@@ -360,6 +363,16 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
 					"DispatcherRestEndpoint");
 
+				final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
+				final MetricFetcher metricFetcher = updateInterval == 0
+					? VoidMetricFetcher.INSTANCE
+					: MetricFetcherImpl.fromConfiguration(configuration,
+						new AkkaQueryServiceRetriever(
+							metricQueryServiceActorSystem,
+							Time.milliseconds(
+							configuration.getLong(WebOptions.TIMEOUT))),
+						dispatcherGatewayRetriever, executor);
+
 				this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
 					RestServerEndpointConfiguration.fromConfiguration(configuration),
 					dispatcherGatewayRetriever,
@@ -368,13 +381,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					resourceManagerGatewayRetriever,
 					blobServer.getTransientBlobService(),
 					executor,
-					MetricFetcherImpl.fromConfiguration(
-						configuration,
-						new AkkaQueryServiceRetriever(
-							metricQueryServiceActorSystem,
-							Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
-						dispatcherGatewayRetriever,
-						executor),
+					metricFetcher,
 					haServices.getWebMonitorLeaderElectionService(),
 					new ShutDownFatalErrorHandler());