Posted to user@flink.apache.org by Govindarajan Srinivasaraghavan <go...@gmail.com> on 2017/08/21 03:29:44 UTC

Memory Issue

Hi,

I have a pipeline running on Flink which ingests around 6k messages per
second. Each message is around 1 KB and passes through various stages
such as a filter, a 5-second tumbling window per key, etc., and finally a
flatMap computation before being sent to a Kafka sink. The data is first
ingested as protocol buffers and then converted into POJOs in subsequent
operators.
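
For illustration, here is a minimal sketch of a topology with that shape; all class, field, variable, and topic names below are placeholders, not taken from the actual job:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // ~6k messages/s, ~1 KB each; the source type is not specified in the thread.
    DataStream<MyProtoMessage> messages = env.addSource(protoSource);

    messages
        .filter(m -> m.isValid())                                    // filter stage
        .map(MyEvent::fromProto)                                     // protobuf -> POJO conversion
        .keyBy(MyEvent::getKey)                                      // key the stream
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))   // 5 s tumbling window per key
        .reduce((a, b) -> a.merge(b))                                // per-window aggregation
        .flatMap(new ComputationFlatMap())                           // final computation
        .addSink(new FlinkKafkaProducer010<>("results", new ResultSerializationSchema(), kafkaProps));

    env.execute("protobuf-pipeline");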

There are lots of objects created inside the user functions and some of them
are cached as well. I have been running this pipeline on 48 task slots
across 3 task managers, each allocated 22 GB of memory.

The issue I'm having is that within a period of 10 hours, almost 19k young
generation GCs have run, which is roughly one every 2 seconds, and the
reported GC time-taken value is more than 2 million. I have also enabled
object reuse. Any suggestions on how this issue could be resolved? Thanks.
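
(Object reuse here refers to the execution-config switch; a minimal sketch, assuming the DataStream API:)

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Lets chained operators reuse record instances instead of creating
    // defensive copies; user functions must then not hold on to input objects.
    env.getConfig().enableObjectReuse();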

Regards,
Govind

Re: Memory Issue

Posted by Govindarajan Srinivasaraghavan <go...@gmail.com>.
Thanks Stephan, any pointers on how managed memory is used in a streaming application would really help.

Regards,
Govind


Re: Memory Issue

Posted by Stephan Ewen <se...@apache.org>.
Hi!

RocksDB will be used when it is selected as the state backend, independent
of the checkpointing configuration.

With RocksDB as the state backend, Flink will still keep some objects on the
heap, like timers (we will move them to RocksDB as well in the near future),
but the majority of the state will be off-heap.
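
A minimal sketch of selecting the RocksDB state backend (the checkpoint URI and interval are placeholders, and constructor details vary slightly across Flink versions):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Keyed state is kept in RocksDB (native memory / local disk), not on the JVM heap.
    // The URI only determines where checkpoint snapshots go once checkpointing is on.
    // (This constructor may declare a checked IOException, depending on the version.)
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

    // Optional and independent of the backend choice: without this, state still
    // lives in RocksDB but is never snapshotted for recovery.
    env.enableCheckpointing(60_000);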

Stephan



Re: Memory Issue

Posted by Govindarajan Srinivasaraghavan <go...@gmail.com>.
I have a couple more questions regarding Flink's JVM memory.

In a streaming application, what is managed memory used for? I read on a
blog that all objects created inside user functions go into unmanaged
memory. Where does the managed keyed/operator state reside?

Also, when does the state get persisted into RocksDB? Is it only when
checkpointing is enabled? If the state backend is RocksDB but checkpointing
is not enabled, what will happen?

Thanks.


Re: Memory Issue

Posted by Jörn Franke <jo...@gmail.com>.
One would need to look at your code and possibly at some heap statistics. Maybe something goes wrong when you cache these objects (do you use a third-party library or your own implementation?). Do you use a stable version of your protobuf library (not necessarily the most recent one)? You may also want to look at reusable buffers to avoid creating objects (ByteBuffer, StringBuffer, etc.).

Probably you are creating a lot of objects through the conversion into POJOs. You could increase the heap space available to the young generation.
You can also switch to the G1 garbage collector (if on JDK 8), or at least to the parallel one.
Generally, you should avoid creating POJOs/objects as much as possible in a long-running streaming job.
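
One place to apply such JVM settings is flink-conf.yaml via env.java.opts; the flags below only illustrate the two suggestions (G1, or more room for the young generation on the parallel collector) and are not tuned values:

    # flink-conf.yaml (illustrative JDK 8 HotSpot flags; pick one variant)

    # Variant A: switch to the G1 collector
    env.java.opts: "-XX:+UseG1GC -XX:MaxGCPauseMillis=200"

    # Variant B: stay on the parallel collector but enlarge the young generation
    # env.java.opts: "-XX:+UseParallelGC -Xmn6g"

    # Optional, for diagnosis: write a GC log per JVM
    # env.java.opts: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/flink-gc.log"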


