Posted to users@kafka.apache.org by francesco vigotti <vi...@gmail.com> on 2015/07/15 00:14:16 UTC

Hidden producer memory usage? Does a partition-filebacked-batched-async Kafka producer make sense?

Hi,
I'm experimenting with the new Kafka producer to see if it fits my use case (Kafka 0.8.2.1).
I'll probably end up with a Kafka cluster of 5 nodes spread over multiple
datacenters, with one topic, a replication factor of 2, and at least 10 partitions,
required for consumer performance (I'll explain why only 10 later).
My producers, 10 or more, are also distributed over multiple datacenters around
the globe, each producing about 2k messages/sec
with a message size of about 500 bytes per message.

During my tests I have seen that when there is a network issue between one
of the producers and one of the partition leaders, the producer starts to
accumulate much more data than the configured buffer.memory (about 40 MB),
which crashes the JVM because the garbage collector cannot free enough space
for the application to keep working (the JVM heap is about 2 GB and normally
runs at about 1 GB, except when this problem occurs).
What I see in the heap is about 500 MB used by char[].

During these events the buffer usage does increase, from about 5 MB to about
30 MB, and the batches reach their maximum size, which helps speed up the
transmission, but that does not seem to be enough for my memory usage /
performance / resilience requirements.



kafka.producer.acks=1
kafka.producer.buffer.memory=40000000
kafka.producer.compression.type=gzip
kafka.producer.retries=1
kafka.producer.batch.size=10000
kafka.producer.max.request.size=10000000
kafka.producer.send.buffer.bytes=1000000
kafka.producer.timeout.ms=10000
kafka.producer.reconnect.backoff.ms=500
kafka.producer.retry.backoff.ms=500
kafka.producer.block.on.buffer.full=false
kafka.producer.linger.ms=1000
kafka.producer.maxinflight=2
kafka.producer.serializer.class=kafka.serializer.DefaultEncoder
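
For completeness, this is roughly how I wire those settings into the new producer. The "kafka.producer." prefix is my own application config convention (stripped before handing the properties to Kafka), the broker hosts and topic are placeholders, and serializer.class is an old-producer setting, so in the new producer I use key.serializer/value.serializer instead; a sketch, not my exact code:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerSetup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder hosts
        props.put("acks", "1");
        props.put("buffer.memory", "40000000");
        props.put("compression.type", "gzip");
        props.put("retries", "1");
        props.put("batch.size", "10000");
        props.put("max.request.size", "10000000");
        props.put("send.buffer.bytes", "1000000");
        props.put("timeout.ms", "10000");
        props.put("reconnect.backoff.ms", "500");
        props.put("retry.backoff.ms", "500");
        props.put("block.on.buffer.full", "false");
        props.put("linger.ms", "1000");
        props.put("max.in.flight.requests.per.connection", "2");
        // the new producer takes serializers instead of serializer.class
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("my-topic", "payload".getBytes())); // placeholder topic
        producer.close();
    }
}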

The main issue, anyway, is that when one producer runs into network-path problems
towards one broker (packet loss or a connection slowdown), it crashes the whole
application through JVM RAM usage. This doesn't mean the broker is down; other
producers can still reach it. It's just a networking problem between those two,
which on some routing paths can last a full hour during peak time.

I have found no solution by playing with the available parameters.

Now, from reading around, this is my understanding of how the producer works
internally: send() returns an async future (although on the first connection it
blocks before returning the future because of metadata fetching, but that's
another strange story :) ). The message is then queued in one queue per
partition; then, in random order but sequentially, the brokers are contacted and
all the queues for the partitions assigned to each broker are sent, waiting for
the end of that transmission before proceeding to the next broker. But if the
transmission to one broker hangs or slows down, everything else stalls too (the
other queues grow), and I don't understand where the buffer memory is used in
this whole process, because it seems to me that something else is consuming
memory when this happens.
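
For what it's worth, this is roughly how I call send(). With block.on.buffer.full=false I would have expected a BufferExhaustedException once buffer.memory is used up, rather than unbounded heap growth (a sketch; the topic name and error handling are placeholders):

import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AsyncSend {
    static void sendAsync(Producer<byte[], byte[]> producer, byte[] payload) {
        ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>("my-topic", payload); // placeholder topic
        try {
            // send() is asynchronous: the record goes into the per-partition
            // accumulator and the callback fires when the broker acks or fails.
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        // log only the error, not the payload, so the callback
                        // does not keep the message bytes alive
                        System.err.println("send failed: " + e);
                    }
                }
            });
        } catch (BufferExhaustedException e) {
            // with block.on.buffer.full=false the accumulator should throw here
            // once buffer.memory is exhausted, instead of blocking the caller
            System.err.println("producer buffer full, dropping message");
        }
    }
}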

*To get finer-grained control over the Kafka producer's memory usage:* a simple
file-backed solution that watches the callbacks to detect when the producer hangs,
and stops handing the producer more data once I know there are more than some
threshold of messages in flight, could be a partial solution. That would fix the
memory usage, but it's strange that it should be necessary, because that is
exactly what the buffer should be for... Is the buffer internally multiplied by
the number of partitions or brokers? Or is there something else I'm not
considering that could use so much RAM (10x the buffer size)?

But this solution would also slow down the whole producer's transmissions to all
partition leaders, not just the troubled one.
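
What I mean by callback-based tracking is something like this (my own idea, not anything Kafka provides; the class name and the limit are hypothetical, and instead of blocking, the acquire could fail fast and divert the message to the disk queue):

import java.util.concurrent.Semaphore;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Wraps send() and applies backpressure once too many sends are still un-acked.
public class BoundedSender {
    private final Producer<byte[], byte[]> producer;
    private final Semaphore inFlight;

    public BoundedSender(Producer<byte[], byte[]> producer, int maxInFlight) {
        this.producer = producer;
        this.inFlight = new Semaphore(maxInFlight);
    }

    public void send(String topic, byte[] payload) throws InterruptedException {
        inFlight.acquire();                 // wait for a free in-flight slot
        producer.send(new ProducerRecord<>(topic, payload), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                inFlight.release();         // ack or failure: the slot is free again
                if (e != null) {
                    System.err.println("send failed: " + e);
                }
            }
        });
    }
}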

So, with N being the number of partitions: I'm going to build something like an
N-threaded solution where each thread handles one queue for one partition (one
thread per leader would be optimal, but one queue per partition is easier because
it avoids having to track leader assignment/re-election). Each thread has its own
producer, which obviously loses some batching performance because batches for
multiple partitions are no longer aggregated per assigned leader, but it will
still batch better than the sync producer. If the callback-tracked count of
in-transmission messages reaches some threshold, I start using local disk storage
as persistence so that the producer is never handed too many messages, and I use
the disk FIFO as the source for the producer until its size reaches 0 again. In
the end each thread will fall back to its file-backed queue when it slows down
and will process everything from RAM queues when it runs OK, but no single
producer will ever be overloaded with messages. Finer-grained control over the
producer's memory usage could make the callback-based in-flight message counter
unnecessary, but the disk queue is required anyway to avoid discarding messages
on a full buffer, and multiple producers are required so that a problem between
this producer and a single partition leader doesn't slow everything else down.
A rough skeleton of one such per-partition worker is sketched below.
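
(Entirely hypothetical code of my own; DiskFifo is just a placeholder for the file-backed queue, and the in-flight counting from the previous sketch is left out for brevity.)

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// One worker per partition: RAM queue first, spill to a disk FIFO when it backs up.
public class PartitionWorker implements Runnable {
    private final Producer<byte[], byte[]> producer;   // one producer per worker
    private final BlockingQueue<byte[]> ramQueue = new LinkedBlockingQueue<>(10000);
    private final DiskFifo spill;                       // hypothetical file-backed queue
    private final String topic;
    private final int partition;

    public PartitionWorker(Producer<byte[], byte[]> producer, DiskFifo spill,
                           String topic, int partition) {
        this.producer = producer;
        this.spill = spill;
        this.topic = topic;
        this.partition = partition;
    }

    // Called by the application; never blocks the caller.
    public void enqueue(byte[] payload) {
        // Once spilling has started, keep appending to disk until it is drained
        // again, so ordering between the two queues stays roughly intact.
        if (!spill.isEmpty() || !ramQueue.offer(payload)) {
            spill.append(payload);
        }
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Drain the disk backlog first, otherwise take from the RAM queue.
                byte[] payload = spill.isEmpty() ? ramQueue.take() : spill.poll();
                producer.send(new ProducerRecord<>(topic, partition, null, payload));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Placeholder for the file-backed FIFO; not implemented here.
    interface DiskFifo {
        void append(byte[] payload);
        byte[] poll();
        boolean isEmpty();
    }
}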

Does all this make sense?

Thank you :),
Francesco Vigotti

Re: Hidden producer memory usage? Does a partition-filebacked-batched-async Kafka producer make sense?

Posted by tao xiao <xi...@gmail.com>.
The OOME issue may be caused
by org.apache.kafka.clients.producer.internals.ErrorLoggingCallback holding an
unnecessary byte[] value. Can you apply the patch in the JIRA below and try
again?

https://issues.apache.org/jira/browse/KAFKA-2281
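
If you cannot apply the patch right away and your code path lets you pass your own callback to send(), you can avoid pinning the payload by keeping only what you need, e.g. the value length rather than the value itself. A sketch of that idea (the class and field names are mine, not from the patch):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Logs send failures without holding a reference to the message payload.
public class LengthOnlyErrorCallback implements Callback {
    private final String topic;
    private final int valueLength;   // keep only the size, never the byte[] itself

    public LengthOnlyErrorCallback(String topic, byte[] value) {
        this.topic = topic;
        this.valueLength = value == null ? 0 : value.length;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            System.err.println("Error sending " + valueLength
                    + " bytes to topic " + topic + ": " + exception.getMessage());
        }
    }
}

Usage: producer.send(record, new LengthOnlyErrorCallback(topic, value));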



Re: Hidden producer memory usage? Does a partition-filebacked-batched-async Kafka producer make sense?

Posted by francesco vigotti <vi...@gmail.com>.
I never explained why only 10 partitions. It's because more partitions don't
speed up the producer, because of this memory-monitoring problem, and because I
have no problems on the consumer side at the moment (10 should be enough, even
if I haven't fully tested it yet); and since the solution would be these
partition-separated queues, 10 seemed a fair number to me.
I'm also considering setting up a kind of producer-proxy instance: something
that a producer can push a batch of messages to when it sees communication
problems towards a leader, and which then sends that batch on its behalf. This
could help mitigate producer latency during such networking problems. There
isn't something like this yet, right? An option in the producer to use a
non-leader broker as a proxy for the partition-leader broker would be optimal :)
maybe driven by producer-to-single-broker latency stats :)

Thanks again, and sorry for the verbosity :)
