Posted to issues@flink.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/10/23 09:28:00 UTC

[jira] [Updated] (FLINK-29134) fetch metrics may cause oom(ThreadPool task pile up)

     [ https://issues.apache.org/jira/browse/FLINK-29134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-29134:
-----------------------------------
    Labels: pull-request-available  (was: )

> fetch metrics may cause oom(ThreadPool task pile up)
> ----------------------------------------------------
>
>                 Key: FLINK-29134
>                 URL: https://issues.apache.org/jira/browse/FLINK-29134
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Metrics
>    Affects Versions: 1.11.0, 1.15.2
>            Reporter: Sitan Pang
>            Assignee: Yuxin Tan
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: dump-queueTask.png, dump-threadPool.png
>
>
> When we call queryMetrics, a thread pool processes the data returned by the TMs:
> {code:java}
> private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
>     LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
>     queryServiceGateway
>             .queryMetrics(timeout)
>             .whenCompleteAsync(
>                     (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
>                         if (t != null) {
>                             LOG.debug("Fetching metrics failed.", t);
>                         } else {
>                             metrics.addAll(deserializer.deserialize(result));
>                         }
>                     },
>                     executor);
> } {code}
> Metrics are fetched on a single condition: the time since the last update must exceed updateInterval.
> {code:java}
> public void update() {
>     synchronized (this) {
>         long currentTime = System.currentTimeMillis();
>         if (currentTime - lastUpdateTime > updateInterval) {
>             lastUpdateTime = currentTime;
>             fetchMetrics();
>         }
>     }
> } {code}
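> Therefore, if the returned data cannot be processed within one update interval, pending tasks and their metric data accumulate in the thread pool.
> In addition, webMonitorEndpoint, restHandlers and the metric fetcher all share the same thread pool.
> When the web UI is open, the situation may get even worse, because its REST requests compete for the same threads.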
> {code:java}
> final ScheduledExecutorService executor =
>         WebMonitorEndpoint.createExecutorService(
>                 configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
>                 configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
>                 "DispatcherRestEndpoint");
> final long updateInterval =
>         configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
> final MetricFetcher metricFetcher =
>         updateInterval == 0
>                 ? VoidMetricFetcher.INSTANCE
>                 : MetricFetcherImpl.fromConfiguration(
>                         configuration,
>                         metricQueryServiceRetriever,
>                         dispatcherGatewayRetriever,
>                         executor);
> webMonitorEndpoint =
>         restEndpointFactory.createRestEndpoint(
>                 configuration,
>                 dispatcherGatewayRetriever,
>                 resourceManagerGatewayRetriever,
>                 blobServer,
>                 executor,
>                 metricFetcher,
>                 highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
>                 fatalErrorHandler); {code}
>  
>  
>  
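The pile-up described above comes from update() checking only the elapsed time, not whether the previous round of results has been processed. One possible mitigation, sketched below, is to skip a fetch while the previous one is still in flight. This is a minimal illustration and not necessarily what the linked pull request does: the class GuardedFetcher, its fields, and the explicit currentTime parameter (used instead of System.currentTimeMillis() so the behavior is easy to check) are all hypothetical and are not part of Flink.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch, not Flink's MetricFetcherImpl. */
public class GuardedFetcher {
    private final Executor executor;
    private final long updateIntervalMs;
    // Stands in for "deserialize the result and add it to the metric store".
    private final Runnable processing;
    // True while a previously scheduled round has not finished processing.
    private final AtomicBoolean inFlight = new AtomicBoolean(false);
    private long lastUpdateTime = 0L;

    public GuardedFetcher(Executor executor, long updateIntervalMs, Runnable processing) {
        this.executor = executor;
        this.updateIntervalMs = updateIntervalMs;
        this.processing = processing;
    }

    /** Returns true if a new round was scheduled, false if it was skipped. */
    public synchronized boolean update(long currentTime) {
        if (currentTime - lastUpdateTime <= updateIntervalMs) {
            return false; // same time-based condition as in the issue
        }
        if (!inFlight.compareAndSet(false, true)) {
            return false; // previous round still queued or running: do not pile up
        }
        lastUpdateTime = currentTime;
        try {
            CompletableFuture.runAsync(processing, executor)
                    // Clear the flag on success and on failure alike.
                    .whenComplete((ignored, t) -> inFlight.set(false));
        } catch (RuntimeException e) {
            inFlight.set(false); // e.g. the executor rejected the task
            throw e;
        }
        return true;
    }
}
```

With this guard, at most one processing task per fetcher waits in the shared executor at any time, at the cost of metrics that may be older than updateInterval when processing is slow.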



--
This message was sent by Atlassian Jira
(v8.20.10#820010)