Posted to users@kafka.apache.org by Stefan Miklosovic <mi...@gmail.com> on 2015/07/15 21:50:31 UTC

Kafka partitioning is pretty much broken

I have the following problem; I have tried almost everything I could, but without any luck.

All I want to do is have 1 producer, 1 topic, 10 partitions and 10 consumers.

All I want is to send 1M messages via the producer to these 10 consumers.

I am using Kafka 0.8.3 built from current upstream, so I have bleeding-edge stuff. It does not work on 0.8.1.1 nor on the 0.8.2 stream.

The problem I have is that I expect that when I send 1 million messages via that producer, all consumers will be busy. In other words, if each message the producer sends goes to a partition randomly (round-robin / range), I expect that all 10 consumers will process about 100k messages each, because the producer sends to a random partition out of these 10.

But I have never achieved such an outcome.

I was trying these combinations:

1) old scala producer vs old scala consumer

The consumer was created by Consumers.createJavaConsumer() ten times. Every consumer runs in a separate thread.

2) old scala producer vs new java consumer

The new consumer was used such that I have 10 consumers listening on one topic, each subscribed to a single partition (consumer 1 - partition 1, consumer 2 - partition 2, and so on).

3) old scala producer with custom partitioner

I even tried to use my own partitioner: I just generated a random number from 0 to 9, so I expected the messages to be sent randomly to the partition with that number.

All I see is that only a couple of these 10 consumers are utilized. Even though I am sending 1M messages, all I get from the debugging output is some preselected set of consumers which appears to have been chosen randomly.

Do you have ANY hint why all consumers are not utilized even though partitions are selected randomly?

My initial suspicion was that rebalancing was done badly. The thing was, I was creating the old consumers in a loop quickly, one after another, and I can imagine that the rebalancing algorithm went mad.

So I abandoned that approach and thought: let's just subscribe these consumers one by one to specific partitions, so I will have 1 consumer subscribed to exactly 1 partition and there will not be any rebalancing at all.

Oh my, how wrong was I ... nothing changed.

So I was thinking that if I have 10 consumers, each one subscribed to 1 partition, maybe the producer is just sending messages to some subset of the partitions and that's it. I was not sure how that could be possible, so to be absolutely sure about even spreading of messages across partitions, I used a custom partitioner class in the old producer, so I could be sure that the partition each message is sent to is truly random.

But that does not seem to work either.

Please people, help me.

-- 
Stefan Miklosovic

Re: Kafka partitioning is pretty much broken

Posted by Stefan Miklosovic <mi...@gmail.com>.
Nice one! That might be it as well. Do you have an idea what that
configuration parameter is called?

On Thu, Jul 16, 2015 at 12:53 AM, JIEFU GONG <jg...@berkeley.edu> wrote:
> This is a total shot in the dark here so please ignore this if it fails to
> make sense, but I remember that on some previous implementation of the
> producer prior to when round-robin was enabled, producers would send
> messages to only one of the partitions for a set period of time
> (configurable, I believe) before moving onto the next one. This caused me a
> similar grievance as I would notice only a few of my consumers would get
> data while others were completely idle.
>
> Sounds similar, so check if that's a possibility at all?
>
> On Wed, Jul 15, 2015 at 3:04 PM, Jagbir Hooda <jh...@gmail.com> wrote:
>
>> Hi Stefan,
>>
>> Have you looked at the following output for message distribution
>> across the topic-partitions and which topic-partition is consumed by
>> which consumer thread?
>>
>> kafka-server/bin> ./kafka-run-class.sh
>> kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group
>> <consumer_group_name>
>>
>> Jagbir
>>
>> On Wed, Jul 15, 2015 at 12:50 PM, Stefan Miklosovic
>> <mi...@gmail.com> wrote:
>> > [original message snipped]
>>
>
>
>
> --
>
> Jiefu Gong
> University of California, Berkeley | Class of 2017
> B.A Computer Science | College of Letters and Sciences
>
> jgong@berkeley.edu <el...@berkeley.edu> | (925) 400-3427



-- 
Stefan Miklosovic

Re: Kafka partitioning is pretty much broken

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Also worth mentioning is that the new producer doesn't have this behavior
-- it will round robin over available partitions for records without keys.
"Available" means it currently has a leader -- under normal cases this
means it distributes evenly across all partitions, but if a partition is
down temporarily it will just avoid it. It's highly recommended you use the
new producer anyway since it comes with a lot of other improvements as well.

-Ewen
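The round-robin-over-available-partitions behaviour described above can be sketched roughly as follows. This is an illustrative toy model, not the actual Kafka client code; the class and method names are invented for the example.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of picking the next partition round-robin among the
// partitions that currently have a leader ("available"). Illustrative
// only; this is not the actual Kafka client code.
public class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    public int nextPartition(List<Integer> availablePartitions) {
        // Cycle through whatever is available right now; a partition
        // whose leader is down simply never appears in the list.
        int index = Math.abs(counter.getAndIncrement() % availablePartitions.size());
        return availablePartitions.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch sketch = new RoundRobinSketch();
        List<Integer> available = List.of(0, 1, 2); // imagine partition 3 is down
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            out.append(sketch.nextPartition(available)).append(" ");
        }
        System.out.println(out.toString().trim()); // 0 1 2 0 1 2
    }
}
```

With keyless records the counter advances on every send, so all available partitions (and hence all consumers) get an even share.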

On Wed, Jul 15, 2015 at 4:57 PM, Lance Laursen <ll...@rubiconproject.com>
wrote:

> From the FAQ:
>
> "To reduce # of open sockets, in 0.8.0 (
> https://issues.apache.org/jira/browse/KAFKA-1017), when the partitioning
> key is not specified or null, a producer will pick a random partition and
> stick to it for some time (default is 10 mins) before switching to another
> one. So, if there are fewer producers than partitions, at a given point of
> time, some partitions may not receive any data. To alleviate this problem,
> one can either reduce the metadata refresh interval or specify a message
> key and a customized random partitioner. For more detail see this thread
>
> http://mail-archives.apache.org/mod_mbox/kafka-dev/201310.mbox/%3CCAFbh0Q0aVh%2Bvqxfy7H-%2BMnRFBt6BnyoZk1LWBoMspwSmTqUKMg%40mail.gmail.com%3E
> "
>
>
> [earlier quoted messages snipped]



-- 
Thanks,
Ewen

Re: Kafka partitioning is pretty much broken

Posted by Lance Laursen <ll...@rubiconproject.com>.
From the FAQ:

"To reduce # of open sockets, in 0.8.0 (
https://issues.apache.org/jira/browse/KAFKA-1017), when the partitioning
key is not specified or null, a producer will pick a random partition and
stick to it for some time (default is 10 mins) before switching to another
one. So, if there are fewer producers than partitions, at a given point of
time, some partitions may not receive any data. To alleviate this problem,
one can either reduce the metadata refresh interval or specify a message
key and a customized random partitioner. For more detail see this thread
http://mail-archives.apache.org/mod_mbox/kafka-dev/201310.mbox/%3CCAFbh0Q0aVh%2Bvqxfy7H-%2BMnRFBt6BnyoZk1LWBoMspwSmTqUKMg%40mail.gmail.com%3E
"


On Wed, Jul 15, 2015 at 4:13 PM, Stefan Miklosovic <mi...@gmail.com>
wrote:

> [earlier quoted messages snipped]

Re: Kafka partitioning is pretty much broken

Posted by Stefan Miklosovic <mi...@gmail.com>.
Maybe there is some reason why the producer sticks with a partition for
some period of time - mostly performance related. I can imagine that
constantly switching between partitions can be slow, in the sense that
the producer has to "refocus" on another partition to send a message
to, and this switching may cost something, so it happens only
sporadically.

On the other hand, I would never expect the behaviour I encountered.
If something is advertised as "random", I expect that it is really
random, and not "random but .... not random every time". It is hard to
figure out this information; the only way seems to be to try all the
other solutions, until the most awkward one, which you would never
expect to work, turns out to be the proper one ...

On Thu, Jul 16, 2015 at 12:53 AM, JIEFU GONG <jg...@berkeley.edu> wrote:
> [earlier quoted messages snipped]



-- 
Stefan Miklosovic

Re: Kafka partitioning is pretty much broken

Posted by JIEFU GONG <jg...@berkeley.edu>.
This is a total shot in the dark here so please ignore this if it fails to
make sense, but I remember that on some previous implementation of the
producer prior to when round-robin was enabled, producers would send
messages to only one of the partitions for a set period of time
(configurable, I believe) before moving onto the next one. This caused me a
similar grievance as I would notice only a few of my consumers would get
data while others were completely idle.

Sounds similar, so check if that's a possibility at all?

On Wed, Jul 15, 2015 at 3:04 PM, Jagbir Hooda <jh...@gmail.com> wrote:

> [earlier quoted messages snipped]



-- 

Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jgong@berkeley.edu <el...@berkeley.edu> | (925) 400-3427

Re: Kafka partitioning is pretty much broken

Posted by Stefan Miklosovic <mi...@gmail.com>.
I think I figured it out.

I had to use a custom partitioner which does basically nothing.

Even though I used it before, it was not taken into consideration,
because I was sending KeyedMessage without any key - just partition
and payload.

Now I am doing it like this:

producer.send(new KeyedMessage<String, String>("topic", key, "message"));

where "key" is defined like this:

String key = String.valueOf(new Random().nextInt(10));

which basically gives me a random partition.

Next I am using my custom partitioner, and that one does basically
nothing but return that random number back:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class RandomPartitioner implements Partitioner {

    // The old producer instantiates the partitioner with its config.
    public RandomPartitioner(VerifiableProperties properties) {
    }

    @Override
    public int partition(Object key, int numberOfPartitions) {
        // The key already carries the randomly chosen partition number.
        return Integer.parseInt((String) key);
    }
}

I am not saying this is the best solution, but you can pretty much bet
that it is the only one which works for me.

It seems that the producer in my case just selected the partition it
sends messages to and never changed that partition. That is the only
way it is possible that some of my consumers never got any message,
even though they were subscribed to one specific partition.

Another optimization would be something like:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class RandomPartitioner implements Partitioner {

    // NOTE: a plain int counter is not thread-safe if several threads
    // share the same producer instance.
    private int partition = 0;

    public RandomPartitioner(VerifiableProperties properties) {
    }

    @Override
    public int partition(Object key, int numberOfPartitions) {
        partition = (partition + 1) % numberOfPartitions;
        return partition;
    }
}

But I have to make sure that this counter is unique per producer. I do
not know whether the partitioner is stateless in the sense that it gets
instantiated every time I want to send something, or whether it is just
created once and reused forever.
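As far as I can tell the 0.8 producer instantiates the partitioner once per producer via reflection and then reuses it, so the bare int counter above can race if several threads send through the same producer. A sketch of the same round-robin idea with an AtomicInteger; the class name is my own:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: a thread-safe round-robin partition chooser. Math.floorMod
// keeps the result non-negative even after the int counter overflows.
public class RoundRobinChooser {

    private final AtomicInteger counter = new AtomicInteger(0);

    public int nextPartition(int numberOfPartitions) {
        // getAndIncrement is atomic, so concurrent sends still cycle evenly.
        return Math.floorMod(counter.getAndIncrement(), numberOfPartitions);
    }

    public static void main(String[] args) {
        RoundRobinChooser chooser = new RoundRobinChooser();
        int[] counts = new int[10];
        for (int i = 0; i < 1_000_000; i++) {
            counts[chooser.nextPartition(10)]++;
        }
        // 1M sends spread exactly evenly: 100000 per partition.
        System.out.println(counts[0] + " " + counts[9]); // prints "100000 100000"
    }
}
```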

On Thu, Jul 16, 2015 at 12:04 AM, Jagbir Hooda <jh...@gmail.com> wrote:
> Hi Stefan,
>
> Have you looked at the following output for message distribution
> across the topic-partitions and which topic-partition is consumed by
> which consumer thread?
>
> kafka-server/bin> ./kafka-run-class.sh
> kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group
> <consumer_group_name>
>
> Jagbir
>
> On Wed, Jul 15, 2015 at 12:50 PM, Stefan Miklosovic
> <mi...@gmail.com> wrote:
>> I have the following problem; I tried almost everything I could, but without any luck.
>>
>> All I want to do is to have 1 producer, 1 topic, 10 partitions and 10 consumers.
>>
>> All I want is to send 1M of messages via producer to these 10 consumers.
>>
>> I am using Kafka 0.8.3 built from current upstream, so I have
>> bleeding-edge stuff. It does not work on 0.8.1.1 nor on the 0.8.2 stream.
>>
>> The problem I have is that I expect that when I send 1 million
>> messages via that producer, I will have all consumers busy. In other
>> words, if a message to be sent via producer is sent to partition
>> randomly (roundrobin / range), I expect that all 10 consumers will
>> process about 100k of messages each because producer sends it to
>> random partition of these 10.
>>
>> But I have never achieved such outcome.
>>
>> I was trying these combinations:
>>
>> 1) old scala producer vs old scala consumer
>>
>> Consumer was created by Consumers.createJavaConsumer() ten times.
>> Every consumer is running in the separate thread.
>>
>> 2) old scala producer vs new java consumer
>>
>> the new consumer was used such that I have 10 consumers listening to a topic,
>> each subscribed to 1 partition (consumer 1 - partition 1,
>> consumer 2 - partition 2, and so on)
>>
>> 3) old scala producer with custom partitioner
>>
>> I even tried to use my own partitioner: I just generated a random
>> number from 0 to 9, so I expected that the messages would be sent
>> randomly to the partition with that number.
>>
>> All I see is that only a couple of these 10 consumers are
>> utilized; even though I am sending 1M messages, all I got from the
>> debugging output is some preselected set of consumers which appear to
>> be selected randomly.
>>
>> Do you have ANY hint why all consumers are not utilized even though
>> partitions are selected randomly?
>>
>> My initial suspicion was that rebalancing was done badly. The thing
>> was that I was creating old consumers in a loop quickly, one after
>> another, and I can imagine that the rebalancing algorithm got mad.
>>
>> So I abandoned this solution and I was thinking: let's just
>> subscribe these consumers one by one to some partition so I will have
>> 1 consumer subscribed just to 1 partition and there will not be any
>> rebalancing at all.
>>
>> Oh my how wrong was I ... nothing changed.
>>
>> So I was thinking that if I have 10 consumers, each one subscribed to
>> 1 partition, maybe the producer is just sending messages to some set of
>> partitions and that's it. I was not sure how this could be possible, so,
>> to be super sure about the even spreading of messages to partitions, I
>> used a custom partitioner class in the old producer so I would be sure
>> that the partition a message is sent to is super random.
>>
>> But that does not seem to work either.
>>
>> Please people, help me.
>>
>> --
>> Stefan Miklosovic



-- 
Stefan Miklosovic

Re: Kafka partitioning is pretty much broken

Posted by Jagbir Hooda <jh...@gmail.com>.
Hi Stefan,

Have you looked at the following output for message distribution
across the topic-partitions and which topic-partition is consumed by
which consumer thread?

kafka-server/bin> ./kafka-run-class.sh
kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group
<consumer_group_name>

Jagbir
