Posted to user@flink.apache.org by Shashank Timmarajus <sh...@gmail.com> on 2017/10/25 19:34:52 UTC

Heap memory not released after aggregation operator

Hello all,

I am running a Flink streaming job that consumes messages from Kafka and
writes to S3 after aggregating the source records. Something like below:

My TargetRecord is an ArrayList of byte arrays, and these accumulate in
memory over time (after 4 days of continuous running). The heap analysis
below was taken on a machine shortly before it crashed; 6 GB out of 8 GB is
occupied by the byte array ArrayList.

Is there anything wrong with what I am doing here, such as passing my
aggregate result to an async function, then to a map function, and then to
the sink?

Thanks for your time, much appreciated.


-- 

*Best Regards, Shashank*

Re: Heap memory not released after aggregation operator

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

In a WindowedStream, it is the responsibility of the Trigger to purge the
window state at some point in time. Otherwise, the window operator keeps
accumulating data.
In your code snippet, you define a custom trigger, but the call that would
use it is commented out.

The built-in trigger of a TimeWindow should correctly clean up the state of
a window once the watermark passes the window's end boundary.
If your program fails with the custom trigger, I would take a closer look
at its implementation.
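
To make the point about purging concrete, here is a toy sketch in plain
Java. It is not Flink code: the class WindowPurgeSketch, its methods, and
its Result enum are hypothetical names invented for illustration, and the
enum only mirrors the idea behind the FIRE and FIRE_AND_PURGE trigger
results. It shows how buffered byte arrays pile up when a window is only
fired and never purged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types are Flink classes.
class WindowPurgeSketch {

    // FIRE emits the window contents and keeps them;
    // FIRE_AND_PURGE emits the contents and then drops them.
    enum Result { FIRE, FIRE_AND_PURGE }

    // Buffered records per window, keyed by the window end timestamp.
    private final Map<Long, List<byte[]>> windowState = new HashMap<>();

    void add(long windowEnd, byte[] record) {
        windowState.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(record);
    }

    // Simulates the moment the watermark passes the end of a window.
    // Returns the number of records that would be emitted downstream.
    int onWindowEnd(long windowEnd, Result result) {
        List<byte[]> contents = windowState.getOrDefault(windowEnd, new ArrayList<>());
        int emitted = contents.size();
        if (result == Result.FIRE_AND_PURGE) {
            windowState.remove(windowEnd); // release the buffered records
        }
        return emitted;
    }

    // Total number of records still held in window state.
    long retainedRecords() {
        long total = 0;
        for (List<byte[]> records : windowState.values()) {
            total += records.size();
        }
        return total;
    }

    // Fills and closes a number of consecutive windows, then reports
    // how many records are still retained afterwards.
    static long retainedAfter(int windows, int recordsPerWindow, Result result) {
        WindowPurgeSketch operator = new WindowPurgeSketch();
        for (int w = 1; w <= windows; w++) {
            long windowEnd = w * 1000L;
            for (int i = 0; i < recordsPerWindow; i++) {
                operator.add(windowEnd, new byte[16]);
            }
            operator.onWindowEnd(windowEnd, result);
        }
        return operator.retainedRecords();
    }

    public static void main(String[] args) {
        System.out.println("FIRE only:      " + retainedAfter(3, 4, Result.FIRE));
        System.out.println("FIRE_AND_PURGE: " + retainedAfter(3, 4, Result.FIRE_AND_PURGE));
    }
}
```

In this model, three windows of four records each leave 12 records behind
when the windows are only fired, and none when they are fired and purged.
A real custom Trigger would need an equivalent purge step somewhere in its
logic.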

Best, Fabian
