Posted to users@kafka.apache.org by Zijing Guo <al...@yahoo.com.INVALID> on 2015/03/05 17:50:22 UTC

Kafka DefaultPartitioner is not behaved as expected.

Hi community,

I have a 2-node test cluster with 2 ZooKeeper instances and 2 broker instances running, and I'm experimenting with the Kafka producer in a cluster environment. I created a topic "foo" with 2 partitions and a replication factor of 1. I created an async Producer without defining partitioner.class (so the partitioner is the default one, "kafka.producer.DefaultPartitioner", which I verified).

Since I know that there are 2 partitions for topic "foo", I create 1000 KeyedMessages with key = "a":

    val msgs = (1 to 1000).map(e => KeyedMessage("foo", "test message" + e, "a"))
    prod.send(msgs)

In theory, "a".hashCode = 97 and 97 % 2 = 1, so I expect all the messages to go to broker1. However, after I send the messages, the Kafka web console shows that the data is evenly distributed across the 2 brokers.

Any help will be appreciated. Thanks
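
The arithmetic in the question can be checked with a small sketch. It assumes that the default partitioner reduces to a non-negative key hash modulo the partition count; `partitionFor` is a hypothetical helper written for illustration, not the Kafka class itself. The result is a partition index, which need not be the same thing as a broker id.

```scala
// Hypothetical stand-in for hash-mod partitioning; NOT Kafka's DefaultPartitioner.
object PartitionSketch {
  // Assumption: partition = non-negative hashCode of the key, modulo partition count.
  def partitionFor(key: Any, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  def main(args: Array[String]): Unit = {
    println("a".hashCode)          // 97: the hash of a one-character String is its char code
    println(partitionFor("a", 2))  // 1: 97 % 2
  }
}
```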

 

Re: Kafka DefaultPartitioner is not behaved as expected.

Posted by Zijing Guo <al...@yahoo.com.INVALID>.
Hi, thanks for your response. That was just a typo on my part; I meant to say KeyedMessage("foo", "a", "test message" + e).




   

Re: Kafka DefaultPartitioner is not behaved as expected.

Posted by Mayuresh Gharat <gh...@gmail.com>.
I suppose the KeyedMessage constructor is KeyedMessage(topic, key,
message), so in your case the key is "test message" + e.


Thanks,

Mayuresh
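
To illustrate the point about argument order, here is a minimal sketch. `KeyedMessage` below is a local stand-in case class with the (topic, key, message) order described in this reply, and `partitionFor` is a hypothetical hash-mod helper; neither is the Kafka implementation.

```scala
object ArgumentOrderSketch {
  // Local stand-in mirroring the (topic, key, message) order; not the Kafka class.
  case class KeyedMessage[K, V](topic: String, key: K, message: V)

  // Assumption: hash-mod partitioning on the key object.
  def partitionFor(key: Any, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  // Arguments as originally posted: the payload lands in the key slot.
  val swapped = (1 to 1000).map(e => KeyedMessage("foo", "test message" + e, "a"))
  // Intended order: every message carries the key "a".
  val intended = (1 to 1000).map(e => KeyedMessage("foo", "a", "test message" + e))

  // Set of partition indexes (out of 2) that a batch of messages would use.
  def partitionsUsed[K, V](msgs: Seq[KeyedMessage[K, V]]): Set[Int] =
    msgs.map(m => partitionFor(m.key, 2)).toSet

  def main(args: Array[String]): Unit = {
    println(partitionsUsed(swapped))   // keys differ per message, so both partitions appear
    println(partitionsUsed(intended))  // a single key maps to a single partition
  }
}
```

With the swapped arguments each message has a different key, which would explain an even spread across both partitions.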


Re: Kafka DefaultPartitioner is not behaved as expected.

Posted by Zijing Guo <al...@yahoo.com.INVALID>.
I'm also using Kafka version 0.8.2.0.


 

   

Re: Kafka DefaultPartitioner is not behaved as expected.

Posted by Zijing Guo <al...@yahoo.com.INVALID>.
Thanks a lot, I really appreciate your help!



   

Re: Kafka DefaultPartitioner is not behaved as expected.

Posted by tao xiao <xi...@gmail.com>.
The reason you need to use "a".getBytes is that the default serializer.class
is kafka.serializer.DefaultEncoder, which takes byte[] as input. An array's
hash code is not based on the equality of its elements, so every time a new
byte array is created, as in your sample code, the hash code is going to be
different.
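
A short sketch of this array hash code behaviour, using only the JDK. It shows that two byte arrays with the same contents are distinct objects with identity-based `equals` and `hashCode`, while `java.util.Arrays.hashCode` and `String.hashCode` depend only on the contents. The object and value names are made up for illustration.

```scala
import java.util.Arrays

object ArrayHashSketch {
  // Two separately created arrays holding the same single byte.
  val k1: Array[Byte] = "a".getBytes("UTF-8")
  val k2: Array[Byte] = "a".getBytes("UTF-8")

  def main(args: Array[String]): Unit = {
    // Arrays inherit Object.equals/hashCode, which are identity-based.
    println(k1.equals(k2))                               // false: different objects
    println(k1.hashCode == k2.hashCode)                  // typically false, not guaranteed either way
    // Content-based alternatives give stable results.
    println(Arrays.equals(k1, k2))                       // true
    println(Arrays.hashCode(k1) == Arrays.hashCode(k2))  // true
    println("a".hashCode == new String("a").hashCode)    // true
  }
}
```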

If you really want to stick with the same partition for a key, you'd
better use kafka.serializer.StringEncoder as the serializer.class.
StringEncoder takes a string as input and, as you know, a string always
returns the same hash code if the value is the same.
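
A minimal configuration sketch for this suggestion, assuming the 0.8.x Scala producer and its property names. The broker addresses are placeholders and `stringKeyedProps` is a hypothetical helper; only `java.util.Properties` is used so the snippet stands alone. In a real client these properties would be handed to the producer configuration.

```scala
import java.util.Properties

object ProducerPropsSketch {
  // Builds producer properties that use String keys and values.
  def stringKeyedProps(brokers: String): Properties = {
    val props = new Properties()
    props.setProperty("metadata.broker.list", brokers)  // placeholder broker list
    props.setProperty("producer.type", "async")         // async producer, as in the original post
    // With a String encoder the key stays a String, whose hash depends only on its value.
    props.setProperty("serializer.class", "kafka.serializer.StringEncoder")
    props.setProperty("key.serializer.class", "kafka.serializer.StringEncoder")
    props
  }

  def main(args: Array[String]): Unit = {
    // "broker1:9092,broker2:9092" is a hypothetical address list.
    val props = stringKeyedProps("broker1:9092,broker2:9092")
    println(props.getProperty("serializer.class"))
  }
}
```

With these settings the messages can be built as KeyedMessage("foo", "a", "test message") without calling getBytes.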




-- 
Regards,
Tao

Re: Kafka DefaultPartitioner is not behaved as expected.

Posted by Zijing Guo <al...@yahoo.com.INVALID>.
One more thing that I think is worth mentioning: when I call prod.send(KeyedMessage("foo", "a", "test message")), the data can't be delivered to the brokers. The only way to make it work is prod.send(KeyedMessage("foo", "a".getBytes, "test message".getBytes)). But when I convert the data and key to bytes, the data does not go to the proper partitions.
Thanks 





   

Re: Kafka DefaultPartitioner is not behaved as expected.

Posted by Zijing Guo <al...@yahoo.com.INVALID>.
Hi Guozhang, I'm using Kafka 0.8.2.0.
Thanks 



   

Re: Kafka DefaultPartitioner is not behaved as expected.

Posted by Guozhang Wang <wa...@gmail.com>.
Just got the previous emails.

Mayuresh is right, it seems your keys are not "a".




-- 
-- Guozhang

Re: Kafka DefaultPartitioner is not behaved as expected.

Posted by Guozhang Wang <wa...@gmail.com>.
Zijing,

Which version of Kafka client are you using?





-- 
-- Guozhang