Posted to issues@flink.apache.org by "Till Rohrmann (Jira)" <ji...@apache.org> on 2022/01/25 07:59:00 UTC

[jira] [Commented] (FLINK-25728) Potential memory leaks in StreamMultipleInputProcessor

    [ https://issues.apache.org/jira/browse/FLINK-25728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17481626#comment-17481626 ] 

Till Rohrmann commented on FLINK-25728:
---------------------------------------

cc [~pnowojski]

> Potential memory leaks in StreamMultipleInputProcessor
> ------------------------------------------------------
>
>                 Key: FLINK-25728
>                 URL: https://issues.apache.org/jira/browse/FLINK-25728
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.12.5, 1.15.0, 1.13.5, 1.14.2
>            Reporter: pc wang
>            Priority: Blocker
>              Labels: pull-request-available
>         Attachments: flink-completablefuture-issue.tar.xz, image-2022-01-20-18-43-32-816.png
>
>
> We have an application that contains a broadcast process stage. The non-broadcast input carries roughly 10 million messages per second, and the broadcast side is a kind of control stream that rarely has messages flowing through it.
> After several hours of running, the TaskManager runs out of heap memory and restarts. We reviewed the application code without finding any relevant issues.
> We found that the time from start to crash was roughly the same on each run. We then took a heap dump before the crash and found a large number of `CompletableFuture$UniRun` instances.
> These `CompletableFuture$UniRun` instances consume several gigabytes of memory.
>  
> The following picture is from a heap dump taken from a mock test stream that reproduces the same scenario.
> !image-2022-01-20-18-43-32-816.png|width=1161,height=471!
>  
> After some research into the source code, we found that it might be caused by *StreamMultipleInputProcessor.getAvailableFuture()*.
> *StreamMultipleInputProcessor* has multiple *inputProcessors*; its *availableFuture* is completed when the *availableFuture* of any of its inputs completes.
> The current implementation creates a new *CompletableFuture* on each call and appends a new *CompletableFuture$UniRun* to each delegate inputProcessor's *availableFuture*.
> The issue is caused by *CompletableFuture$UniRun* instances stacking up on the slow inputProcessor's *availableFuture*.
> See the source code below.
> [StreamMultipleInputProcessor.java#L65|https://github.com/wpc009/flink/blob/d33c39d974f08a5ac520f220219ecb0796c9448c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java#L65]
> Because each *UniRun* holds a reference to the outer *availableFuture* of *StreamMultipleInputProcessor*, a large number of *CompletableFuture* instances cannot be garbage-collected.
> We made some modifications to *StreamMultipleInputProcessor.getAvailableFuture()* and verified that the issue is gone in our modified version.
> We are willing to make a PR for this fix.
>  Heap Dump File [^flink-completablefuture-issue.tar.xz] 
> PS: This is a YourKit heap dump. It may not be compatible with HPROF files.
> [Sample Code to reproduce the issue|https://github.com/wpc009/flink/blob/FLINK-25728/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/MultipleInputStreamMemoryIssueTest.java]
>  
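The stacking described in the report can be illustrated with a minimal standalone sketch in plain Java. This is not Flink code: the class, field, and method names below are hypothetical, and the sketch only assumes the pattern the report describes, namely a fresh CompletableFuture plus one thenRun callback per input on every call. It uses the JDK's CompletableFuture.getNumberOfDependents() to count the callbacks still registered on the input that never becomes available.

```java
import java.util.concurrent.CompletableFuture;

/** Standalone illustration only; all names here are hypothetical, not Flink's. */
final class UniRunStackingSketch {

    /** Stands in for the availability future of the rarely active (broadcast) input. */
    private final CompletableFuture<Void> slowInput = new CompletableFuture<>();

    /** Stands in for the availability future of the busy input; replaced after each completion. */
    private CompletableFuture<Void> fastInput = new CompletableFuture<>();

    /** Assumed pattern from the report: a new future and a new callback per input on every call. */
    CompletableFuture<Void> anyInputAvailable() {
        CompletableFuture<Void> any = new CompletableFuture<>();
        fastInput.thenRun(() -> any.complete(null));
        // This callback stays registered on slowInput until slowInput completes,
        // and its lambda keeps a reference to 'any'.
        slowInput.thenRun(() -> any.complete(null));
        return any;
    }

    /** Simulates data arriving on the busy input, then resets its availability future. */
    void fastInputReceivesData() {
        fastInput.complete(null);
        fastInput = new CompletableFuture<>();
    }

    /** Runs the wait/complete cycle and reports how many callbacks remain on the slow input. */
    static int dependentsOnSlowInputAfter(int rounds) {
        UniRunStackingSketch sketch = new UniRunStackingSketch();
        for (int i = 0; i < rounds; i++) {
            CompletableFuture<Void> any = sketch.anyInputAvailable();
            sketch.fastInputReceivesData();
            any.join(); // already completed through the fast input
        }
        return sketch.slowInput.getNumberOfDependents();
    }

    public static void main(String[] args) {
        System.out.println("Callbacks left on the slow input: "
                + dependentsOnSlowInputAfter(10_000));
    }
}
```

In this sketch the number of dependents on the slow input grows with the number of rounds, because nothing removes a callback once the combined future has been completed through the other input. That matches the growth pattern the reporter observed in the heap dump, although the real processor involves more state than is modelled here.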



--
This message was sent by Atlassian Jira
(v8.20.1#820001)