Posted to commits@samza.apache.org by "Xinyu Liu (JIRA)" <ji...@apache.org> on 2019/01/14 22:35:00 UTC

[jira] [Commented] (SAMZA-2044) EOSMessage causes out of memory Exceptions related to WindowOperatorImpl

    [ https://issues.apache.org/jira/browse/SAMZA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16742547#comment-16742547 ] 

Xinyu Liu commented on SAMZA-2044:
----------------------------------

Hi, [~pgeorgantas]: first, thanks a lot for looking into this problem!

I took a quick look at your RB to understand the problem you are solving here. It seems the original problem is caused by the amount of data emitted from the window when we reach end-of-stream. The approach in the RB is to consume the data on the spot instead of returning the whole collection. However, it seems to me that an earlier step already loads the data into memory before this code is reached. The loading happens in WindowOperatorImpl, line 258: that line reads from the RocksDB iterator and loads all the data in the window pane into memory, which might already cause the OOM.
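To make the point about line 258 concrete, here is a minimal, framework-free Java sketch. It assumes the store can be modeled as a plain list of panes (a stand-in for the RocksDB iterator); loadPane, emitAll and emitEach are hypothetical names, not Samza methods. It contrasts returning a collection with handing panes to a Consumer, in the spirit of the proposed signature.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

final class EndOfStreamSketch {
  // Hypothetical stand-in for the pane read: every message of one pane is
  // copied out of the store iterator into a list before anything is emitted.
  static List<Integer> loadPane(Iterator<Integer> paneIterator) {
    List<Integer> pane = new ArrayList<>();
    paneIterator.forEachRemaining(pane::add);
    return pane;
  }

  // Collection-returning contract: every pane is materialized and held at once.
  static List<List<Integer>> emitAll(List<List<Integer>> store) {
    List<List<Integer>> results = new ArrayList<>();
    for (List<Integer> storedPane : store) {
      results.add(loadPane(storedPane.iterator()));
    }
    return results;
  }

  // Consumer-based contract: panes are handed downstream one at a time,
  // but each individual pane is still fully loaded by loadPane first.
  static void emitEach(List<List<Integer>> store, Consumer<List<Integer>> downstream) {
    for (List<Integer> storedPane : store) {
      downstream.accept(loadPane(storedPane.iterator()));
    }
  }
}
```

The Consumer variant bounds memory at one pane instead of all panes, but a single very large pane is still copied into memory in full by both variants, which is why the consumer-based change alone may not prevent the OOM.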

In my opinion, Samza's current window API comes in two flavors: with an aggregation function (i.e. the foldLeft fn) or without one. To reduce the memory footprint, using the window with an aggregation fn seems to be the way to go: it applies the aggregation on the spot as each message arrives, instead of buffering all the data and then applying the aggregation, which can cause memory problems. Could you please check whether your use case can use this flavor of the window?
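A minimal sketch of the two flavors in plain Java, with no Samza dependency. The Supplier and BiFunction parameters are stand-ins assumed to play the roles of the window's initial value and foldLeft fn (message first, then the old value); bufferAll and foldLeft are hypothetical names for illustration only. Buffering keeps every message per key until the window fires, while folding keeps a single accumulated value per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

final class WindowFlavorsSketch {
  // Flavor without an aggregation function: every message is buffered per key
  // until the window fires, so memory grows with the number of messages.
  static Map<String, List<Long>> bufferAll(List<Map.Entry<String, Long>> messages) {
    Map<String, List<Long>> panes = new HashMap<>();
    for (Map.Entry<String, Long> m : messages) {
      panes.computeIfAbsent(m.getKey(), k -> new ArrayList<>()).add(m.getValue());
    }
    return panes;
  }

  // Flavor with an aggregation function: each message is folded into a running
  // value on arrival, so only one accumulated value per key is kept.
  static <WV> Map<String, WV> foldLeft(List<Map.Entry<String, Long>> messages,
                                       Supplier<WV> initialValue,
                                       BiFunction<Long, WV, WV> aggregator) {
    Map<String, WV> panes = new HashMap<>();
    for (Map.Entry<String, Long> m : messages) {
      WV old = panes.containsKey(m.getKey()) ? panes.get(m.getKey()) : initialValue.get();
      panes.put(m.getKey(), aggregator.apply(m.getValue(), old));
    }
    return panes;
  }
}
```

For example, summing values with foldLeft(messages, () -> 0L, (msg, acc) -> acc + msg) keeps one Long per key no matter how many messages arrive in the window.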

The same problem might not be present in other functions, e.g. the timer and process functions, since Samza doesn't accumulate state for them by itself. You can perform any cleanup and emit the results by implementing ClosableFunction. If that's not enough, please let us know your use case and we can enrich the API together.
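A sketch of the ClosableFunction route, assuming a user function that owns its own state. To keep it self-contained, the ClosableFunction interface below is a local stand-in that mirrors only the shape of Samza's (a default no-op close()); CountingFunction and its sink are hypothetical. The function flushes its state entry by entry in close() and then clears it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Local stand-in mirroring the shape of Samza's ClosableFunction, so this
// sketch compiles without Samza on the classpath.
interface ClosableFunction {
  default void close() {}
}

// Hypothetical user function that keeps per-key counts. The state belongs to
// the function, not the framework, so the function decides how to flush it.
final class CountingFunction implements ClosableFunction {
  private final Map<String, Integer> counts = new HashMap<>();
  private final Consumer<String> sink; // hypothetical output channel

  CountingFunction(Consumer<String> sink) {
    this.sink = sink;
  }

  void apply(String key) {
    counts.merge(key, 1, Integer::sum);
  }

  @Override
  public void close() {
    // Emit one entry at a time, then release the state.
    counts.forEach((key, count) -> sink.accept(key + "=" + count));
    counts.clear();
  }
}
```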

> EOSMessage causes out of memory Exceptions related to WindowOperatorImpl
> ------------------------------------------------------------------------
>
>                 Key: SAMZA-2044
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2044
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.14.0, 1.0
>            Reporter: Peter Georgantas
>            Priority: Major
>
> The contract of the handleEndOfStream method dictates that a collection of results be returned. In the case of WindowOperatorImpl which has a backing RocksDB store, this effectively causes the entirety of the store to be pulled into memory. In many cases, this will cause out of memory exceptions (otherwise why not keep the store in memory in the first place).
> Since this is a protected api, I have a relatively simple change to propose which could allow data to be consumed downstream as it is brought out of the store:
> {{protected void handleEndOfStream(Consumer<WindowPane<K, Object>> consumer, MessageCollector collector, TaskCoordinator coordinator)}}
> [Pull Request |https://github.com/apache/samza/pull/862]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)