Posted to user@flink.apache.org by Elias Levy <fe...@gmail.com> on 2017/05/04 18:25:36 UTC

CEP memory requirements

I am observing odd memory behavior with the CEP library and I am wondering
if it is expected.

If I write a simple local streaming Flink job that reads from a 65MB
compressed file of JSON objects, one per line, parses the JSON, performs a
filter operation, and then a keyBy, heap usage is stable, staying below
250MB throughout per VisualVM.

But if I create a CEP pattern that matches nothing
(Pattern.begin[T]("foo").where( _ => false )) and match it against the
stream produced by the last keyBy (CEP.pattern(stream, pattern).select),
then heap usage grows steadily for as long as the program runs, reaching
3GB by the time it terminates.

The VisualVM memory profiler appears unable to account for that used heap
space.  If I sum the Live Bytes column, I get only between 100 and 200 MB.

Any idea what is going on?

Flink 1.2.  Java 8.

Re: CEP memory requirements

Posted by Dawid Wysakowicz <wy...@gmail.com>.
Yes, you are right: prior to 1.3.0 the per-key state was never cleared.
With FLINK-5174
<https://issues.apache.org/jira/browse/FLINK-5174>, now in the master
branch, the state is stored only when necessary.

Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>

2017-05-04 22:12 GMT+02:00 Elias Levy <fe...@gmail.com>:

> Looking at the code I gather that 1.2 does not clear the per key NFA state
> even if there is no state to keep, whereas this appears fixed in the master
> branch. Yes?

Re: CEP memory requirements

Posted by Elias Levy <fe...@gmail.com>.
Looking at the code, I gather that 1.2 does not clear the per-key NFA state
even when there is no state to keep, whereas this appears to be fixed in the
master branch. Is that correct?
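
The behavior discussed in this thread can be illustrated with a toy model.
The sketch below is not Flink's implementation: KeyedMatcher, NfaState, and
StateGrowthDemo are hypothetical names invented for illustration. It only
assumes what the thread states, namely that 1.2 keeps a state entry for every
key it has seen, while the fix stores state only when there is something to
keep. The pattern here accepts nothing, mirroring where( _ => false ).

```scala
import scala.collection.mutable

// Toy stand-in for per-key NFA state: the partial matches currently in flight.
final case class NfaState(partialMatches: List[String]) {
  def isEmpty: Boolean = partialMatches.isEmpty
}

// Hypothetical model of a keyed pattern-matching operator (not Flink code).
// clearEmptyState = false models "state per key is never cleared";
// clearEmptyState = true models "state is stored only when necessary".
final class KeyedMatcher(clearEmptyState: Boolean, accepts: String => Boolean) {
  private val stateByKey = mutable.Map.empty[String, NfaState]

  def process(key: String, event: String): Unit = {
    val current = stateByKey.getOrElse(key, NfaState(Nil))
    val updated =
      if (accepts(event)) NfaState(event :: current.partialMatches) else current
    if (clearEmptyState && updated.isEmpty) {
      // Nothing worth keeping for this key, so drop its entry.
      stateByKey.remove(key)
    } else {
      // Keep an entry for the key, even if it holds no partial matches.
      stateByKey.update(key, updated)
    }
    ()
  }

  def retainedKeys: Int = stateByKey.size
}

object StateGrowthDemo {
  // Sends one event for each distinct key through a matcher whose pattern
  // accepts nothing, then reports how many per-key entries are still held.
  def run(clearEmptyState: Boolean, distinctKeys: Int): Int = {
    val matcher = new KeyedMatcher(clearEmptyState, _ => false)
    (1 to distinctKeys).foreach(i => matcher.process(s"key-$i", s"event-$i"))
    matcher.retainedKeys
  }

  def main(args: Array[String]): Unit = {
    println(s"never cleared:      ${run(clearEmptyState = false, distinctKeys = 100000)} entries")
    println(s"cleared when empty: ${run(clearEmptyState = true, distinctKeys = 100000)} entries")
  }
}
```

In this model the first variant retains one entry per distinct key even
though nothing ever matches, so retained state grows with the number of keys
seen. The second variant retains none.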
