Posted to issues@flink.apache.org by "Haibo Sun (JIRA)" <ji...@apache.org> on 2019/05/17 10:23:00 UTC

[jira] [Comment Edited] (FLINK-12529) Release record-deserializer buffers timely to improve the efficiency of heap usage on taskmanager

    [ https://issues.apache.org/jira/browse/FLINK-12529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16842078#comment-16842078 ] 

Haibo Sun edited comment on FLINK-12529 at 5/17/19 10:22 AM:
-------------------------------------------------------------

> But as far as I understand you would only need to modify the {{Stream(Two)InputProcessor#processBufferOrEvent}} code (assuming that this change would base on my PR?).
  
If this change is based on your [PR#8467|https://github.com/apache/flink/pull/8467], you are right. The change is simple, so I will do it after your PR.


> Release record-deserializer buffers timely to improve the efficiency of heap usage on taskmanager
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-12529
>                 URL: https://issues.apache.org/jira/browse/FLINK-12529
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Operators
>    Affects Versions: 1.8.0
>            Reporter: Haibo Sun
>            Assignee: Haibo Sun
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> In input processors (`StreamInputProcessor` and `StreamTwoInputProcessor`), each input channel has a corresponding record deserializer. Currently, these record deserializers are cleaned up only at the end of the task (see `StreamInputProcessor#cleanup()` and `StreamTwoInputProcessor#cleanup()`). This is not a problem for unbounded streams, but it can reduce the efficiency of heap memory usage on the taskmanager when the input is a bounded stream.
> For example, suppose all inputs are bounded streams. Some end very early because they carry little data, and others end much later because they carry a lot of data. The buffers of the record deserializers for the channels that finished early then sit idle for a long time and are never used again.
> In another case, the inputs contain both unbounded and bounded streams. Once the bounded streams have finished, the buffers of their record deserializers stay idle forever and are never used again. When records are large and the upstream parallelism is high, the total size of the `SpanningWrapper#buffer` instances can become very large. A single `SpanningWrapper#buffer` can grow to 5 MB, so with an upstream parallelism of 100 the total can reach 500 MB. (In our production environment, some jobs have records of up to hundreds of KB and an upstream parallelism of up to 1000.)
> Overall, after an input channel delivers its `EndOfPartitionEvent`, the corresponding record deserializer should be cleared immediately to improve the efficiency of heap memory usage on the taskmanager.
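
The 500 MB figure in the description is an upper bound from simple multiplication. Each upstream channel has one deserializer, and each deserializer holds at most one `SpanningWrapper#buffer` of up to 5 MB. The sketch below computes that bound in Java under a few assumptions. It assumes one input channel per upstream subtask. It treats "5 MB" as 5 × 1024 × 1024 bytes. The class and method names are hypothetical and are not part of Flink.

```java
// Hypothetical back-of-the-envelope estimate (not Flink code) of the worst-case
// heap retained by idle SpanningWrapper buffers. The 5 MB cap and the parallelism
// values come from the issue text; one channel per upstream subtask is assumed.
public class SpanningBufferEstimate {

    // Assumed upper bound on a single SpanningWrapper#buffer (5 MB, binary units).
    static final long MAX_BUFFER_BYTES = 5L * 1024 * 1024;

    // Worst case: every channel's deserializer holds a full-size buffer.
    static long worstCaseRetainedBytes(int upstreamParallelism) {
        return MAX_BUFFER_BYTES * upstreamParallelism;
    }

    public static void main(String[] args) {
        for (int parallelism : new int[] {100, 1000}) {
            long megabytes = worstCaseRetainedBytes(parallelism) / (1024 * 1024);
            System.out.println("parallelism " + parallelism + " -> up to " + megabytes + " MB");
        }
    }
}
```

A parallelism of 100 gives 500 MB, which matches the description. A parallelism of 1000 gives 5000 MB. This is a bound, not a measurement: buffers only approach the cap when records are correspondingly large.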

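The proposed change is to release each channel's record deserializer as soon as that channel signals end of partition. Today the release only happens at task end. Below is a minimal Java model of that idea. It is hypothetical and self-contained, not Flink's implementation:

- `ChannelDeserializer`, `onEndOfPartition` and `retainedBytes` are illustrative names.
- The byte array stands in for the deserializer's heap buffer.
- `cleanup()` still releases whatever remains when the task ends.

```java
// Hypothetical model (not Flink's actual classes): one deserializer per input
// channel, released as soon as that channel reports end of partition instead of
// only in cleanup() at task end.
public class EarlyReleaseSketch {

    // Stand-in for a record deserializer that owns a possibly large heap buffer.
    static final class ChannelDeserializer {
        private byte[] buffer;

        ChannelDeserializer(int bufferBytes) {
            this.buffer = new byte[bufferBytes];
        }

        int retainedBytes() {
            return buffer == null ? 0 : buffer.length;
        }

        // Drop the buffer reference so the garbage collector can reclaim it.
        void clear() {
            buffer = null;
        }
    }

    private final ChannelDeserializer[] deserializers;

    EarlyReleaseSketch(int numChannels, int bufferBytes) {
        deserializers = new ChannelDeserializer[numChannels];
        for (int i = 0; i < numChannels; i++) {
            deserializers[i] = new ChannelDeserializer(bufferBytes);
        }
    }

    // Called when a channel delivers its end-of-partition event
    // (EndOfPartitionEvent in Flink); no more records can arrive on it.
    void onEndOfPartition(int channelIndex) {
        release(channelIndex);
    }

    // Task-end cleanup still releases any deserializers that are left.
    void cleanup() {
        for (int i = 0; i < deserializers.length; i++) {
            release(i);
        }
    }

    // Idempotent: releasing an already-released channel is a no-op.
    private void release(int channelIndex) {
        ChannelDeserializer d = deserializers[channelIndex];
        if (d != null) {
            d.clear();
            deserializers[channelIndex] = null;
        }
    }

    long retainedBytes() {
        long total = 0;
        for (ChannelDeserializer d : deserializers) {
            if (d != null) {
                total += d.retainedBytes();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        EarlyReleaseSketch processor = new EarlyReleaseSketch(4, 1024);
        System.out.println(processor.retainedBytes()); // 4096
        processor.onEndOfPartition(0); // small bounded inputs finish early
        processor.onEndOfPartition(2);
        System.out.println(processor.retainedBytes()); // 2048
        processor.cleanup();
        System.out.println(processor.retainedBytes()); // 0
    }
}
```

The task-end `cleanup()` path is kept for unbounded inputs, which never finish and therefore never trigger the early release.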


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)