Posted to users@kafka.apache.org by Jon Yeargers <jo...@cedexis.com> on 2016/12/08 23:35:30 UTC

controlling memory growth when aggregating

I'm working with JSON data that has an array member, and I'm aggregating values
into this array using minute-long windows.

I ran the app for ~10 minutes and watched it consume 40% of the memory on a
box with 32 GB. It was still growing when I stopped it. At that point it had
created ~800 aggregate values, each < 1 MB in size (owing to the message-size
limit set at the broker). (I wrote all the values into Redis so I could count
them and check the aggregation.)

1. Why is it consuming so much memory?
2. Is there a strategy for controlling this growth?

I get that it's keeping every window open in case a new value shows up.
Is there some way to relax this using event time vs. wall-clock time?
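
For reference, here is a minimal sketch of this kind of minute-windowed
aggregation against the 0.10.1-era Streams API. The topic name, bootstrap
server, store name, and the mergeIntoArray helper are hypothetical stand-ins,
not taken from the thread; the until() retention setting and the
cache.max.bytes.buffering config are the two knobs in this release that bound
how much windowed state is kept around.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;

public class MinuteAgg {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "MinuteAgg");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Bound the record caches introduced in 0.10.1 (KIP-63).
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream(Serdes.String(), Serdes.String(), "json-input") // hypothetical topic
               .groupByKey(Serdes.String(), Serdes.String())
               .aggregate(
                   () -> "[]",                                        // initializer: empty JSON array
                   (key, value, agg) -> mergeIntoArray(agg, value),   // hypothetical merge helper
                   // One-minute windows, retained for 5 minutes instead
                   // of the default of one day.
                   TimeWindows.of(60_000L).until(5 * 60_000L),
                   Serdes.String(),
                   "minute-agg-store");

        new KafkaStreams(builder, props).start();
    }

    // Hypothetical helper: splice the new value into the JSON array string.
    private static String mergeIntoArray(String aggregate, String value) {
        String body = aggregate.substring(1, aggregate.length() - 1);
        return body.isEmpty() ? "[" + value + "]" : "[" + body + "," + value + "]";
    }
}

Shortening until() trades tolerance for late-arriving (event-time) records
against memory, which is essentially the relaxation asked about above.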

Re: controlling memory growth when aggregating

Posted by Jon Yeargers <jo...@cedexis.com>.
I updated my consumer to that build. The memory issue seems to have abated.
TY!

I've started seeing this exception semi-regularly, though:

ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener
org.apache.kafka.streams.processor.internals.StreamThread$1 for group
MinuteAgg failed on partition assignment

java.lang.IllegalStateException: task [1_4] Log end offset should not change while restoring
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
        at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)


Re: controlling memory growth when aggregating

Posted by Jon Yeargers <jo...@cedexis.com>.
Perhaps that's the problem. Yes - I'm still using 0.10.1.0.

Does this involve a broker update?


Re: controlling memory growth when aggregating

Posted by Damian Guy <da...@gmail.com>.
Hi Jon,

Are you using 0.10.1? There is a resource leak to do with the window
iterator. The bug has been fixed on the 0.10.1 branch (which will soon be
released as 0.10.1.1), and it is also fixed in the Confluent fork.

You can get the Confluent version by using the following Maven coordinates:

<repositories>
    <repository>
        <id>confluent</id>
        <url>http://packages.confluent.io/maven/</url>
    </repository>
</repositories>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.1.0-cp2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.0-cp2</version>
</dependency>
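
The leak above was internal to the Streams window store, but the same rule
applies to application code that iterates a window store through interactive
queries: the iterators hold RocksDB resources and should be closed promptly.
A rough sketch, assuming a windowed store named "minute-agg-store" with
String keys and values (both assumptions, not from this thread):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class WindowStoreLookup {
    // Fetch all window values for a key over the last hour, closing the
    // iterator when done; an unclosed WindowStoreIterator keeps native
    // RocksDB resources alive.
    static void printRecentWindows(KafkaStreams streams, String key) {
        ReadOnlyWindowStore<String, String> store =
            streams.store("minute-agg-store",
                          QueryableStoreTypes.<String, String>windowStore());

        long now = System.currentTimeMillis();
        try (WindowStoreIterator<String> iter =
                 store.fetch(key, now - 3_600_000L, now)) {
            while (iter.hasNext()) {
                KeyValue<Long, String> entry = iter.next(); // key = window start timestamp
                System.out.println(entry.key + " -> " + entry.value);
            }
        }
    }
}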

