You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Dmitriy Gromov <dm...@knewton.com> on 2014/12/04 18:00:53 UTC

OutOfMemoryException when starting replacement node.

Hi,

We were recently trying to replace a broker instance and were getting an
OutOfMemoryException when the new node was coming up. The issue happened
during the log replication phase. We were able to circumvent this issue by
copying over all of the logs to the new node before starting it.

Details:

- The heap size on the old and new node was 8GB.
- There was about 50GB of log data to transfer.
- There were 1548 partitions across 11 topics
- We recently increased our num.replica.fetchers to solve the problem
described here: https://issues.apache.org/jira/browse/KAFKA-1196. However,
this process worked when we first changed that value.

[2014-12-04 12:10:22,746] ERROR OOME with size 1867671283 (kafka.network.
BoundedByteBufferReceive)
java.lang.OutOfMemoryError: Java heap space
  at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
  at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
  at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(
BoundedByteBufferReceive.scala:80)
  at kafka.network.BoundedByteBufferReceive.readFrom(
BoundedByteBufferReceive.scala:63)
  at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
  at kafka.network.BoundedByteBufferReceive.readCompletely(
BoundedByteBufferReceive.scala:29)
  at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
  at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
  at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
$sendRequest(SimpleConsumer.scala:71)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
SimpleConsumer.scala:108)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
SimpleConsumer.scala:108)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
SimpleConsumer.scala:108)
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
  at kafka.server.AbstractFetcherThread.processFetchRequest(
AbstractFetcherThread.scala:96)
  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:
88)
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

Thank you

Re: OutOfMemoryException when starting replacement node.

Posted by Gwen Shapira <gs...@cloudera.com>.
If you have replica.fetch.max.bytes set to 10MB, I would not expect
2GB allocation in BoundedByteBufferReceive when doing a fetch.

Sorry, out of ideas on why this happens...

On Wed, Dec 10, 2014 at 8:41 AM, Solon Gordon <so...@knewton.com> wrote:
> Thanks for your help. We do have replica.fetch.max.bytes set to 10MB to
> allow larger messages, so perhaps that's related. But should that really be
> big enough to cause OOMs on an 8GB heap? Are there other broker settings we
> can tune to avoid this issue?
>
> On Wed, Dec 10, 2014 at 11:05 AM, Gwen Shapira <gs...@cloudera.com>
> wrote:
>
>> There is a parameter called replica.fetch.max.bytes that controls the
>> size of the messages buffer a broker will attempt to consume at once.
>> It defaults to 1MB, and has to be at least message.max.bytes (so at
>> least one message can be sent).
>>
>> If you try to support really large messages and increase these values,
>> you may run into OOM issues.
>>
>> Gwen
>>
>> On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon <so...@knewton.com> wrote:
>> > I just wanted to bump this issue to see if anyone has thoughts. Based on
>> > the error message it seems like the broker is attempting to consume
>> nearly
>> > 2GB of data in a single fetch. Is this expected behavior?
>> >
>> > Please let us know if more details would be helpful or if it would be
>> > better for us to file a JIRA issue. We're using Kafka 0.8.1.1.
>> >
>> > Thanks,
>> > Solon
>> >
>> > On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov <dm...@knewton.com>
>> wrote:
>> >
>> >> Hi,
>> >>
>> >> We were recently trying to replace a broker instance and were getting an
>> >> OutOfMemoryException when the new node was coming up. The issue happened
>> >> during the log replication phase. We were able to circumvent this issue
>> by
>> >> copying over all of the logs to the new node before starting it.
>> >>
>> >> Details:
>> >>
>> >> - The heap size on the old and new node was 8GB.
>> >> - There was about 50GB of log data to transfer.
>> >> - There were 1548 partitions across 11 topics
>> >> - We recently increased our num.replica.fetchers to solve the problem
>> >> described here: https://issues.apache.org/jira/browse/KAFKA-1196.
>> However,
>> >> this process worked when we first changed that value.
>> >>
>> >> [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283
>> (kafka.network.
>> >> BoundedByteBufferReceive)
>> >> java.lang.OutOfMemoryError: Java heap space
>> >>   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>> >>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>> >>   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(
>> >> BoundedByteBufferReceive.scala:80)
>> >>   at kafka.network.BoundedByteBufferReceive.readFrom(
>> >> BoundedByteBufferReceive.scala:63)
>> >>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> >>   at kafka.network.BoundedByteBufferReceive.readCompletely(
>> >> BoundedByteBufferReceive.scala:29)
>> >>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>> >>   at
>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>> >>   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
>> >> $sendRequest(SimpleConsumer.scala:71)
>> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> >> apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> >> apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> >> apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
>> >> SimpleConsumer.scala:108)
>> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> >> SimpleConsumer.scala:108)
>> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> >> SimpleConsumer.scala:108)
>> >>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>> >>   at kafka.server.AbstractFetcherThread.processFetchRequest(
>> >> AbstractFetcherThread.scala:96)
>> >>   at
>> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:
>> >> 88)
>> >>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>> >>
>> >> Thank you
>> >>
>>

Re: OutOfMemoryException when starting replacement node.

Posted by Solon Gordon <so...@knewton.com>.
Thanks for your help. We do have replica.fetch.max.bytes set to 10MB to
allow larger messages, so perhaps that's related. But should that really be
big enough to cause OOMs on an 8GB heap? Are there other broker settings we
can tune to avoid this issue?

On Wed, Dec 10, 2014 at 11:05 AM, Gwen Shapira <gs...@cloudera.com>
wrote:

> There is a parameter called replica.fetch.max.bytes that controls the
> size of the messages buffer a broker will attempt to consume at once.
> It defaults to 1MB, and has to be at least message.max.bytes (so at
> least one message can be sent).
>
> If you try to support really large messages and increase these values,
> you may run into OOM issues.
>
> Gwen
>
> On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon <so...@knewton.com> wrote:
> > I just wanted to bump this issue to see if anyone has thoughts. Based on
> > the error message it seems like the broker is attempting to consume
> nearly
> > 2GB of data in a single fetch. Is this expected behavior?
> >
> > Please let us know if more details would be helpful or if it would be
> > better for us to file a JIRA issue. We're using Kafka 0.8.1.1.
> >
> > Thanks,
> > Solon
> >
> > On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov <dm...@knewton.com>
> wrote:
> >
> >> Hi,
> >>
> >> We were recently trying to replace a broker instance and were getting an
> >> OutOfMemoryException when the new node was coming up. The issue happened
> >> during the log replication phase. We were able to circumvent this issue
> by
> >> copying over all of the logs to the new node before starting it.
> >>
> >> Details:
> >>
> >> - The heap size on the old and new node was 8GB.
> >> - There was about 50GB of log data to transfer.
> >> - There were 1548 partitions across 11 topics
> >> - We recently increased our num.replica.fetchers to solve the problem
> >> described here: https://issues.apache.org/jira/browse/KAFKA-1196.
> However,
> >> this process worked when we first changed that value.
> >>
> >> [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283
> (kafka.network.
> >> BoundedByteBufferReceive)
> >> java.lang.OutOfMemoryError: Java heap space
> >>   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> >>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> >>   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(
> >> BoundedByteBufferReceive.scala:80)
> >>   at kafka.network.BoundedByteBufferReceive.readFrom(
> >> BoundedByteBufferReceive.scala:63)
> >>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >>   at kafka.network.BoundedByteBufferReceive.readCompletely(
> >> BoundedByteBufferReceive.scala:29)
> >>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >>   at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
> >>   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
> >> $sendRequest(SimpleConsumer.scala:71)
> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> >> apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> >> apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> >> apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
> >> SimpleConsumer.scala:108)
> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
> >> SimpleConsumer.scala:108)
> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
> >> SimpleConsumer.scala:108)
> >>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> >>   at kafka.server.AbstractFetcherThread.processFetchRequest(
> >> AbstractFetcherThread.scala:96)
> >>   at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:
> >> 88)
> >>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >>
> >> Thank you
> >>
>

Re: OutOfMemoryException when starting replacement node.

Posted by Gwen Shapira <gs...@cloudera.com>.
There is a parameter called replica.fetch.max.bytes that controls the
size of the messages buffer a broker will attempt to consume at once.
It defaults to 1MB, and has to be at least message.max.bytes (so at
least one message can be sent).

If you try to support really large messages and increase these values,
you may run into OOM issues.

Gwen

On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon <so...@knewton.com> wrote:
> I just wanted to bump this issue to see if anyone has thoughts. Based on
> the error message it seems like the broker is attempting to consume nearly
> 2GB of data in a single fetch. Is this expected behavior?
>
> Please let us know if more details would be helpful or if it would be
> better for us to file a JIRA issue. We're using Kafka 0.8.1.1.
>
> Thanks,
> Solon
>
> On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov <dm...@knewton.com> wrote:
>
>> Hi,
>>
>> We were recently trying to replace a broker instance and were getting an
>> OutOfMemoryException when the new node was coming up. The issue happened
>> during the log replication phase. We were able to circumvent this issue by
>> copying over all of the logs to the new node before starting it.
>>
>> Details:
>>
>> - The heap size on the old and new node was 8GB.
>> - There was about 50GB of log data to transfer.
>> - There were 1548 partitions across 11 topics
>> - We recently increased our num.replica.fetchers to solve the problem
>> described here: https://issues.apache.org/jira/browse/KAFKA-1196. However,
>> this process worked when we first changed that value.
>>
>> [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283 (kafka.network.
>> BoundedByteBufferReceive)
>> java.lang.OutOfMemoryError: Java heap space
>>   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>>   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(
>> BoundedByteBufferReceive.scala:80)
>>   at kafka.network.BoundedByteBufferReceive.readFrom(
>> BoundedByteBufferReceive.scala:63)
>>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>   at kafka.network.BoundedByteBufferReceive.readCompletely(
>> BoundedByteBufferReceive.scala:29)
>>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>>   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
>> $sendRequest(SimpleConsumer.scala:71)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
>> SimpleConsumer.scala:108)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> SimpleConsumer.scala:108)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> SimpleConsumer.scala:108)
>>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>>   at kafka.server.AbstractFetcherThread.processFetchRequest(
>> AbstractFetcherThread.scala:96)
>>   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:
>> 88)
>>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>>
>> Thank you
>>

Re: OutOfMemoryException when starting replacement node.

Posted by Gwen Shapira <gs...@cloudera.com>.
Agree that the docs can be better. Perhaps you want to open a JIRA (at
issues.apache.org) with this suggestion?

On Wed, Dec 10, 2014 at 4:03 PM, Solon Gordon <so...@knewton.com> wrote:
> I see, thank you for the explanation. You might consider being more
> explicit about this in your documentation. We didn't realize we needed to
> take the (partitions * fetch size) calculation into account when choosing
> partition counts for our topics, so this is a bit of a rude surprise.
>
> On Wed, Dec 10, 2014 at 3:50 PM, Gwen Shapira <gs...@cloudera.com> wrote:
>
>> Ah, found where we actually size the request as partitions * fetch size.
>>
>> Thanks for the correction, Jay and sorry for the mix-up, Solon.
>>
>> On Wed, Dec 10, 2014 at 10:41 AM, Jay Kreps <ja...@confluent.io> wrote:
>> > Hey Solon,
>> >
>> > The 10MB size is per-partition. The rationale for this is that the fetch
>> > size per-partition is effectively a max message size. However with so
>> many
>> > partitions on one machine this will lead to a very large fetch size. We
>> > don't do a great job of scheduling these to stay under a memory bound
>> > today. Ideally the broker and consumer should do something intelligent to
>> > stay under a fixed memory budget, this is something we'd like to address
>> as
>> > part of the new consumer.
>> >
>> > For now you need to either bump up your heap or decrease your fetch size.
>> >
>> > -jay
>> >
>> > On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon <so...@knewton.com> wrote:
>> >
>> >> I just wanted to bump this issue to see if anyone has thoughts. Based on
>> >> the error message it seems like the broker is attempting to consume
>> nearly
>> >> 2GB of data in a single fetch. Is this expected behavior?
>> >>
>> >> Please let us know if more details would be helpful or if it would be
>> >> better for us to file a JIRA issue. We're using Kafka 0.8.1.1.
>> >>
>> >> Thanks,
>> >> Solon
>> >>
>> >> On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov <dm...@knewton.com>
>> >> wrote:
>> >>
>> >> > Hi,
>> >> >
>> >> > We were recently trying to replace a broker instance and were getting
>> an
>> >> > OutOfMemoryException when the new node was coming up. The issue
>> happened
>> >> > during the log replication phase. We were able to circumvent this
>> issue
>> >> by
>> >> > copying over all of the logs to the new node before starting it.
>> >> >
>> >> > Details:
>> >> >
>> >> > - The heap size on the old and new node was 8GB.
>> >> > - There was about 50GB of log data to transfer.
>> >> > - There were 1548 partitions across 11 topics
>> >> > - We recently increased our num.replica.fetchers to solve the problem
>> >> > described here: https://issues.apache.org/jira/browse/KAFKA-1196.
>> >> However,
>> >> > this process worked when we first changed that value.
>> >> >
>> >> > [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283
>> (kafka.network.
>> >> > BoundedByteBufferReceive)
>> >> > java.lang.OutOfMemoryError: Java heap space
>> >> >   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>> >> >   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>> >> >   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(
>> >> > BoundedByteBufferReceive.scala:80)
>> >> >   at kafka.network.BoundedByteBufferReceive.readFrom(
>> >> > BoundedByteBufferReceive.scala:63)
>> >> >   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> >> >   at kafka.network.BoundedByteBufferReceive.readCompletely(
>> >> > BoundedByteBufferReceive.scala:29)
>> >> >   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>> >> >   at
>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>> >> >   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
>> >> > $sendRequest(SimpleConsumer.scala:71)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> >> > apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> >> > apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> >> > apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
>> >> > SimpleConsumer.scala:108)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> >> > SimpleConsumer.scala:108)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> >> > SimpleConsumer.scala:108)
>> >> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >> >   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>> >> >   at kafka.server.AbstractFetcherThread.processFetchRequest(
>> >> > AbstractFetcherThread.scala:96)
>> >> >   at
>> >> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:
>> >> > 88)
>> >> >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>> >> >
>> >> > Thank you
>> >> >
>> >>
>>

Re: OutOfMemoryException when starting replacement node.

Posted by Solon Gordon <so...@knewton.com>.
I see, thank you for the explanation. You might consider being more
explicit about this in your documentation. We didn't realize we needed to
take the (partitions * fetch size) calculation into account when choosing
partition counts for our topics, so this is a bit of a rude surprise.

On Wed, Dec 10, 2014 at 3:50 PM, Gwen Shapira <gs...@cloudera.com> wrote:

> Ah, found where we actually size the request as partitions * fetch size.
>
> Thanks for the correction, Jay and sorry for the mix-up, Solon.
>
> On Wed, Dec 10, 2014 at 10:41 AM, Jay Kreps <ja...@confluent.io> wrote:
> > Hey Solon,
> >
> > The 10MB size is per-partition. The rationale for this is that the fetch
> > size per-partition is effectively a max message size. However with so
> many
> > partitions on one machine this will lead to a very large fetch size. We
> > don't do a great job of scheduling these to stay under a memory bound
> > today. Ideally the broker and consumer should do something intelligent to
> > stay under a fixed memory budget, this is something we'd like to address
> as
> > part of the new consumer.
> >
> > For now you need to either bump up your heap or decrease your fetch size.
> >
> > -jay
> >
> > On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon <so...@knewton.com> wrote:
> >
> >> I just wanted to bump this issue to see if anyone has thoughts. Based on
> >> the error message it seems like the broker is attempting to consume
> nearly
> >> 2GB of data in a single fetch. Is this expected behavior?
> >>
> >> Please let us know if more details would be helpful or if it would be
> >> better for us to file a JIRA issue. We're using Kafka 0.8.1.1.
> >>
> >> Thanks,
> >> Solon
> >>
> >> On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov <dm...@knewton.com>
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > We were recently trying to replace a broker instance and were getting
> an
> >> > OutOfMemoryException when the new node was coming up. The issue
> happened
> >> > during the log replication phase. We were able to circumvent this
> issue
> >> by
> >> > copying over all of the logs to the new node before starting it.
> >> >
> >> > Details:
> >> >
> >> > - The heap size on the old and new node was 8GB.
> >> > - There was about 50GB of log data to transfer.
> >> > - There were 1548 partitions across 11 topics
> >> > - We recently increased our num.replica.fetchers to solve the problem
> >> > described here: https://issues.apache.org/jira/browse/KAFKA-1196.
> >> However,
> >> > this process worked when we first changed that value.
> >> >
> >> > [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283
> (kafka.network.
> >> > BoundedByteBufferReceive)
> >> > java.lang.OutOfMemoryError: Java heap space
> >> >   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> >> >   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> >> >   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(
> >> > BoundedByteBufferReceive.scala:80)
> >> >   at kafka.network.BoundedByteBufferReceive.readFrom(
> >> > BoundedByteBufferReceive.scala:63)
> >> >   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >> >   at kafka.network.BoundedByteBufferReceive.readCompletely(
> >> > BoundedByteBufferReceive.scala:29)
> >> >   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >> >   at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
> >> >   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
> >> > $sendRequest(SimpleConsumer.scala:71)
> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> >> > apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> >> > apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> >> > apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
> >> > SimpleConsumer.scala:108)
> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
> >> > SimpleConsumer.scala:108)
> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
> >> > SimpleConsumer.scala:108)
> >> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >> >   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> >> >   at kafka.server.AbstractFetcherThread.processFetchRequest(
> >> > AbstractFetcherThread.scala:96)
> >> >   at
> >> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:
> >> > 88)
> >> >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >> >
> >> > Thank you
> >> >
> >>
>

Re: OutOfMemoryException when starting replacement node.

Posted by Gwen Shapira <gs...@cloudera.com>.
Ah, found where we actually size the request as partitions * fetch size.

Thanks for the correction, Jay and sorry for the mix-up, Solon.

On Wed, Dec 10, 2014 at 10:41 AM, Jay Kreps <ja...@confluent.io> wrote:
> Hey Solon,
>
> The 10MB size is per-partition. The rationale for this is that the fetch
> size per-partition is effectively a max message size. However with so many
> partitions on one machine this will lead to a very large fetch size. We
> don't do a great job of scheduling these to stay under a memory bound
> today. Ideally the broker and consumer should do something intelligent to
> stay under a fixed memory budget, this is something we'd like to address as
> part of the new consumer.
>
> For now you need to either bump up your heap or decrease your fetch size.
>
> -jay
>
> On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon <so...@knewton.com> wrote:
>
>> I just wanted to bump this issue to see if anyone has thoughts. Based on
>> the error message it seems like the broker is attempting to consume nearly
>> 2GB of data in a single fetch. Is this expected behavior?
>>
>> Please let us know if more details would be helpful or if it would be
>> better for us to file a JIRA issue. We're using Kafka 0.8.1.1.
>>
>> Thanks,
>> Solon
>>
>> On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov <dm...@knewton.com>
>> wrote:
>>
>> > Hi,
>> >
>> > We were recently trying to replace a broker instance and were getting an
>> > OutOfMemoryException when the new node was coming up. The issue happened
>> > during the log replication phase. We were able to circumvent this issue
>> by
>> > copying over all of the logs to the new node before starting it.
>> >
>> > Details:
>> >
>> > - The heap size on the old and new node was 8GB.
>> > - There was about 50GB of log data to transfer.
>> > - There were 1548 partitions across 11 topics
>> > - We recently increased our num.replica.fetchers to solve the problem
>> > described here: https://issues.apache.org/jira/browse/KAFKA-1196.
>> However,
>> > this process worked when we first changed that value.
>> >
>> > [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283 (kafka.network.
>> > BoundedByteBufferReceive)
>> > java.lang.OutOfMemoryError: Java heap space
>> >   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>> >   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>> >   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(
>> > BoundedByteBufferReceive.scala:80)
>> >   at kafka.network.BoundedByteBufferReceive.readFrom(
>> > BoundedByteBufferReceive.scala:63)
>> >   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> >   at kafka.network.BoundedByteBufferReceive.readCompletely(
>> > BoundedByteBufferReceive.scala:29)
>> >   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>> >   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>> >   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
>> > $sendRequest(SimpleConsumer.scala:71)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> > apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> > apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> > apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
>> > SimpleConsumer.scala:108)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> > SimpleConsumer.scala:108)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> > SimpleConsumer.scala:108)
>> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>> >   at kafka.server.AbstractFetcherThread.processFetchRequest(
>> > AbstractFetcherThread.scala:96)
>> >   at
>> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:
>> > 88)
>> >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>> >
>> > Thank you
>> >
>>

Re: OutOfMemoryException when starting replacement node.

Posted by Jay Kreps <ja...@confluent.io>.
Hey Solon,

The 10MB size is per-partition. The rationale for this is that the fetch
size per-partition is effectively a max message size. However with so many
partitions on one machine this will lead to a very large fetch size. We
don't do a great job of scheduling these to stay under a memory bound
today. Ideally the broker and consumer should do something intelligent to
stay under a fixed memory budget, this is something we'd like to address as
part of the new consumer.

For now you need to either bump up your heap or decrease your fetch size.

-jay

On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon <so...@knewton.com> wrote:

> I just wanted to bump this issue to see if anyone has thoughts. Based on
> the error message it seems like the broker is attempting to consume nearly
> 2GB of data in a single fetch. Is this expected behavior?
>
> Please let us know if more details would be helpful or if it would be
> better for us to file a JIRA issue. We're using Kafka 0.8.1.1.
>
> Thanks,
> Solon
>
> On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov <dm...@knewton.com>
> wrote:
>
> > Hi,
> >
> > We were recently trying to replace a broker instance and were getting an
> > OutOfMemoryException when the new node was coming up. The issue happened
> > during the log replication phase. We were able to circumvent this issue
> by
> > copying over all of the logs to the new node before starting it.
> >
> > Details:
> >
> > - The heap size on the old and new node was 8GB.
> > - There was about 50GB of log data to transfer.
> > - There were 1548 partitions across 11 topics
> > - We recently increased our num.replica.fetchers to solve the problem
> > described here: https://issues.apache.org/jira/browse/KAFKA-1196.
> However,
> > this process worked when we first changed that value.
> >
> > [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283 (kafka.network.
> > BoundedByteBufferReceive)
> > java.lang.OutOfMemoryError: Java heap space
> >   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> >   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> >   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(
> > BoundedByteBufferReceive.scala:80)
> >   at kafka.network.BoundedByteBufferReceive.readFrom(
> > BoundedByteBufferReceive.scala:63)
> >   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >   at kafka.network.BoundedByteBufferReceive.readCompletely(
> > BoundedByteBufferReceive.scala:29)
> >   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
> >   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
> > $sendRequest(SimpleConsumer.scala:71)
> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> > apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> > apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> > apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
> > SimpleConsumer.scala:108)
> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
> > SimpleConsumer.scala:108)
> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
> > SimpleConsumer.scala:108)
> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> >   at kafka.server.AbstractFetcherThread.processFetchRequest(
> > AbstractFetcherThread.scala:96)
> >   at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:
> > 88)
> >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> > Thank you
> >
>

Re: OutOfMemoryException when starting replacement node.

Posted by Solon Gordon <so...@knewton.com>.
I just wanted to bump this issue to see if anyone has thoughts. Based on
the error message it seems like the broker is attempting to consume nearly
2GB of data in a single fetch. Is this expected behavior?

Please let us know if more details would be helpful or if it would be
better for us to file a JIRA issue. We're using Kafka 0.8.1.1.

Thanks,
Solon

On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov <dm...@knewton.com> wrote:

> Hi,
>
> We were recently trying to replace a broker instance and were getting an
> OutOfMemoryException when the new node was coming up. The issue happened
> during the log replication phase. We were able to circumvent this issue by
> copying over all of the logs to the new node before starting it.
>
> Details:
>
> - The heap size on the old and new node was 8GB.
> - There was about 50GB of log data to transfer.
> - There were 1548 partitions across 11 topics
> - We recently increased our num.replica.fetchers to solve the problem
> described here: https://issues.apache.org/jira/browse/KAFKA-1196. However,
> this process worked when we first changed that value.
>
> [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283 (kafka.network.
> BoundedByteBufferReceive)
> java.lang.OutOfMemoryError: Java heap space
>   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(
> BoundedByteBufferReceive.scala:80)
>   at kafka.network.BoundedByteBufferReceive.readFrom(
> BoundedByteBufferReceive.scala:63)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at kafka.network.BoundedByteBufferReceive.readCompletely(
> BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
> $sendRequest(SimpleConsumer.scala:71)
>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
> SimpleConsumer.scala:108)
>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
> SimpleConsumer.scala:108)
>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
> SimpleConsumer.scala:108)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>   at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:96)
>   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:
> 88)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> Thank you
>