Posted to dev@samza.apache.org by Marcelo Romaniuc <mr...@yahoo.com.INVALID> on 2015/12/30 20:19:34 UTC

Custom System Consumer filling up memory


Hi,

   I've created a custom System Consumer extending BlockingEnvelopeMap.
   All looks good until about 10m messages have been processed by the StreamTask. At that point I see a lot of GC activity, and the heap dump shows memory is mostly used by "IncomingMessageEnvelope" objects and a ConcurrentHashMap (probably from BlockingEnvelopeMap).
   It seems the messages are hanging around even after being "processed" by the StreamTask.
   Do I need to do something to dispose of such messages?

Thanks,
Marcelo



  

Re: Custom System Consumer filling up memory

Posted by Yi Pan <ni...@gmail.com>.
Hi, Marcelo,

The behavior you described sounds a bit weird to me. The SystemConsumer
should not be polled for a SystemStreamPartition (SSP) unless that SSP is in
the set returned by emptySystemStreamPartitionsBySystem(systemName). An SSP
is dropped from that per-system set while it still has unprocessed messages
pending. Hence, until the MessageChooser has taken all unprocessed messages
for a specific SSP, the SystemConsumer is not polled again to get more
messages for it from the network. If processing is slow, more unprocessed
messages stay buffered in SystemConsumers, and those SSPs are simply not
polled through the corresponding SystemConsumer. This logic is built into
the SystemConsumers code and has been functioning correctly. I suspect the
problem may instead be related to the batch request size in your
SystemConsumer, which may be too big for the configured container memory
size?
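
To make the gating described above concrete, here is a minimal sketch in plain Java. It is a simplified model and not the Samza source: the class PollGate, its method names, and the use of plain strings for partitions and messages are all assumptions made for illustration. The only idea it takes from the explanation above is that a partition is offered for polling only while its buffer of unprocessed messages is empty.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Simplified, hypothetical model of the poll gating; not Samza code. */
final class PollGate {
    // Messages fetched from the consumer but not yet handed to the task, per partition.
    private final Map<String, Queue<String>> unprocessed = new LinkedHashMap<>();

    /** Starts tracking a partition with an empty buffer. */
    void register(String ssp) {
        unprocessed.putIfAbsent(ssp, new ArrayDeque<>());
    }

    /** Partitions whose buffer is empty; only these would be requested on the next poll. */
    Set<String> partitionsToPoll() {
        Set<String> empty = new LinkedHashSet<>();
        for (Map.Entry<String, Queue<String>> entry : unprocessed.entrySet()) {
            if (entry.getValue().isEmpty()) {
                empty.add(entry.getKey());
            }
        }
        return empty;
    }

    /** Buffers a message that a poll returned for the given partition. */
    void buffer(String ssp, String message) {
        unprocessed.computeIfAbsent(ssp, k -> new ArrayDeque<>()).add(message);
    }

    /** Hands the next buffered message to the task, or returns null if there is none. */
    String take(String ssp) {
        Queue<String> queue = unprocessed.get(ssp);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        PollGate gate = new PollGate();
        gate.register("p0");
        gate.register("p1");
        gate.buffer("p0", "m1");                      // p0 now has a pending message
        System.out.println(gate.partitionsToPoll()); // p0 is skipped until it drains
        gate.take("p0");
        System.out.println(gate.partitionsToPoll()); // p0 is eligible again
    }
}
```

Note that this gate only limits how often the consumer is asked for more messages. It does not by itself limit how many messages a consumer's own thread buffers between polls.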

-Yi

On Sat, Jan 23, 2016 at 6:56 AM, Marcelo Romaniuc <
mromaniu@yahoo.com.invalid> wrote:

> Hi,
>
>    In the meantime I've figured out the issue... The messages in
> BlockingEnvelopeMap.queue don't get polled as fast as they are queued. As a
> consequence, the queue object (in BlockingEnvelopeMap) grows until all
> memory is filled up. A workaround I implemented in the onEvent(...) method
> of my consumer is to check the size of the queue and sleep(1) if it grows
> too much. Perhaps this could be added as a check inside the method
> BlockingEnvelopeMap.put(...) - it is better to delay a bit than to
> halt/crash the whole consumer due to memory limitations.
>
>
> Rgds,
> Marcelo

Re: Custom System Consumer filling up memory

Posted by Marcelo Romaniuc <mr...@yahoo.com.INVALID>.
Hi,

   In the meantime I've figured out the issue... The messages in BlockingEnvelopeMap.queue don't get polled as fast as they are queued. As a consequence, the queue object (in BlockingEnvelopeMap) grows until all memory is filled up. A workaround I implemented in the onEvent(...) method of my consumer is to check the size of the queue and sleep(1) if it grows too much. Perhaps this could be added as a check inside the method BlockingEnvelopeMap.put(...) - it is better to delay a bit than to halt/crash the whole consumer due to memory limitations.
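
For readers who want to see the shape of this workaround, here is a minimal sketch in plain Java. It does not use Samza classes: ThrottledBuffer, BackpressureDemo, the highWaterMark parameter, and the method names are hypothetical stand-ins, and the real consumer and queue from this thread are not shown. The sketch assumes a single producer thread (the role onEvent(...) plays above) that checks the queue size and sleeps 1 ms while the reader is behind.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: one producer, one reader, reports the deepest queue seen. */
final class BackpressureDemo {
    static int maxDepth(int highWaterMark, int messages) {
        ThrottledBuffer<Integer> buffer = new ThrottledBuffer<>(highWaterMark);
        AtomicInteger maxSeen = new AtomicInteger();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < messages; i++) {
                    buffer.put(i);
                    maxSeen.accumulateAndGet(buffer.size(), Math::max);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            for (int received = 0; received < messages; received++) {
                if (buffer.poll(5000) == null) {
                    throw new IllegalStateException("reader timed out");
                }
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("max queue depth: " + maxDepth(100, 10_000));
    }
}

/** Hypothetical stand-in for a consumer's message buffer; not Samza code. */
final class ThrottledBuffer<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final int highWaterMark;

    ThrottledBuffer(int highWaterMark) {
        if (highWaterMark <= 0) {
            throw new IllegalArgumentException("highWaterMark must be positive");
        }
        this.highWaterMark = highWaterMark;
    }

    /** Producer side: backs off in 1 ms steps while the queue is at the mark. */
    void put(T message) throws InterruptedException {
        while (queue.size() >= highWaterMark) {
            Thread.sleep(1);
        }
        queue.put(message);
    }

    /** Reader side: waits up to timeoutMs for a message, returns null on timeout. */
    T poll(long timeoutMs) throws InterruptedException {
        return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    int size() {
        return queue.size();
    }
}
```

With a single producer the queue never holds more than highWaterMark messages. With several producers the size check and the put are not atomic, so the mark can be overshot slightly. A queue created with a fixed capacity, such as new LinkedBlockingQueue<>(capacity), gives a hard bound instead, because its put(...) blocks while the queue is full.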


Rgds,
Marcelo


 From: Yi Pan <ni...@gmail.com>
 To: dev@samza.apache.org; Marcelo Romaniuc <mr...@yahoo.com> 
 Sent: Tuesday, January 19, 2016 5:51 AM
 Subject: Re: Custom System Consumer filling up memory
   
Hi, Marcelo,

Sorry to get back to you late. I remember that Jagadish had some
conversation w/ you on the implementation earlier. Did that include some
hints to solve this problem as well? Generally, customized system consumers
need to be responsible for the memory usage in the customized code.
We would need much more detailed info to see whether the memory leak is
in your customized SystemConsumer code or in the base class provided
(e.g. BlockingEnvelopeMap). If you still need help, please provide code
and steps to reproduce the issue, as well as the heap dump file.

Thanks a lot!

-Yi



  

Re: Custom System Consumer filling up memory

Posted by Yi Pan <ni...@gmail.com>.
Hi, Marcelo,

Sorry to get back to you late. I remember that Jagadish had some
conversation w/ you on the implementation earlier. Did that include some
hints to solve this problem as well? Generally, customized system consumers
need to be responsible for the memory usage in the customized code.
We would need much more detailed info to see whether the memory leak is
in your customized SystemConsumer code or in the base class provided
(e.g. BlockingEnvelopeMap). If you still need help, please provide code
and steps to reproduce the issue, as well as the heap dump file.

Thanks a lot!

-Yi
