Posted to issues@flink.apache.org by "pc wang (Jira)" <ji...@apache.org> on 2022/01/20 12:27:00 UTC

[jira] [Commented] (FLINK-25728) StreamMultipleInputProcessor holds mass CompletableFuture instances under certain scenario

    [ https://issues.apache.org/jira/browse/FLINK-25728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479328#comment-17479328 ] 

pc wang commented on FLINK-25728:
---------------------------------

This issue has the same symptoms as FLINK-24300. Besides the memory issue, the mass of *CompletableFuture$UniRun* instances also causes high CPU usage after a long idle period.

> StreamMultipleInputProcessor holds mass CompletableFuture instances under certain scenario
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25728
>                 URL: https://issues.apache.org/jira/browse/FLINK-25728
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.12.5, 1.13.5, 1.14.2
>            Reporter: pc wang
>            Priority: Major
>         Attachments: image-2022-01-20-18-43-32-816.png
>
>
> We have an application that contains a broadcast process stage. The non-broadcast input receives roughly 10 million messages per second, while the broadcast side is a control stream that rarely has messages flowing through.
> After several hours of running, the TaskManager runs out of heap memory and restarts. We reviewed the application code without finding any relevant issues.
> We found that the time from start to crash was roughly the same on each run. We then took a heap dump before the crash and found a mass of `CompletableFuture$UniRun` instances.
> These `CompletableFuture$UniRun` instances consume several gigabytes of memory.
>  
> The following picture is from the heap dump of a mock test stream with the same scenario.
> !image-2022-01-20-18-43-32-816.png|width=1161,height=471!
>  
> After some source code research, we found that it might be caused by *StreamMultipleInputProcessor.getAvailableFuture()*.
> *StreamMultipleInputProcessor* has multiple *inputProcessors*; its *availableFuture* completes when any of its inputs' *availableFuture* completes.
> The current implementation creates a new *CompletableFuture* and appends a new *CompletableFuture$UniRun* to each delegate inputProcessor's *availableFuture*.
> The issue is caused by *CompletableFuture$UniRun* instances stacking up on the slow inputProcessor's *availableFuture*.
> See the source code below.
> [StreamMultipleInputProcessor.java#L65|https://github.com/wpc009/flink/blob/d33c39d974f08a5ac520f220219ecb0796c9448c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java#L65]
> Because each *UniRun* holds a reference to the enclosing *StreamMultipleInputProcessor*'s *availableFuture*, a mass of *CompletableFuture* instances cannot be garbage collected.
> We made some modifications to the *StreamMultipleInputProcessor.getAvailableFuture* function and verified that the issue is gone in our modified version.
> We are willing to make a PR if the community agrees.
>  
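The stacking described in the quoted report can be reproduced outside Flink with plain `CompletableFuture`. The following is only a minimal sketch, not the actual Flink code and not the reporter's patch: the class `DependentStackingDemo` and the methods `anyAvailable` and `simulate` are hypothetical names made up for illustration. It assumes a fast input that becomes available every round and a slow input that never completes, and it uses the JDK's `getNumberOfDependents()` to observe how many completion nodes pile up on the slow future.

```java
import java.util.concurrent.CompletableFuture;

public class DependentStackingDemo {

    // Hypothetical stand-in for the pattern in the report: each call creates a
    // fresh future and registers a new dependent (thenRun) on every input future.
    public static CompletableFuture<Void> anyAvailable(CompletableFuture<?>... inputs) {
        CompletableFuture<Void> anyInputAvailable = new CompletableFuture<>();
        for (CompletableFuture<?> input : inputs) {
            // The lambda captures anyInputAvailable, so an input that never
            // completes keeps every combined future reachable.
            input.thenRun(() -> anyInputAvailable.complete(null));
        }
        return anyInputAvailable;
    }

    // Simulates `rounds` availability checks in which only the fast input fires.
    // Returns the number of dependents left registered on the slow input.
    public static int simulate(CompletableFuture<?> slowInput, int rounds) {
        for (int i = 0; i < rounds; i++) {
            CompletableFuture<Void> fastInput = new CompletableFuture<>();
            CompletableFuture<Void> combined = anyAvailable(fastInput, slowInput);
            fastInput.complete(null); // completes `combined` via the fast side
            if (!combined.isDone()) {
                throw new IllegalStateException("combined future should be done");
            }
        }
        return slowInput.getNumberOfDependents();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> slowInput = new CompletableFuture<>();
        int pending = simulate(slowInput, 10_000);
        System.out.println("dependents on slow input after 10000 rounds: " + pending);

        // Completing the slow input fires and releases all stacked dependents.
        slowInput.complete(null);
        System.out.println("dependents after completion: " + slowInput.getNumberOfDependents());
    }
}
```

In this sketch the dependent count on the slow future grows with the number of rounds and only drops once that future completes. This matches the report's observation that the backlog sits on the rarely active broadcast side, and it is consistent with the burst of CPU work when that side finally fires.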



--
This message was sent by Atlassian Jira
(v8.20.1#820001)