You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/31 20:26:40 UTC
[flink] branch master updated: [FLINK-11382][metrics] Disable
MetricFetcher if interval is configured to 0
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 11b9a2b [FLINK-11382][metrics] Disable MetricFetcher if interval is configured to 0
11b9a2b is described below
commit 11b9a2bba425667c35325373f489378bcbf93741
Author: leesf <49...@qq.com>
AuthorDate: Fri Feb 1 04:26:27 2019 +0800
[FLINK-11382][metrics] Disable MetricFetcher if interval is configured to 0
---
.../flink/runtime/webmonitor/WebRuntimeMonitor.java | 18 ++++++++++++------
...ctDispatcherResourceManagerComponentFactory.java | 18 +++++++++++++-----
.../flink/runtime/minicluster/MiniCluster.java | 21 ++++++++++++++-------
3 files changed, 39 insertions(+), 18 deletions(-)
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 10db5a7..9207f05 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureSta
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
@@ -194,12 +195,17 @@ public class WebRuntimeMonitor implements WebMonitor {
} else {
sslFactory = null;
}
- metricFetcher = new MetricFetcherImpl<>(
- retriever,
- queryServiceRetriever,
- scheduledExecutor,
- timeout,
- MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue());
+
+ final long updateInterval =
+ config.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
+ metricFetcher = updateInterval == 0
+ ? VoidMetricFetcher.INSTANCE
+ : new MetricFetcherImpl<>(
+ retriever,
+ queryServiceRetriever,
+ scheduledExecutor,
+ timeout,
+ updateInterval);
Router router = new Router();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 9ab348d..fb514a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.entrypoint.component;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -43,7 +44,9 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rest.RestEndpointFactory;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -135,17 +138,22 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory<T extend
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint");
+ final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
+ final MetricFetcher metricFetcher = updateInterval == 0
+ ? VoidMetricFetcher.INSTANCE
+ : MetricFetcherImpl.fromConfiguration(
+ configuration,
+ metricQueryServiceRetriever,
+ dispatcherGatewayRetriever,
+ executor);
+
webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
executor,
- MetricFetcherImpl.fromConfiguration(
- configuration,
- metricQueryServiceRetriever,
- dispatcherGatewayRetriever,
- executor),
+ metricFetcher,
highAvailabilityServices.getWebMonitorLeaderElectionService(),
fatalErrorHandler);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 72182ad..7488852 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -66,7 +67,9 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
@@ -360,6 +363,16 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint");
+ final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
+ final MetricFetcher metricFetcher = updateInterval == 0
+ ? VoidMetricFetcher.INSTANCE
+ : MetricFetcherImpl.fromConfiguration(configuration,
+ new AkkaQueryServiceRetriever(
+ metricQueryServiceActorSystem,
+ Time.milliseconds(
+ configuration.getLong(WebOptions.TIMEOUT))),
+ dispatcherGatewayRetriever, executor);
+
this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
RestServerEndpointConfiguration.fromConfiguration(configuration),
dispatcherGatewayRetriever,
@@ -368,13 +381,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
resourceManagerGatewayRetriever,
blobServer.getTransientBlobService(),
executor,
- MetricFetcherImpl.fromConfiguration(
- configuration,
- new AkkaQueryServiceRetriever(
- metricQueryServiceActorSystem,
- Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
- dispatcherGatewayRetriever,
- executor),
+ metricFetcher,
haServices.getWebMonitorLeaderElectionService(),
new ShutDownFatalErrorHandler());