Posted to users@kafka.apache.org by James Cheng <jc...@tivo.com> on 2015/04/25 02:48:57 UTC

New and old producers partition messages differently

Hi,

I was playing with the new producer in 0.8.2.1 using partition keys ("semantic partitioning" I believe is the phrase?). I noticed that the default partitioner in 0.8.2.1 does not partition items the same way as the old 0.8.1.1 default partitioner was doing. For a test item, the old producer was sending it to partition 0, whereas the new producer was sending it to partition 4.

Digging in the code, it appears that the partitioning logic is different between the old and new producers. Both of them hash the key, but they use different hashing algorithms.

Old partitioner:
./core/src/main/scala/kafka/producer/DefaultPartitioner.scala:

  def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
  }

New partitioner:
./clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java:

        } else {
            // hash the key to choose a partition
            return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
        }

Where murmur2 is a custom hashing algorithm. (I'm assuming that murmur2 isn't the same logic as hashCode, especially since hashCode is overrideable).
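
To make the difference concrete, here is a minimal, self-contained sketch of the two computations. It is not Kafka's code: PartitionComparison, legacyPartition, murmurPartition and nonNegative are hypothetical names, the murmur2 body is a generic 32-bit MurmurHash2 with a seed picked for illustration, and nonNegative is only assumed to behave like Utils.abs (mapping Integer.MIN_VALUE to 0).

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: contrasts hashCode-based and murmur2-based partitioning.
final class PartitionComparison {

    // Assumed stand-in for Utils.abs: Math.abs(Integer.MIN_VALUE) is still
    // negative, so that one value is mapped to 0 explicitly.
    static int nonNegative(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // Old-producer style: hash the key object itself.
    static int legacyPartition(Object key, int numPartitions) {
        return nonNegative(key.hashCode()) % numPartitions;
    }

    // New-producer style: hash the serialized key bytes.
    static int murmurPartition(byte[] keyBytes, int numPartitions) {
        return nonNegative(murmur2(keyBytes)) % numPartitions;
    }

    // Generic 32-bit MurmurHash2; the seed is an assumption for this sketch.
    static int murmur2(byte[] data) {
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ data.length;
        int i = 0;
        // Mix the input four bytes at a time (little-endian).
        for (; i + 4 <= data.length; i += 4) {
            int k = (data[i] & 0xff)
                    | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16)
                    | ((data[i + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1 to 3 bytes; the cases fall through on purpose.
        switch (data.length - i) {
            case 3: h ^= (data[i + 2] & 0xff) << 16;
            case 2: h ^= (data[i + 1] & 0xff) << 8;
            case 1: h ^= (data[i] & 0xff);
                    h *= m;
            default: break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    public static void main(String[] args) {
        String key = "user-42";
        int numPartitions = 8;
        System.out.println("hashCode-based: " + legacyPartition(key, numPartitions));
        System.out.println("murmur2-based:  "
                + murmurPartition(key.getBytes(StandardCharsets.UTF_8), numPartitions));
    }
}
```

The two functions take different inputs as well as different hashes: the first sees the key object, the second sees its serialized bytes. There is therefore no reason to expect the same partition for a given key.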

Was it intentional that the hashing algorithm would change between the old and new producer? If so, was this documented? I don't know if anyone was relying on the old default partitioner, as opposed to going round-robin or using their own custom partitioner. Do you expect it to change in the future? I'm guessing that one of the main reasons to have a custom hashing algorithm is so that you are in full control of the partitioning and can keep it stable (as opposed to being reliant on hashCode()).

Thanks,
-James


Re: New and old producers partition messages differently

Posted by Daniel Compton <da...@gmail.com>.
I would support adding a configuration flag in the short term, say
until 0.9. In the long term, hashCode may change out from underneath people
anyway, so delaying the move to Murmur for too long is likely to still end up
in pain.

Leaving that configuration around long term increases code and test surface
area which isn't great, although I can certainly see that it could be
necessary for some orgs.

It would probably also be worth documenting the scenarios under which the
old default partitioner may change how it partitions items, so people are
aware of its limitations.

On Sun, 26 Apr 2015 at 5:22 pm Gwen Shapira <gs...@cloudera.com> wrote:

> Ouch. That can be a painful discovery after a client upgrade. It can
> break a lot of app code.
>
> I can see the reason for custom hash algorithm (lots of db products do
> this, usually for stability, but sometimes for other hash properties
> (Oracle has some cool guarantees around modifying number of partitions
> and data movement)).
>
> I'm wondering if, in the interest of painless upgrades, we should add
> a configuration flag for topics - old.hash.algorithm that will keep
> existing behavior. Sounds like a rather ugly hack (and things can
> still break in new versions of Java), but I can't see a better
> alternative at the moment.
>
> Gwen

Re: New and old producers partition messages differently

Posted by Gwen Shapira <gs...@cloudera.com>.
Ouch. That can be a painful discovery after a client upgrade. It can
break a lot of app code.

I can see the reason for a custom hash algorithm (lots of db products do
this, usually for stability, but sometimes for other hash properties
(Oracle has some cool guarantees around modifying number of partitions
and data movement)).

I'm wondering if, in the interest of painless upgrades, we should add
a configuration flag for topics - old.hash.algorithm that will keep
existing behavior. Sounds like a rather ugly hack (and things can
still break in new versions of Java), but I can't see a better
alternative at the moment.

Gwen


Re: New and old producers partition messages differently

Posted by Jay Kreps <ja...@gmail.com>.
Yeah I agree we could have handled this better. I think the story we have
now is that you can override it using the partition argument in the
producer (and when we get the patch for pluggable producer we can bundle a
LegacyPartitioner or something like that).
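
As a rough sketch of that workaround: the caller computes the partition with the old formula and passes it explicitly. LegacyPartitionChooser is a hypothetical helper, not a Kafka class. It assumes the old producer hashed the key object before serialization, and the commented-out line assumes the ProducerRecord(topic, partition, key, value) constructor of the new producer.

```java
// Hypothetical helper: reproduces the old hashCode-based placement so the
// result can be passed as the explicit partition of a record.
final class LegacyPartitionChooser {
    private final int numPartitions;

    LegacyPartitionChooser(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    // Hash the original key object, not its serialized bytes: a byte[] does
    // not share the hashCode of the String it was encoded from.
    int partitionFor(Object key) {
        int h = key.hashCode();
        int nonNegative = (h == Integer.MIN_VALUE) ? 0 : Math.abs(h);
        return nonNegative % numPartitions;
    }

    public static void main(String[] args) {
        LegacyPartitionChooser chooser = new LegacyPartitionChooser(8);
        int partition = chooser.partitionFor("user-42");
        // Assumed usage with the new producer:
        // producer.send(new ProducerRecord<>("events", partition, "user-42", value));
        System.out.println("explicit partition: " + partition);
    }
}
```

The partition count is hard-coded here for brevity. In a real producer it would have to match the topic's actual partition count, otherwise the placement will not line up with what the old producer did.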

The reason for murmur2 over 3 was that it had a good single-class Java
implementation. The only murmur3 impl I could find was extremely complex
and hard to bundle, and I really wanted to avoid depending on something
like Guava which ends up being kind of a nightmare from a dependency mgmt
perspective for client libs.

-Jay

On Sun, Apr 26, 2015 at 9:03 PM, Gwen Shapira <gs...@cloudera.com> wrote:

> Definitely +1 for advertising this in the docs.
>
> What I can't figure out is the upgrade path... if my application assumes
> that all data for a single user is in one partition (so it subscribes to a
> single partition and expects everything about a specific subset of users to
> be in that partition), this assumption will not survive an upgrade to
> 0.8.2.X.  I think the assumption of stable hash partitions even after
> upgrades is pretty reasonable (i.e. I made it about gazillion times without
> thinking twice). Note that in this story my app wasn't even upgraded - it
> broke because a producer upgraded to a new API.
>
> If we advertise: "upgrading to the new producer API may break consumers",
> we may need to offer a work-around to allow people to upgrade producers
> anyway.
> Perhaps we can say "wait for Sriharsha's partitioner patch and write a
> custom partitioner that uses hashcode()".
>
> Gwen
>
>
>
> On Sun, Apr 26, 2015 at 7:57 AM, Jay Kreps <ja...@gmail.com> wrote:
>
> > This was actually intentional.
> >
> > The problem with relying on hashCode is that
> > (1) it is often a very bad hash function,
> > (2) it is not guaranteed to be consistent from run to run (i.e. if you
> > restart the jvm the value of hashing the same key can change!),
> > (3) it is not available outside the jvm so non-java producers can't use
> the
> > same function.
> >
> > In general at the moment different producers don't use the same hash code
> > so I think this is not quite as bad as it sounds. Though it would be good
> > to standardize things.
> >
> > I think the most obvious thing we could do here would be to do a much
> > better job of advertising this in the docs, though, so people don't get
> > bitten by it.
> >
> > -Jay

Re: New and old producers partition messages differently

Posted by James Cheng <jc...@tivo.com>.
On Apr 26, 2015, at 9:03 PM, Gwen Shapira <gs...@cloudera.com> wrote:

> Definitely +1 for advertising this in the docs.
> 
> What I can't figure out is the upgrade path... if my application assumes
> that all data for a single user is in one partition (so it subscribes to a
> single partition and expects everything about a specific subset of users to
> be in that partition), this assumption will not survive an upgrade to
> 0.8.2.X.  I think the assumption of stable hash partitions even after
> upgrades is pretty reasonable (i.e. I made it about gazillion times without
> thinking twice). Note that in this story my app wasn't even upgraded - it
> broke because a producer upgraded to a new API.
> 

Agreed. And part of the whole thing about Kafka is decoupling producer from consumer. If you upgrade the producer and "break" the stream, you may break consumers that you don't even know exist.

(Side question: How do you even identify who your consumers are? The only way I can think of is by looking at something like consumer groups and offsets, and find mappings between those and the people who wrote those.)

-James

> If we advertise: "upgrading to the new producer API may break consumers",
> we may need to offer a work-around to allow people to upgrade producers
> anyway.
> Perhaps we can say "wait for Sriharsha's partitioner patch and write a
> custom partitioner that uses hashcode()".
> 
> Gwen
> 
> 
> 
> On Sun, Apr 26, 2015 at 7:57 AM, Jay Kreps <ja...@gmail.com> wrote:
> 
>> This was actually intentional.
>> 
>> The problem with relying on hashCode is that
>> (1) it is often a very bad hash function,
>> (2) it is not guaranteed to be consistent from run to run (i.e. if you
>> restart the jvm the value of hashing the same key can change!),
>> (3) it is not available outside the jvm so non-java producers can't use the
>> same function.
>> 
>> In general at the moment different producers don't use the same hash code
>> so I think this is not quite as bad as it sounds. Though it would be good
>> to standardize things.
>> 
>> I think the most obvious thing we could do here would be to do a much
>> better job of advertising this in the docs, though, so people don't get
>> bitten by it.
>> 
>> -Jay


Re: New and old producers partition messages differently

Posted by Gwen Shapira <gs...@cloudera.com>.
Definitely +1 for advertising this in the docs.

What I can't figure out is the upgrade path... if my application assumes
that all data for a single user is in one partition (so it subscribes to a
single partition and expects everything about a specific subset of users to
be in that partition), this assumption will not survive an upgrade to
0.8.2.X.  I think the assumption of stable hash partitions even after
upgrades is pretty reasonable (i.e. I made it about a gazillion times without
thinking twice). Note that in this story my app wasn't even upgraded - it
broke because a producer upgraded to a new API.

If we advertise: "upgrading to the new producer API may break consumers",
we may need to offer a work-around to allow people to upgrade producers
anyway.
Perhaps we can say "wait for Sriharsha's partitioner patch and write a
custom partitioner that uses hashCode()".

Gwen



On Sun, Apr 26, 2015 at 7:57 AM, Jay Kreps <ja...@gmail.com> wrote:

> This was actually intentional.
>
> The problem with relying on hashCode is that
> (1) it is often a very bad hash function,
> (2) it is not guaranteed to be consistent from run to run (i.e. if you
> restart the jvm the value of hashing the same key can change!),
> (3) it is not available outside the jvm so non-java producers can't use the
> same function.
>
> In general at the moment different producers don't use the same hash code
> so I think this is not quite as bad as it sounds. Though it would be good
> to standardize things.
>
> I think the most obvious thing we could do here would be to do a much
> better job of advertising this in the docs, though, so people don't get
> bitten by it.
>
> -Jay

Re: New and old producers partition messages differently

Posted by Gwen Shapira <gs...@cloudera.com>.
We are working on supporting custom partitioners, so everything is
customizable :)

On Sun, Apr 26, 2015 at 8:52 PM, Wes Chow <we...@chartbeat.com> wrote:

>
> Along these lines too, is the function customizable? I could see how mmh3
> (or 2) would be generally sufficient, however in some cases you may want
> something that's a bit more cryptographically secure so as to avoid attacks.
>
> (Though I suppose the programmer could first crypto-hash the key, and then
> pass it through mmh.)
>
> Wes
>
>   Evan Huus <ev...@shopify.com>
>  April 26, 2015 11:51 AM
> Related to this topic: why the choice of murmur2 over murmur3? I'm not
> super-familiar with the differences between the two, but I'd assume murmur3
> would be faster or have a more even distribution or something.
>
> Evan
>
> P.S. Also, there appear to be many murmur3 implementations for other
> languages, whereas murmur2 is much less common.
>
>
>   Jay Kreps <ja...@gmail.com>
>  April 26, 2015 10:57 AM
> This was actually intentional.
>
> The problem with relying on hashCode is that
> (1) it is often a very bad hash function,
> (2) it is not guaranteed to be consistent from run to run (i.e. if you
> restart the jvm the value of hashing the same key can change!),
> (3) it is not available outside the jvm so non-java producers can't use the
> same function.
>
> In general at the moment different producers don't use the same hash code
> so I think this is not quite as bad as it sounds. Though it would be good
> to standardize things.
>
> I think the most obvious thing we could do here would be to do a much
> better job of advertising this in the docs, though, so people don't get
> bitten by it.
>
> -Jay

Re: New and old producers partition messages differently

Posted by Wes Chow <we...@chartbeat.com>.
Along these lines too, is the function customizable? I could see how 
mmh3 (or 2) would be generally sufficient, however in some cases you may 
want something that's a bit more cryptographically secure so as to avoid 
attacks.

(Though I suppose the programmer could first crypto-hash the key, and 
then pass it through mmh.)
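
A minimal sketch of that idea, with two deliberate substitutions that are my assumptions rather than anything proposed in the thread. It uses a keyed hash (HMAC-SHA256) instead of a plain crypto hash, since an unkeyed hash can be precomputed by an attacker just as easily as murmur. It also reduces the digest directly rather than feeding it through mmh. KeyedPartitioning and the demo secret are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Illustrative only: derives a partition from a keyed cryptographic hash so
// that someone without the secret cannot pick keys that pile onto one partition.
final class KeyedPartitioning {

    static int partitionFor(byte[] secret, byte[] key, int numPartitions) {
        byte[] digest;
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret, "HmacSHA256"));
            digest = mac.doFinal(key);
        } catch (GeneralSecurityException e) {
            // HmacSHA256 is a standard algorithm, so this is not expected.
            throw new IllegalStateException("HmacSHA256 unavailable", e);
        }
        // Fold the first four digest bytes into an int, then clear the sign bit.
        int h = ((digest[0] & 0xff) << 24)
                | ((digest[1] & 0xff) << 16)
                | ((digest[2] & 0xff) << 8)
                | (digest[3] & 0xff);
        return (h & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] secret = "demo-secret-not-for-production".getBytes(StandardCharsets.UTF_8);
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(partitionFor(secret, key, 8));
    }
}
```

An alternative closer to the original suggestion would be to send the digest bytes as the record key and let the default partitioner hash them. The trade-off is that consumers then see the digest rather than the original key.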

Wes

> Evan Huus <ma...@shopify.com>
> April 26, 2015 11:51 AM
> Related to this topic: why the choice of murmur2 over murmur3? I'm not
> super-familiar with the differences between the two, but I'd assume 
> murmur3
> would be faster or have a more even distribution or something.
>
> Evan
>
> P.S. Also, there appear to be many murmur3 implementations for other
> languages, whereas murmur2 is much less common.
>
>
> Jay Kreps <ma...@gmail.com>
> April 26, 2015 10:57 AM
> This was actually intentional.
>
> The problem with relying on hashCode is that
> (1) it is often a very bad hash function,
> (2) it is not guaranteed to be consistent from run to run (i.e. if you
> restart the jvm the value of hashing the same key can change!),
> (3) it is not available outside the jvm so non-java producers can't 
> use the
> same function.
>
> In general at the moment different producers don't use the same hash code
> so I think this is not quite as bad as it sounds. Though it would be good
> to standardize things.
>
> I think the most obvious thing we could do here would be to do a much
> better job of advertising this in the docs, though, so people don't get
> bitten by it.
>
> -Jay

Re: New and old producers partition messages differently

Posted by Evan Huus <ev...@shopify.com>.
Related to this topic: why the choice of murmur2 over murmur3? I'm not
super-familiar with the differences between the two, but I'd assume murmur3
would be faster or have a more even distribution or something.

Evan

P.S. Also, there appear to be many murmur3 implementations for other
languages, whereas murmur2 is much less common.

On Sun, Apr 26, 2015 at 10:57 AM, Jay Kreps <ja...@gmail.com> wrote:

> This was actually intentional.
>
> The problem with relying on hashCode is that
> (1) it is often a very bad hash function,
> (2) it is not guaranteed to be consistent from run to run (i.e. if you
> restart the jvm the value of hashing the same key can change!),
> (3) it is not available outside the jvm so non-java producers can't use the
> same function.
>
> In general at the moment different producers don't use the same hash code
> so I think this is not quite as bad as it sounds. Though it would be good
> to standardize things.
>
> I think the most obvious thing we could do here would be to do a much
> better job of advertising this in the docs, though, so people don't get
> bitten by it.
>
> -Jay

Re: New and old producers partition messages differently

Posted by Jay Kreps <ja...@gmail.com>.
This was actually intentional.

The problem with relying on hashCode is that
(1) it is often a very bad hash function,
(2) it is not guaranteed to be consistent from run to run (i.e. if you
restart the JVM the value of hashing the same key can change!),
(3) it is not available outside the JVM so non-Java producers can't use the
same function.
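
Points (1) and (2) can be seen in a small sketch. HashCodeStability and its methods are hypothetical names used only for illustration. The identity-based values it prints for arrays are not predictable, which is the point.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: which hashCode values are stable, and how a weak hash skews placement.
final class HashCodeStability {

    // Content-based hash: equal bytes always give equal values.
    static int contentHash(byte[] bytes) {
        return Arrays.hashCode(bytes);
    }

    // hashCode-based placement for an int key; Integer.hashCode is the value itself.
    static int partitionOfInt(int key, int numPartitions) {
        int h = Integer.valueOf(key).hashCode();
        return ((h == Integer.MIN_VALUE) ? 0 : Math.abs(h)) % numPartitions;
    }

    public static void main(String[] args) {
        // String.hashCode is specified by the Java API, so it is repeatable.
        System.out.println("String:   " + "user-42".hashCode());

        // Arrays inherit Object.hashCode, which is identity-based: two arrays
        // with equal contents usually print different values, and the values
        // can differ between JVM runs.
        byte[] a = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] b = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println("byte[] a: " + a.hashCode());
        System.out.println("byte[] b: " + b.hashCode());
        System.out.println("content hashes equal: " + (contentHash(a) == contentHash(b)));

        // Weak distribution: keys that are multiples of the partition count
        // all land on the same partition.
        for (int key = 0; key <= 24; key += 8) {
            System.out.println("key " + key + " -> partition " + partitionOfInt(key, 8));
        }
    }
}
```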

In general at the moment different producers don't use the same hash code
so I think this is not quite as bad as it sounds. Though it would be good
to standardize things.

I think the most obvious thing we could do here would be to do a much
better job of advertising this in the docs, though, so people don't get
bitten by it.

-Jay
