Posted to users@kafka.apache.org by Anand Somani <me...@gmail.com> on 2015/02/24 22:57:09 UTC

"at least once" consumer recommendations for a load of 5 K messages/second

Hi,

This is a little long, since I wanted to explain the use case before asking
the questions, so thanks for your attention.

Use case:

We have a use case where everything in the queue has to be consumed at
least once, so the consumer has to have "consumed" a message (saved it to
some destination database) before confirming consumption to Kafka (or ZK).
From what I have read so far, we will have consumer groups and partitions.
Here are some facts/numbers for our case:

* We will potentially have peaks of 5K messages/second.
* We can play with the message size if that makes any difference (keep it <
100 bytes for a link, or put in the entire message at an average size of
2-5K bytes).
* We do not need replication, but might have a Kafka cluster to handle the
load.
* Processing a message will take anywhere from 300-500ms; generally we
would like the consumer to be behind by no more than 1-2 minutes. So if a
message shows up in the queue, it should show up in the database within 2
minutes.

The questions I have are:
  * If this has been covered before, please point me to it. Thanks.
  * Is a controlled commit per consumed message possible/recommended at
this load (I have read about some concerns with ZK)?
  * Are there any recommendations on configuration, in terms of partitions
relative to the number of messages or consumers? Maybe more queues/topics?
  * Anything else that we might need to watch out for?
  * As for the client, I should be able to control when the offset commit
happens with the high-level consumer, I suppose?


Thanks
Anand

Re: "at least once" consumer recommendations for a load of 5 K messages/second

Posted by Anand Somani <me...@gmail.com>.
Thanks a bunch for the detailed response and tips!! It looks like I have a
couple of knobs, one of which should work; I will be doing some runs to
figure out what works best for my use case.

Thanks again.

On Thu, Feb 26, 2015 at 9:03 AM, Jeff Wartes <jw...@whitepages.com> wrote:


Re: "at least once" consumer recommendations for a load of 5 K messages/second

Posted by Jeff Wartes <jw...@whitepages.com>.
A note on throughput with an at-least-once guarantee using the high-level
consumer: 

The core unit of concurrency in Kafka is the partition, because you can't
have more active consumers than partitions. Although you can ask for two messages
from a given client instance and process those in parallel, the commit
semantics mean once you've asked for two messages, you *must* handle
*both* of them successfully before you commit. If you commit before you’ve
dealt with them both, you risk losing your at-least-once guarantee. It's
easier to use single-threaded client message processing than to manage
outstanding messages, if you can get away with it: you just get a message,
process it, and repeat. (You'd probably still want to refrain from calling
commit after every message, though.)
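That single-threaded get/process/commit loop, with a commit only every N
messages, can be sketched as follows. This is illustrative only:
`StubConsumer` is a stand-in for a real Kafka client, and `commit_interval`
is an assumed knob in this sketch, not a real consumer setting.

```python
class StubConsumer:
    """Stand-in for a real Kafka consumer; tracks commits for illustration."""
    def __init__(self, messages):
        self._messages = list(messages)
        self.commits = 0

    def __iter__(self):
        return iter(self._messages)

    def commit(self):
        self.commits += 1


def consume_loop(consumer, process, commit_interval=100):
    """At-least-once: persist each message BEFORE confirming to Kafka,
    and commit only every commit_interval messages to limit offset writes."""
    uncommitted = 0
    for message in consumer:
        process(message)           # save to the destination database first
        uncommitted += 1
        if uncommitted >= commit_interval:
            consumer.commit()      # only then confirm consumption
            uncommitted = 0
    if uncommitted:
        consumer.commit()          # flush the tail on clean shutdown


saved = []
consumer = StubConsumer(range(250))
consume_loop(consumer, saved.append, commit_interval=100)
print(len(saved), consumer.commits)  # -> 250 3
```

A crash between commits re-delivers the uncommitted messages on restart,
which is exactly the at-least-once behavior asked about.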

Unfortunately, you've suggested processing a message takes some 400ms, so
that's 2.5 messages/sec/consumer. To get 5K messages/sec, you'd need 2000
partitions (clients) if you're using the partition as the unit of
concurrency.
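That arithmetic, written out (a quick sketch; the ~400ms figure comes from
the thread above):

```python
import math

avg_processing_sec = 0.4                    # ~400ms per message
per_consumer_rate = 1 / avg_processing_sec  # 2.5 messages/sec/consumer
target_rate = 5000                          # peak messages/sec

# With one single-threaded consumer per partition:
partitions_needed = math.ceil(target_rate / per_consumer_rate)
print(partitions_needed)  # -> 2000
```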

Unfortunately again, the neat automatic partition assignment in the
high-level consumer starts to bog down as the number of partitions goes
up. My coworker found he was waiting more than 15 minutes for assignment
to converge using 1000 partitions, and you suffer that whenever a consumer
is created or destroyed.


So you'll probably need to either handle batches of messages from one
consumer instance concurrently, or use the lower-level consumer and handle
partition assignment and broker failure cases yourself. Some tips if you
want to stick with the high-level consumer:

- Use a dedicated thread to manage a single kafka consumer. Don't use
multiple threads talking to the same consumer instance. Load a batch of
messages into a thread-safe data structure, and work with that to get your
message processing concurrency.
- Once you've pulled a batch of messages from the consumer, you must
handle ALL of those messages before you can commit, or ask for more
messages. Put another way, you can only commit when every message you've
pulled has been handled to your satisfaction.
- You'll still want a reasonable number of partitions. The partition count
becomes the unit of max batch-processing concurrency, rather than the
message-processing concurrency. You'll still need to do similar math to
figure out how many partitions you need based on how long it takes to
process a batch. If you're processing every message in the batch in
parallel, the time it takes to process the batch is the max of the time it
took to process any message in the batch.
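Putting those tips together, the batch pattern might look like the sketch
below. Again illustrative only: `StubBatchConsumer` and its `poll()` stand
in for a real high-level consumer (whose API differs), and the thread pool
supplies the within-batch concurrency.

```python
from concurrent.futures import ThreadPoolExecutor


class StubBatchConsumer:
    """Stand-in for a real consumer; poll(n) returns up to n messages."""
    def __init__(self, messages):
        self._messages = list(messages)
        self.commits = 0

    def poll(self, n):
        batch, self._messages = self._messages[:n], self._messages[n:]
        return batch

    def commit(self):
        self.commits += 1


def consume_batches(consumer, process, batch_size=100, workers=16):
    """Pull a batch, process every message in parallel, then commit.
    Committing only after the whole batch succeeded preserves the
    at-least-once guarantee; batch latency is the max per-message latency."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        while True:
            batch = consumer.poll(batch_size)
            if not batch:
                break
            futures = [pool.submit(process, m) for m in batch]
            for f in futures:
                f.result()        # re-raises if any message failed
            consumer.commit()     # safe: the entire batch was handled


done = []
c = StubBatchConsumer(range(250))
consume_batches(c, done.append, batch_size=100)
print(len(done), c.commits)  # -> 250 3
```

Note the single dedicated thread (here, the caller) is the only one
touching the consumer; worker threads only see already-fetched messages.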





On 2/25/15, 9:58 AM, "Gwen Shapira" <gs...@cloudera.com> wrote:



Re: "at least once" consumer recommendations for a load of 5 K messages/second

Posted by Gwen Shapira <gs...@cloudera.com>.
I don't have good numbers, but I've noticed that I usually scale the number
of partitions by the consumer rate, not the producer rate.

Writing to HDFS can be a bit slow (30MB/s is pretty typical, IIRC), so if I
need to write 5G a second, I need at least 15 consumers, which means at
least 15 partitions. Hopefully your consumers will be doing better. Maybe
your bottleneck will be 1gE network speed. Who knows?

Small scale benchmark on your specific setup can go a long way in capacity
planning :)

Gwen

On Wed, Feb 25, 2015 at 9:45 AM, Anand Somani <me...@gmail.com> wrote:


Re: "at least once" consumer recommendations for a load of 5 K messages/second

Posted by Anand Somani <me...@gmail.com>.
Sweet! So I would not depend on ZK for consumption anymore. Thanks for the
response, Gwen; I will take a look at the link you have provided.

From what I have read so far, for my scenario to work correctly I would
have multiple partitions and a consumer per partition, is that correct? So
to improve throughput on the consumer side, I will need to play with the
number of partitions. Is there any recommendation on the partitions-per-topic
ratio, or can that be scaled up/out with more powerful / more hardware?

Thanks
Anand

On Tue, Feb 24, 2015 at 8:11 PM, Gwen Shapira <gs...@cloudera.com> wrote:


Re: "at least once" consumer recommendations for a load of 5 K messages/second

Posted by Gwen Shapira <gs...@cloudera.com>.
* ZK was not built for a 5K-writes/second type of load.
* Kafka 0.8.2.0 allows you to commit offsets to Kafka rather than ZK. I
believe this is recommended.
* You can also commit batches of messages (i.e. commit every 100 messages).
This will reduce the writes and still give you at-least-once, while
controlling the number of duplicates in case of failure.
* Yes, this can be done in the high-level consumer. I give a few tips here:
http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/
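The duplicate-bounding tradeoff in the batch-commit tip can be made
concrete: on a crash, a consumer restarts from the last committed offset,
so the commit interval caps re-delivery (a sketch with hypothetical
offsets):

```python
def redelivered_on_restart(last_committed_offset, crash_offset):
    """Messages in [last_committed_offset, crash_offset) were processed
    but never confirmed, so they are seen again after restart:
    duplicates, never losses -- i.e. at-least-once."""
    return crash_offset - last_committed_offset


# Committing every 100 messages: a crash just before the next commit
# re-delivers at most 100 already-processed messages.
print(redelivered_on_restart(200, 299))  # -> 99
```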

Gwen

On Tue, Feb 24, 2015 at 1:57 PM, Anand Somani <me...@gmail.com> wrote:
