Posted to issues@flink.apache.org by "Xintong Song (Jira)" <ji...@apache.org> on 2022/10/26 02:50:00 UTC
[jira] [Closed] (FLINK-29134) fetch metrics may cause oom(ThreadPool task pile up)
[ https://issues.apache.org/jira/browse/FLINK-29134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xintong Song closed FLINK-29134.
--------------------------------
Fix Version/s: 1.17.0
1.15.3
1.16.1
Resolution: Fixed
- master (1.17): 93af1e45a1e868ad3752923453562861d831104f
- release-1.16: c52e1519b509b6a918df037fe3665872ed8146fb
- release-1.15: eeda260d71ec11476edf532c5ff655828b0c4c7f
> fetch metrics may cause oom(ThreadPool task pile up)
> ----------------------------------------------------
>
> Key: FLINK-29134
> URL: https://issues.apache.org/jira/browse/FLINK-29134
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Metrics
> Affects Versions: 1.11.0, 1.15.2
> Reporter: Sitan Pang
> Assignee: Yuxin Tan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0, 1.15.3, 1.16.1
>
> Attachments: dump-queueTask.png, dump-threadPool.png
>
>
> In queryMetrics, we use a thread pool to process the data returned by the TMs (TaskManagers):
> {code:java}
> private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
>     LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
>     queryServiceGateway
>             .queryMetrics(timeout)
>             .whenCompleteAsync(
>                     (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
>                         if (t != null) {
>                             LOG.debug("Fetching metrics failed.", t);
>                         } else {
>                             metrics.addAll(deserializer.deserialize(result));
>                         }
>                     },
>                     executor);
> }
> {code}
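A note on the threading in this snippet: whenCompleteAsync(action, executor) always hands the callback to the supplied executor instead of running it on the thread that completed the future, so every TaskManager response becomes one task on that pool. The following is a minimal stand-alone sketch of that behavior. The single-thread executor, its thread name rest-pool-1, and the already-completed future standing in for queryMetrics(timeout) are illustrative assumptions, not Flink code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class WhenCompleteAsyncDemo {

    /** Returns the name of the thread that ran the whenCompleteAsync callback. */
    static String callbackThreadName() {
        // Assumed stand-in for the shared REST executor; the thread name is hypothetical.
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "rest-pool-1"));
        AtomicReference<String> callbackThread = new AtomicReference<>();
        try {
            // Assumed stand-in for queryServiceGateway.queryMetrics(timeout): an already-completed future.
            CompletableFuture<String> response = CompletableFuture.completedFuture("serialized-dump");
            response.whenCompleteAsync(
                            (result, t) -> callbackThread.set(Thread.currentThread().getName()),
                            executor)
                    .join(); // wait until the callback has run on the executor
        } finally {
            executor.shutdown();
        }
        return callbackThread.get();
    }

    public static void main(String[] args) {
        // The callback runs on the executor even though the future was already complete.
        System.out.println(callbackThreadName()); // prints "rest-pool-1"
    }
}
```

Because the callback never runs inline, the deserialization work competes for the same threads as everything else submitted to executor.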
> The only condition for fetching metrics is that the time elapsed since the last update exceeds updateInterval:
> {code:java}
> public void update() {
>     synchronized (this) {
>         long currentTime = System.currentTimeMillis();
>         if (currentTime - lastUpdateTime > updateInterval) {
>             lastUpdateTime = currentTime;
>             fetchMetrics();
>         }
>     }
> }
> {code}
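> Therefore, if the data returned by the TMs cannot be processed within one update interval, the processing tasks pile up in the thread pool and metric data accumulates, which can eventually lead to an OOM.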
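The accumulation described above can be reproduced with a small stand-alone simulation. It is a sketch under stated assumptions: a single-thread executor stands in for the shared pool, a CountDownLatch models deserialization that is slower than the update interval, and every simulated update() call is assumed to pass the interval check. The skipIfPreviousRunning flag is a hypothetical mitigation (skip a fetch while the previous one has not completed); it is not necessarily what the commits listed above implement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class MetricPileUpSketch {

    /**
     * Simulates `rounds` update() calls that each pass the interval check and fire one
     * callback per TaskManager. Every callback blocks until the simulation ends, modelling
     * deserialization slower than the update interval. Returns the number of callbacks
     * submitted to the executor but not yet finished.
     */
    static int pendingCallbacks(int rounds, int taskManagers, boolean skipIfPreviousRunning) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch slowDeserialization = new CountDownLatch(1);
        AtomicInteger pending = new AtomicInteger();
        CompletableFuture<Void> previousRound = CompletableFuture.completedFuture(null);
        try {
            for (int round = 0; round < rounds; round++) {
                // Hypothetical guard: skip this fetch while the previous one is still in flight.
                if (skipIfPreviousRunning && !previousRound.isDone()) {
                    continue;
                }
                List<CompletableFuture<Void>> callbacks = new ArrayList<>();
                for (int tm = 0; tm < taskManagers; tm++) {
                    pending.incrementAndGet();
                    callbacks.add(CompletableFuture.runAsync(() -> {
                        try {
                            slowDeserialization.await(); // stands in for deserializer.deserialize(result)
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            pending.decrementAndGet();
                        }
                    }, executor));
                }
                previousRound = CompletableFuture.allOf(callbacks.toArray(new CompletableFuture<?>[0]));
            }
            return pending.get();
        } finally {
            slowDeserialization.countDown(); // release the blocked callbacks so the pool can drain
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without guard: " + pendingCallbacks(10, 5, false)); // 50
        System.out.println("with guard:    " + pendingCallbacks(10, 5, true)); // 5
    }
}
```

With 10 update rounds and 5 TaskManagers, the unguarded run leaves 50 callbacks pending, while the guarded run leaves only the 5 from the first round. Such a guard bounds the backlog to one fetch's worth of work, at the cost of refreshing metrics less often while processing is slow.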
> In addition, the WebMonitorEndpoint, the REST handlers and the metric fetcher share the same thread pool, so the problem may get even worse while the web UI is open:
> {code:java}
> final ScheduledExecutorService executor =
>         WebMonitorEndpoint.createExecutorService(
>                 configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
>                 configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
>                 "DispatcherRestEndpoint");
> final long updateInterval =
>         configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
> final MetricFetcher metricFetcher =
>         updateInterval == 0
>                 ? VoidMetricFetcher.INSTANCE
>                 : MetricFetcherImpl.fromConfiguration(
>                         configuration,
>                         metricQueryServiceRetriever,
>                         dispatcherGatewayRetriever,
>                         executor);
> webMonitorEndpoint =
>         restEndpointFactory.createRestEndpoint(
>                 configuration,
>                 dispatcherGatewayRetriever,
>                 resourceManagerGatewayRetriever,
>                 blobServer,
>                 executor,
>                 metricFetcher,
>                 highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
>                 fatalErrorHandler);
> {code}
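Because the same executor is passed to both MetricFetcherImpl.fromConfiguration and createRestEndpoint, a metric backlog can delay REST requests. The sketch below illustrates that contention and one hypothetical restructuring (a dedicated pool for metric processing). It is not Flink's actual wiring: single-thread executors are an assumption chosen so that one stuck task saturates the pool (in Flink the size comes from RestOptions.SERVER_NUM_THREADS), and the one-second timeout and "job overview" response are illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SharedPoolSketch {

    /**
     * Returns true if a REST request completes within one second while metric processing is
     * stuck. With dedicatedMetricsPool == false, both kinds of work share one single-thread
     * executor, mirroring how one executor is passed to the metric fetcher and the REST endpoint.
     */
    static boolean restRequestServed(boolean dedicatedMetricsPool) {
        ExecutorService restExecutor = Executors.newSingleThreadExecutor();
        ExecutorService metricsExecutor =
                dedicatedMetricsPool ? Executors.newSingleThreadExecutor() : restExecutor;
        CountDownLatch slowMetrics = new CountDownLatch(1);
        try {
            // Unfinished metric deserialization occupies its executor's only thread.
            metricsExecutor.submit(() -> {
                slowMetrics.await();
                return null;
            });
            // A REST handler task arriving afterwards, e.g. while the web UI is open.
            Future<String> response = restExecutor.submit(() -> "job overview");
            try {
                response.get(1, TimeUnit.SECONDS);
                return true;
            } catch (TimeoutException e) {
                return false; // still queued behind the metric task
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            slowMetrics.countDown(); // unblock the metric task so both pools can shut down
            metricsExecutor.shutdown();
            restExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("shared pool:    " + restRequestServed(false)); // false
        System.out.println("dedicated pool: " + restRequestServed(true)); // true
    }
}
```

restRequestServed(false) returns false because the REST task waits behind the stuck metric task, while restRequestServed(true) returns true. Isolating metric processing protects REST latency, although on its own it does not stop the metric backlog from growing.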
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)