Posted to dev@kafka.apache.org by Pranay Agarwal <ag...@gmail.com> on 2015/01/19 23:10:17 UTC

Kafka Out of Memory error

Hi All,

I have a Kafka cluster set up with 2 topics:

topic1 with 10 partitions
topic2 with 1000 partitions.

While I am able to consume messages from topic1 just fine, I get the
following error from topic2. There is a resolved issue on the same thing
here: https://issues.apache.org/jira/browse/KAFKA-664

I am using the latest Kafka server version, and I have used the Kafka
command-line tools to consume from the topics.

>>>>
[2015-01-19 22:08:10,758] ERROR OOME with size 201332748
(kafka.network.BoundedByteBufferReceive)

java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
<<<


Thanks
-Pranay

Re: Kafka Out of Memory error

Posted by Pranay Agarwal <ag...@gmail.com>.
Thanks a lot Gwen. I bumped up the JVM heap to 1g on the consumer side and
it works :)

All the consumers belong to the same group and I am using the high-level
group API to consume from Kafka. It seems there is some initial metadata
exchange in which information about all the partitions is sent to every
consumer. Also, I launch 10 consumers from each machine at a time and keep
adding until I reach 200 consumers. I see that the initial consumers seem
to require a lot more CPU and memory at first. Should I launch all the
consumers in one go instead of adding 10 at a time?

On a different issue, I couldn't find any way of keeping the current read
offset metadata while using the high-level API (I am using the gem
mentioned in an earlier mail). Is there any way to record the current read
offsets periodically to monitor the progress of the consumers? Further,
every time a consumer dies and restarts it seems to start reading from the
beginning; is there any way to resume from the last read offsets only?
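
For reference: with the 0.8.x high-level consumer, committed offsets live
in ZooKeeper under /consumers/<group>/offsets/<topic>/<partition>, and the
kafka.tools.ConsumerOffsetChecker tool (run via bin/kafka-run-class.sh) can
report the committed offset and lag per partition. Resuming from the last
read position after a restart only works if the group.id stays stable and
offsets are actually being committed. A minimal sketch of that
configuration, assuming the standard Java kafka.consumer API (the ZooKeeper
address and group name below are hypothetical):

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class ResumableConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");    // hypothetical ZK address
        props.put("group.id", "topic2-readers");       // keep stable across restarts
        props.put("auto.commit.enable", "true");       // commit offsets to ZooKeeper
        props.put("auto.commit.interval.ms", "10000"); // record progress every 10s
        props.put("auto.offset.reset", "largest");     // used only when no committed offset exists
        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // ... create message streams and iterate as usual ...
        consumer.shutdown();
    }
}

The jruby-kafka gem wraps this same consumer, so the same property names
should apply there.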

Thanks
-Pranay

On Mon, Jan 19, 2015 at 6:54 PM, Gwen Shapira <gs...@cloudera.com> wrote:

> [quoted thread trimmed]

Re: Kafka Out of Memory error

Posted by Gwen Shapira <gs...@cloudera.com>.
Two things:
1. The OOM happened on the consumer, right? So the memory that matters
is the RAM on the consumer machine, not on the Kafka cluster nodes.

2. If the consumers belong to the same consumer group, each will
consume a subset of the partitions and will only need to allocate
memory for those partitions.

So, assuming all your consumers belong to the same group:
2 consumers  -> each has 500 partitions -> each uses 500MB.

The total remains 1GB no matter how many consumers you have, as long
as they are all in the same group.
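
Putting rough numbers on that (a back-of-the-envelope sketch of the
estimate used in this thread, not a measured figure):

public class FetchBufferEstimate {
    public static void main(String[] args) {
        // assumes each consumer allocates one fetch buffer of
        // fetch.message.max.bytes for every partition assigned to it
        long partitions = 1000;
        long consumersInGroup = 2;
        long fetchMessageMaxBytes = 1024 * 1024; // the 1MB client default mentioned in this thread
        long perConsumer = (partitions / consumersInGroup) * fetchMessageMaxBytes;
        System.out.println(perConsumer / (1024 * 1024) + " MB per consumer"); // 500 MB
    }
}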

If the consumers belong to different groups (i.e. they read copies of
the same messages from the same partitions), then yes, you are limited
to 8 per server (probably fewer, because there are other things on the
server).

Gwen

On Mon, Jan 19, 2015 at 3:06 PM, Pranay Agarwal
<ag...@gmail.com> wrote:
> [quoted thread trimmed]

Re: Kafka Out of Memory error

Posted by Pranay Agarwal <ag...@gmail.com>.
Thanks a lot Natty.

I am using this Ruby gem on the client side with all the default config:
https://github.com/joekiller/jruby-kafka/blob/master/lib/jruby-kafka/group.rb
The value of fetch.message.max.bytes is set to 1MB.

Currently I only have 3 nodes set up in the Kafka cluster (with 8 GB RAM),
and if 1 consumer is going to take 1000 partitions x 1MB ~ 1GB, does that
mean 1 Kafka node can at best support only 8 consumers? Also, when I run
top/free on the Kafka cluster nodes (both ZooKeeper and Kafka are deployed
on each of the 3 nodes of the cluster) I don't see a lot of memory being
used on the machines. Also, even with this calculation, I shouldn't be
facing any issue with only 1 consumer, as I have 8GB of JVM heap given to
the Kafka nodes, right?
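
If the fetch size is the real lever here, it can also be turned down on
the client instead of only raising heap. A minimal sketch, assuming the
standard 0.8 high-level consumer properties (the jruby-kafka gem feeds its
options to this same Java consumer, though that is worth verifying against
the gem; the values below are illustrative only):

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class SmallerFetchConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181"); // hypothetical ZK address
        props.put("group.id", "topic2-readers");    // hypothetical group name
        // 256KB instead of the 1MB default:
        // 1000 partitions x 256KB ~ 250MB of fetch buffers per consumer
        props.put("fetch.message.max.bytes", "262144");
        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        consumer.shutdown();
    }
}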

Thanks
-Pranay

On Mon, Jan 19, 2015 at 2:53 PM, Jonathan Natkins <na...@streamsets.com>
wrote:

> [quoted thread trimmed]

Re: Kafka Out of Memory error

Posted by Jonathan Natkins <na...@streamsets.com>.
The fetch.message.max.bytes setting is actually a client-side
configuration. With regard to increasing the number of threads, I think
the calculation may be a little more subtle than what you're proposing,
and frankly, it's unlikely that your servers can handle allocating 200MB x
1000 threads = 200GB of memory at once.

I believe that if you have every partition on a single broker, and all of
your consumer threads are requesting data simultaneously, then yes, the
broker would attempt to allocate 200GB of heap, and probably you'll hit an
OOME. However, since each consumer is only reading from one partition,
those 1000 threads should be making requests that are spread out over the
entire Kafka cluster. Depending on the memory on your servers, you may need
to increase the number of brokers in your cluster to support the 1000
threads. For example, I would expect that you can support this with 10
brokers if each broker has something north of 20GB of heap allocated.
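
As a back-of-the-envelope check on those numbers (a sketch under the
assumptions in this thread, not a sizing guide):

public class BrokerHeapEstimate {
    public static void main(String[] args) {
        // worst case assumed above: all 1000 threads' ~200MB fetch responses
        // are buffered at once, spread evenly across the brokers
        long threads = 1000;
        long responseBytes = 200L * 1024 * 1024; // matches the OOME size in the stack trace
        long brokers = 10;
        long perBroker = threads * responseBytes / brokers;
        System.out.println(perBroker / (1024 * 1024) + " MB per broker"); // 20000 MB, i.e. ~20GB
    }
}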

Some of this is a little bit of guesswork on my part, and I'm not super
confident of my numbers...Can anybody else on the list validate my math?

Thanks,
Natty

Jonathan "Natty" Natkins
StreamSets | Customer Engagement Engineer
mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>


On Mon, Jan 19, 2015 at 2:34 PM, Pranay Agarwal <ag...@gmail.com>
wrote:

> Thanks Natty.
>
> Is there any config which I need to change on the client side as well?
> Also, currently I am trying with only 1 consumer thread. Does the equation
> change to
> (#partitions)*(fetchsize)*(#consumer_threads) in case I try to read with
> 1000 threads from topic2 (1000 partitions)?
>
> -Pranay
>
> On Mon, Jan 19, 2015 at 2:26 PM, Jonathan Natkins <na...@streamsets.com>
> wrote:
>
> > Hi Pranay,
> >
> > I think the JIRA you're referencing is a bit orthogonal to the OOME that
> > you're experiencing. Based on the stacktrace, it looks like your OOME is
> > coming from a consumer request, which is attempting to allocate 200MB.
> > There was a thread (relatively recently) that discussed what I think is
> > your issue:
> >
> >
> >
> http://mail-archives.apache.org/mod_mbox/kafka-users/201412.mbox/%3CCAG1fNJDHHGSL-x3wp=pPZS1asOdOBrQ-Ge3kiA3Bk_iz7o=5gA@mail.gmail.com%3E
> >
> > I suspect that the takeaway is that the way Kafka determines the required
> > memory for a consumer request is (#partitions in the topic) x
> > (replica.fetch.max.bytes), and seemingly you don't have enough memory
> > allocated to handle that request. The solution is likely to increase the
> > heap size on your brokers or to decrease your max fetch size.
> >
> > Thanks,
> > Natty
> >
> > Jonathan "Natty" Natkins
> > StreamSets | Customer Engagement Engineer
> > mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>
> >
> >
> > On Mon, Jan 19, 2015 at 2:10 PM, Pranay Agarwal <agarwalpranaya@gmail.com> wrote:
> >
> > > [original message and stack trace trimmed]

Re: Kafka Out of Memory error

Posted by Jonathan Natkins <na...@streamsets.com>.
The fetch.message.max.size setting is actually a client-side configuration.
With regard to increasing the number of threads, I think the calculation may
be a little more subtle than what you're proposing, and frankly, it's
unlikely that your servers can handle allocating 200MB x 1000 threads =
200GB of memory at once.
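
For illustration only, here is a minimal sketch of turning that client-side
knob down with the old high-level consumer API. The ZooKeeper address, group
id, and 256 KB value are placeholder assumptions, not recommendations:

    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class SmallFetchConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zk1:2181");  // placeholder quorum
            props.put("group.id", "topic2-readers");     // placeholder group id
            // Client-side knob: max bytes fetched per partition per request.
            // A fetch response can need roughly (#partitions) x this value, so
            // with ~1000 partitions a smaller value keeps the heap bounded.
            props.put("fetch.message.max.size", "262144");  // 256 KB, illustrative
            ConsumerConnector consumer =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            // ... create message streams and consume as usual ...
            consumer.shutdown();
        }
    }

One caveat: fetch.message.max.size still has to be at least as large as the
largest message the brokers will accept (message.max.bytes), or the consumer
can get stuck on an oversized message.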

I believe that if you have every partition on a single broker, and all of
your consumer threads are requesting data simultaneously, then yes, the
broker would attempt to allocate 200GB of heap, and you'll probably hit an
OOME. However, since each consumer is only reading from one partition,
those 1000 threads should be making requests that are spread out over the
entire Kafka cluster. Depending on the memory on your servers, you may need
to increase the number of brokers in your cluster to support the 1000
threads. For example, I would expect that you can support this with 10
brokers if each broker has something north of 20GB of heap allocated.
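
To make that back-of-the-envelope estimate concrete, here is a small sketch
of the arithmetic. The ~200 MB request size comes from the OOME above; the
1000 threads and 10 brokers are assumptions, not measured values:

    public class FetchMemoryEstimate {
        public static void main(String[] args) {
            long bytesPerFetch   = 200L * 1024 * 1024; // ~200 MB, per the OOME
            long consumerThreads = 1000;  // one thread per partition of topic2
            long brokers         = 10;    // assumed broker count
            // Worst case: every thread has a fetch of that size in flight at
            // once, and the load is spread evenly across the brokers.
            long clusterWideBytes = bytesPerFetch * consumerThreads;
            long perBrokerBytes   = clusterWideBytes / brokers;
            System.out.printf("cluster-wide: ~%d GB, per broker: ~%d GB%n",
                    clusterWideBytes >> 30, perBrokerBytes >> 30);
            // -> cluster-wide: ~195 GB, per broker: ~19 GB, i.e. roughly the
            //    200 GB and 20 GB figures above
        }
    }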

Some of this is a little bit of guesswork on my part, and I'm not super
confident of my numbers... Can anybody else on the list validate my math?

Thanks,
Natty

Jonathan "Natty" Natkins
StreamSets | Customer Engagement Engineer
mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>


On Mon, Jan 19, 2015 at 2:34 PM, Pranay Agarwal <ag...@gmail.com>
wrote:

> Thanks Natty.
>
> Is there any config which I need to change on the client side as well?
> Also, currently I am trying with only 1 consumer thread. Does the equation
> change to (#partitions)*(fetchsize)*(#consumer_threads) in case I try to
> read with 1000 threads from topic2 (1000 partitions)?
>
> -Pranay
>
> On Mon, Jan 19, 2015 at 2:26 PM, Jonathan Natkins <na...@streamsets.com>
> wrote:
>
> > Hi Pranay,
> >
> > I think the JIRA you're referencing is a bit orthogonal to the OOME that
> > you're experiencing. Based on the stacktrace, it looks like your OOME is
> > coming from a consumer request, which is attempting to allocate 200MB.
> > There was a thread (relatively recently) that discussed what I think is
> > your issue:
> >
> > http://mail-archives.apache.org/mod_mbox/kafka-users/201412.mbox/%3CCAG1fNJDHHGSL-x3wp=pPZS1asOdOBrQ-Ge3kiA3Bk_iz7o=5gA@mail.gmail.com%3E
> >
> > I suspect that the takeaway is that the way Kafka determines the required
> > memory for a consumer request is (#partitions in the topic) x
> > (replica.fetch.max.bytes), and seemingly you don't have enough memory
> > allocated to handle that request. The solution is likely to increase the
> > heap size on your brokers or to decrease your max fetch size.
> >
> > Thanks,
> > Natty
> >
> > Jonathan "Natty" Natkins
> > StreamSets | Customer Engagement Engineer
> > mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>
> >
> >
> > On Mon, Jan 19, 2015 at 2:10 PM, Pranay Agarwal <agarwalpranaya@gmail.com> wrote:
> >
> > > Hi All,
> > >
> > > I have a kafka cluster setup which has 2 topics
> > >
> > > topic1 with 10 partitions
> > > topic2 with 1000 partitions.
> > >
> > > While I am able to consume messages from topic1 just fine, I get the
> > > following error from topic2. There is a resolved issue on the same
> > > problem here: https://issues.apache.org/jira/browse/KAFKA-664
> > >
> > > I am using the latest Kafka server version, and I have used the Kafka
> > > command-line tools to consume from the topics.
> > >
> > > >>>>
> > > [2015-01-19 22:08:10,758] ERROR OOME with size 201332748 (kafka.network.BoundedByteBufferReceive)
> > >
> > > java.lang.OutOfMemoryError: Java heap space
> > > at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> > > at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> > > at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> > > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> > > at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> > > at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> > > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > <<<
> > >
> > >
> > > Thanks
> > > -Pranay
> > >
> >
>

Re: Kafka Out of Memory error

Posted by Pranay Agarwal <ag...@gmail.com>.
Thanks Natty.

Is there any config which I need to change on the client side as well?
Also, currently I am trying with only 1 consumer thread. Does the equation
change to (#partitions)*(fetchsize)*(#consumer_threads) in case I try to
read with 1000 threads from topic2 (1000 partitions)?

-Pranay

On Mon, Jan 19, 2015 at 2:26 PM, Jonathan Natkins <na...@streamsets.com>
wrote:

> Hi Pranay,
>
> I think the JIRA you're referencing is a bit orthogonal to the OOME that
> you're experiencing. Based on the stacktrace, it looks like your OOME is
> coming from a consumer request, which is attempting to allocate 200MB.
> There was a thread (relatively recently) that discussed what I think is
> your issue:
>
> http://mail-archives.apache.org/mod_mbox/kafka-users/201412.mbox/%3CCAG1fNJDHHGSL-x3wp=pPZS1asOdOBrQ-Ge3kiA3Bk_iz7o=5gA@mail.gmail.com%3E
>
> I suspect that the takeaway is that the way Kafka determines the required
> memory for a consumer request is (#partitions in the topic) x
> (replica.fetch.max.bytes), and seemingly you don't have enough memory
> allocated to handle that request. The solution is likely to increase the
> heap size on your brokers or to decrease your max fetch size.
>
> Thanks,
> Natty
>
> Jonathan "Natty" Natkins
> StreamSets | Customer Engagement Engineer
> mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>
>
>
> On Mon, Jan 19, 2015 at 2:10 PM, Pranay Agarwal <ag...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > I have a kafka cluster setup which has 2 topics
> >
> > topic1 with 10 partitions
> > topic2 with 1000 partitions.
> >
> > While I am able to consume messages from topic1 just fine, I get the
> > following error from topic2. There is a resolved issue on the same
> > problem here: https://issues.apache.org/jira/browse/KAFKA-664
> >
> > I am using the latest Kafka server version, and I have used the Kafka
> > command-line tools to consume from the topics.
> >
> > >>>>
> > [2015-01-19 22:08:10,758] ERROR OOME with size 201332748 (kafka.network.BoundedByteBufferReceive)
> >
> > java.lang.OutOfMemoryError: Java heap space
> > at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> > at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> > at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> > at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> > at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > <<<
> >
> >
> > Thanks
> > -Pranay
> >
>

Re: Kafka Out of Memory error

Posted by Jonathan Natkins <na...@streamsets.com>.
Hi Pranay,

I think the JIRA you're referencing is a bit orthogonal to the OOME that
you're experiencing. Based on the stacktrace, it looks like your OOME is
coming from a consumer request, which is attempting to allocate 200MB.
There was a thread (relatively recently) that discussed what I think is
your issue:

http://mail-archives.apache.org/mod_mbox/kafka-users/201412.mbox/%3CCAG1fNJDHHGSL-x3wp=pPZS1asOdOBrQ-Ge3kiA3Bk_iz7o=5gA@mail.gmail.com%3E

I suspect that the takeaway is that the way Kafka determines the required
memory for a consumer request is (#partitions in the topic) x
(replica.fetch.max.bytes), and seemingly you don't have enough memory
allocated to handle that request. The solution is likely to increase the
heap size on your brokers or to decrease your max fetch size.
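
As a rough illustration of that formula (the ~200 KB per-partition figure is
only an inference: 1000 partitions x 200 KB approximately matches the
201332748-byte allocation in the stacktrace):

    public class ConsumerRequestMemory {
        public static void main(String[] args) {
            long partitions             = 1000;         // partitions in topic2
            long fetchBytesPerPartition = 200L * 1024;  // ~200 KB, assumption
            long perRequestBytes = partitions * fetchBytesPerPartition;
            System.out.printf("one fetch may need ~%d MB of heap%n",
                    perRequestBytes >> 20);  // -> ~195 MB
            // If the JVM issuing the fetch has less free heap than this, the
            // ByteBuffer.allocate call in the stacktrace throws the OOME.
        }
    }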

Thanks,
Natty

Jonathan "Natty" Natkins
StreamSets | Customer Engagement Engineer
mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>


On Mon, Jan 19, 2015 at 2:10 PM, Pranay Agarwal <ag...@gmail.com>
wrote:

> Hi All,
>
> I have a kafka cluster setup which has 2 topics
>
> topic1 with 10 partitions
> topic2 with 1000 partitions.
>
> While I am able to consume messages from topic1 just fine, I get the
> following error from topic2. There is a resolved issue on the same
> problem here: https://issues.apache.org/jira/browse/KAFKA-664
>
> I am using the latest Kafka server version, and I have used the Kafka
> command-line tools to consume from the topics.
>
> >>>>
> [2015-01-19 22:08:10,758] ERROR OOME with size 201332748 (kafka.network.BoundedByteBufferReceive)
>
> java.lang.OutOfMemoryError: Java heap space
> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> <<<
>
>
> Thanks
> -Pranay
>
