Posted to users@kafka.apache.org by AshokKumar J <as...@gmail.com> on 2018/08/15 03:17:44 UTC

Kafka streams - runs out of memory

Hi,

We have a streams application that uses the low-level API.  We persist the
data in a key-value state store.  For each record that we read from the
topic, we look up its key in the store: if it exists we update the existing
entry, otherwise we simply add the new record.  With this we are running
into a significant memory issue: whatever memory we allocate gets fully
utilized, and all the objects end up in the old generation.  Caching is
enabled, and we assigned 40% of the total memory to the cache.  Say the
total application memory is 24GB and the cache size is 12GB; ideally we
would expect 12GB to reside in the old generation and the rest to stay in
the young generation, but for some reason everything goes into the old
generation and we eventually run out of memory within a day.  Please see
the object dominator tree below.  Kindly suggest.

https://files.slack.com/files-pri/T47H7EWH0-FC8EZ9L66/image.png

Re: Kafka streams - runs out of memory

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Ashok,

Definitely, please feel free to edit the FAQ page:
https://cwiki.apache.org/confluence/display/KAFKA/FAQ

It's a wiki, so anyone can contribute to it :)


Guozhang


On Sat, Aug 25, 2018 at 7:30 PM, AshokKumar J <as...@gmail.com>
wrote:

> Hi Guozhang,
>
> Thanks for the input.  Yes, I can confirm that overriding the RocksDB
> config setter class (with default parameters) while the Kafka Streams
> cache is enabled leads to unbounded memory growth.  After removing the
> override, the application's memory usage stays within 24GB.  Can this
> be added to the Kafka Streams FAQ?
>
> Ashok
>

Re: Kafka streams - runs out of memory

Posted by AshokKumar J <as...@gmail.com>.
Hi Guozhang,

Thanks for the input.  Yes, I can confirm that overriding the RocksDB
config setter class (with default parameters) while the Kafka Streams
cache is enabled leads to unbounded memory growth.  After removing the
override, the application's memory usage stays within 24GB.  Can this
be added to the Kafka Streams FAQ?
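
As a rough illustration of the setup described above, the sketch below
builds Streams properties with a bounded record cache and no custom RocksDB
config setter.  The keys "cache.max.bytes.buffering" and
"rocksdb.config.setter" are Kafka Streams settings in the 1.0/2.0 releases
discussed here; the class name, the helper name, the application id, the
broker address and the 8 GB figure are placeholders, not the values from
the running application.

```java
import java.util.Properties;

public class StreamsMemoryConfig {

    // Hypothetical helper: builds Streams properties with a bounded record cache.
    static Properties boundedCacheProps(long cacheBytes) {
        Properties props = new Properties();
        props.setProperty("application.id", "hourly-usage-app");   // placeholder id
        props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder broker
        // Upper bound, in bytes, for the on-heap record cache across all stream threads.
        props.setProperty("cache.max.bytes.buffering", Long.toString(cacheBytes));
        // "rocksdb.config.setter" is deliberately left unset, so the stores keep
        // the RocksDB options that Kafka Streams applies by default.
        return props;
    }

    public static void main(String[] args) {
        Properties props = boundedCacheProps(8L * 1024 * 1024 * 1024);
        System.out.println(props.getProperty("cache.max.bytes.buffering"));
        System.out.println(props.containsKey("rocksdb.config.setter"));
    }
}
```

The resulting Properties object would be passed to the KafkaStreams
constructor in the usual way.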

Ashok

On Wed, Aug 22, 2018 at 6:03 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hi Ashok,
>
> Your implementation looks okay to me.  I do not know how "handleTasks" is
> implemented; just note that if you are iterating over the store, you need
> to close the iterator after using it.
>
> One thing I suspect is that your memory usage, combining the streams cache
> with RocksDB's own buffering, may simply be running beyond 24GB.  You can
> take a look at this JIRA comment and see if it is similar to your scenario
> (note that your application only uses key-value stores, so it should not
> have the segmentation amplification factor):
>
>
> https://issues.apache.org/jira/browse/KAFKA-5122?focusedCommentId=15984467&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15984467
>
>
>
> Guozhang
>
>

Re: Kafka streams - runs out of memory

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Ashok,

Your implementation looks okay to me.  I do not know how "handleTasks" is
implemented; just note that if you are iterating over the store, you need
to close the iterator after using it.
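
The point about closing iterators can be sketched as follows.  In Kafka
Streams, KeyValueStore.all() and range() return a KeyValueIterator, which
is Closeable and, for RocksDB-backed stores, holds native resources until
it is closed.  To keep the sketch runnable without the Kafka libraries,
CloseableIterator and scan() below are hypothetical stand-ins, not Kafka
classes; the try-with-resources shape is the part that would carry over to
whatever "handleTasks" does.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorCloseSketch {

    // Hypothetical stand-in for Kafka's KeyValueIterator: an Iterator that must be closed.
    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close(); // declares no checked exception
    }

    // Records whether the most recently created iterator has been closed.
    static boolean closed = false;

    // Hypothetical stand-in for store.all(): wraps a list and notes when it is closed.
    static CloseableIterator<String> scan(List<String> keys) {
        final Iterator<String> inner = keys.iterator();
        closed = false;
        return new CloseableIterator<String>() {
            public boolean hasNext() { return inner.hasNext(); }
            public String next() { return inner.next(); }
            public void close() { closed = true; }
        };
    }

    // Counts entries; try-with-resources closes the iterator even if the loop throws.
    static int countAll(List<String> keys) {
        int n = 0;
        try (CloseableIterator<String> it = scan(keys)) {
            while (it.hasNext()) {
                it.next();
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println(countAll(Arrays.asList("a", "b", "c")));
        System.out.println(closed);
    }
}
```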

One thing I suspect is that your memory usage, combining the streams cache
with RocksDB's own buffering, may simply be running beyond 24GB.  You can
take a look at this JIRA comment and see if it is similar to your scenario
(note that your application only uses key-value stores, so it should not
have the segmentation amplification factor):

https://issues.apache.org/jira/browse/KAFKA-5122?focusedCommentId=15984467&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15984467
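
To make that suspicion concrete, here is a back-of-the-envelope sketch of
the total footprint.  The Streams record cache lives on the JVM heap, while
RocksDB allocates its block cache and memtables natively, outside the heap,
for each store instance.  All numbers passed in main() are illustrative
assumptions: they are not measurements from this application, and the
per-store RocksDB figures depend on the version and on any config setter in
use.  estimateBytes() is a hypothetical helper.

```java
public class MemoryBudgetSketch {

    static final long MB = 1024L * 1024L;
    static final long GB = 1024L * MB;

    // Rough estimate: JVM heap plus, per store instance, one block cache and its memtables.
    // The record cache is part of the heap, so it is not added separately.
    static long estimateBytes(long heapBytes, int storeInstances,
                              long blockCacheBytes, long writeBufferBytes, int writeBuffers) {
        long perStore = blockCacheBytes + writeBufferBytes * writeBuffers;
        return heapBytes + storeInstances * perStore;
    }

    public static void main(String[] args) {
        // Assumed inputs: 24 GB heap, 2 stores x 32 partitions,
        // 50 MB block cache and 3 x 16 MB memtables per store instance.
        long total = estimateBytes(24 * GB, 2 * 32, 50 * MB, 16 * MB, 3);
        System.out.println(total / MB + " MB");
    }
}
```

With these assumed inputs the estimate comes to roughly 30 GB, about 6 GB
of it outside the heap.  It ignores index and filter blocks, so the real
figure could be higher.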



Guozhang


On Sun, Aug 19, 2018 at 7:18 PM, AshokKumar J <as...@gmail.com>
wrote:

> Hi Guozhang,
>
> Please find below.  I have tried with the latest 2.0.0 libraries and no
> improvement observed.
>
>
> Kafka version - 1.0.1
> Total Memory allocated - 24 GB
> Max Stream Cache - 8GB
>
> ---------------------------------------
> Processor class code:
>
> // Local stores
> private KeyValueStore<String, HourlyUsage> hourlyStore = null;
> private KeyValueStore<String, Integer> hourlyProcessedStore = null;
>
> @Override
> public void init(ProcessorContext context) {
>     this.context = context;
>     // Stores the hourly JSON payload
>     this.hourlyStore =
>         (KeyValueStore<String, HourlyUsage>) context.getStateStore("kvshourly");
>     // Stores just the key sent downstream
>     this.hourlyProcessedStore =
>         (KeyValueStore<String, Integer>) context.getStateStore("kvshourlyprocessed");
>
>     this.context.schedule(punctuateMs, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
>         public void punctuate(long timestamp) {
>             handleTasks();
>         }
>     });
> }
>
> @Override
> public void process(String key, HourlyUsage newVal) {
>     if (hourlyProcessedStore.get(key) == null) {
>         currentVal = hourlyStore.get(key);
>
>         if (currentVal != null) {
>             currentVal.flattenRecord(newVal);
>             hourlyStore.put(key, currentVal);
>
>             if (currentVal.hourlyCompleted()) {
>                 context.forward(key, currentVal, "materializehourly");
>                 hourlyProcessedStore.put(key, 0);
>             }
>             currentVal = null;
>         }
>         else {
>             hourlyStore.put(key, newVal);
>         }
>     }
> }
> ---------------------------------------
>
> Thanks,
> Ashok
>

Re: Kafka streams - runs out of memory

Posted by AshokKumar J <as...@gmail.com>.
Hi Guozhang,

Please find below.  I have tried with the latest 2.0.0 libraries and no
improvement observed.


Kafka version - 1.0.1
Total Memory allocated - 24 GB
Max Stream Cache - 8GB

---------------------------------------
Processor class code:

// Local stores
private KeyValueStore<String, HourlyUsage> hourlyStore = null;
private KeyValueStore<String, Integer> hourlyProcessedStore = null;

@Override
public void init(ProcessorContext context) {
    this.context = context;
    // Stores the hourly JSON payload
    this.hourlyStore =
        (KeyValueStore<String, HourlyUsage>) context.getStateStore("kvshourly");
    // Stores just the key sent downstream
    this.hourlyProcessedStore =
        (KeyValueStore<String, Integer>) context.getStateStore("kvshourlyprocessed");

    this.context.schedule(punctuateMs, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
        public void punctuate(long timestamp) {
            handleTasks();
        }
    });
}

@Override
public void process(String key, HourlyUsage newVal) {
    if (hourlyProcessedStore.get(key) == null) {
        currentVal = hourlyStore.get(key);

        if (currentVal != null) {
            currentVal.flattenRecord(newVal);
            hourlyStore.put(key, currentVal);

            if (currentVal.hourlyCompleted()) {
                context.forward(key, currentVal, "materializehourly");
                hourlyProcessedStore.put(key, 0);
            }
            currentVal = null;
        }
        else {
            hourlyStore.put(key, newVal);
        }
    }
}
---------------------------------------

Thanks,
Ashok

On Fri, Aug 17, 2018 at 3:11 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hello AshokKumar,
>
> Which version of Kafka are you using? And could you share your code snippet
> for us to help investigate the issue (you can omit any concrete logic that
> involves your business logic, just the sketch of the code is fine).
>
>
> Guozhang
>

Re: Kafka streams - runs out of memory

Posted by Guozhang Wang <wa...@gmail.com>.
Hello AshokKumar,

Which version of Kafka are you using? And could you share your code snippet
for us to help investigate the issue (you can omit any concrete logic that
involves your business logic, just the sketch of the code is fine).


Guozhang

On Fri, Aug 17, 2018 at 8:52 AM, AshokKumar J <as...@gmail.com>
wrote:

> Hi,
> Any thoughts on the issue below?  I think the behavior should be
> reproducible if we perform both a get and a put against the store (cache
> enabled) for each record from the topic, at a processing volume of 2-3
> million records every 15 minutes, with each JSON averaging roughly 400 to
> 500 KB.  Over time the app runs out of its total memory within 24 hours.
>
> Thanks,
> Ashok
>

Re: Kafka streams - runs out of memory

Posted by AshokKumar J <as...@gmail.com>.
Hi,
Any thoughts on the issue below?  I think the behavior should be
reproducible if we perform both a get and a put against the store (cache
enabled) for each record from the topic, at a processing volume of 2-3
million records every 15 minutes, with each JSON averaging roughly 400 to
500 KB.  Over time the app runs out of its total memory within 24 hours.
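
For a sense of scale, the figures above can be multiplied out.  The sketch
assumes decimal units (1 KB = 1,000 bytes) and that every record is held at
its full JSON size; bytesPerWindow() and recordsThatFit() are hypothetical
helpers, and the 8 GB cache size is the "Max Stream Cache" value reported
elsewhere in this thread.

```java
public class VolumeSketch {

    // Bytes arriving in one 15-minute window.
    static long bytesPerWindow(long records, long bytesPerRecord) {
        return records * bytesPerRecord;
    }

    // How many records of a given size fit in a cache of cacheBytes (ignoring overhead).
    static long recordsThatFit(long cacheBytes, long bytesPerRecord) {
        return cacheBytes / bytesPerRecord;
    }

    public static void main(String[] args) {
        long low = bytesPerWindow(2_000_000L, 400_000L);   // lower bound of the stated range
        long high = bytesPerWindow(3_000_000L, 500_000L);  // upper bound of the stated range
        System.out.println(low / 1_000_000_000L + " GB to "
                + high / 1_000_000_000L + " GB per 15 minutes");
        System.out.println(recordsThatFit(8_000_000_000L, 500_000L)
                + " records fit in an 8 GB cache");
    }
}
```

Under these assumptions a window carries between 800 GB and 1.5 TB, while
an 8 GB cache holds on the order of 16,000 records of 500 KB each, so how
well the cache helps depends heavily on how many distinct keys are active
at once.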

Thanks,
Ashok

On Wed, Aug 15, 2018 at 5:15 AM, AshokKumar J <as...@gmail.com>
wrote:

> Disabling the streams cache prevents the unbounded memory usage, but the
> throughput is then low (with the RocksDB cache enabled).  Can you please
> advise why the references to cached objects are not released in time (for
> GC cleanup) and why memory grows continuously?
>

Re: Kafka streams - runs out of memory

Posted by AshokKumar J <as...@gmail.com>.
Disabling the streams cache prevents the unbounded memory usage, but the
throughput is then low (with the RocksDB cache enabled).  Can you please
advise why the references to cached objects are not released in time (for
GC cleanup) and why memory grows continuously?

On Tue, Aug 14, 2018 at 11:17 PM, AshokKumar J <as...@gmail.com>
wrote:

> Hi,
>
> We have a streams application that uses the low-level API.  We persist the
> data in a key-value state store.  For each record that we read from the
> topic, we look up its key in the store: if it exists we update the existing
> entry, otherwise we simply add the new record.  With this we are running
> into a significant memory issue: whatever memory we allocate gets fully
> utilized, and all the objects end up in the old generation.  Caching is
> enabled, and we assigned 40% of the total memory to the cache.  Say the
> total application memory is 24GB and the cache size is 12GB; ideally we
> would expect 12GB to reside in the old generation and the rest to stay in
> the young generation, but for some reason everything goes into the old
> generation and we eventually run out of memory within a day.  Please see
> the object dominator tree below.  Kindly suggest.
>
> https://files.slack.com/files-pri/T47H7EWH0-FC8EZ9L66/image.png
>
>