Posted to dev@samza.apache.org by Malcolm McFarland <mm...@cavulus.com> on 2022/12/15 04:01:17 UTC

Samza partition hashing relative to other clients

Hey folks,

I'm working on a system where several different Kafka clients (including
Samza) are producing into the same Kafka topic. It's necessary for each of
these clients to calculate the same partition hash for the same key input
to ensure consistent message ordering (there are some asynchronous actions
that need to be ordered across systems). I've been able to get our non-JVM
Kafka clients to calculate partition identifiers (using the murmur2 hashing
algorithm) in the same manner as the official Java Kafka producers.
However, it looks like Samza uses its own hashing algorithm[0]; this is
fine for maintaining order if it's just Samza producing into a topic, but
it's not so great if Samza is just one system of many that are working on a
multi-stage task.

I've dug through the Samza and Kafka codebases quite a bit over the last
few days, and I'm at a loss about how to get Samza to hash partition
indexes in a way that's compatible with other producers. I've tried
implementing Samza's hashing algorithm in other clients (e.g. with [1]), but
cannot for the life of me get equivalent partition calculations in a
non-JVM language.

Does anybody know a) if it's possible to define a custom key-to-partition
hashing algorithm in Samza, or b) if there is a reliable general-purpose
algorithm that can create the same results as Samza's algorithm?

Cheers,
Malcolm McFarland
Cavulus

[0]
https://github.com/apache/samza/blob/1.7.0/samza-kafka/src/main/java/org/apache/samza/util/KafkaUtil.java#L47-L49
[1]
https://stackoverflow.com/questions/40303333/how-to-replicate-java-hashcode-in-c-language
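
For comparison with question (b), here is a minimal Python sketch of what a
non-JVM port of the hashing in [0] might look like. It rests on two
assumptions that should be checked against the linked source: that Samza
computes abs(partitionKey.hashCode()) % numPartitions, and that the partition
key is a java.lang.String. Other key types hash differently in Java (a byte
array, for instance, hashes by object identity), and this sketch does not
cover them. All function names are hypothetical.

```python
def java_string_hashcode(s: str) -> int:
    """Replicate java.lang.String.hashCode() as a signed 32-bit integer."""
    h = 0
    data = s.encode("utf-16-be")  # Java hashes UTF-16 code units, not UTF-8 bytes
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + unit) & 0xFFFFFFFF  # emulate 32-bit integer overflow
    return h - (1 << 32) if h & 0x80000000 else h  # reinterpret as signed


def java_abs(x: int) -> int:
    """Math.abs on a Java int: Integer.MIN_VALUE has no positive counterpart."""
    return x if x == -(1 << 31) else abs(x)


def java_rem(a: int, n: int) -> int:
    """Java's % truncates toward zero, whereas Python's % floors."""
    r = abs(a) % n
    return -r if a < 0 else r


def samza_style_partition(key: str, num_partitions: int) -> int:
    """Assumed Samza behaviour: abs(key.hashCode()) % numPartitions."""
    return java_rem(java_abs(java_string_hashcode(key)), num_partitions)


print(java_string_hashcode("hello"))       # 99162322
print(java_string_hashcode("hello "))      # -1220935282 (overflowed to negative)
print(samza_style_partition("hello ", 8))  # 2
```

The usual places where ports diverge are the three helpers: hashing UTF-8
bytes instead of UTF-16 code units, forgetting the 32-bit wraparound, and
using a language whose abs or modulo treats negative numbers differently from
Java.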

Re: Samza partition hashing relative to other clients

Posted by Malcolm McFarland <mm...@cavulus.com>.
Hi Stuart,

Thanks for the tip. I've come across this configuration as well. It's not
so much that I want to change Kafka's partitioner; rather, I want to ensure
that Samza is using Kafka's default partitioner (although your point about
explicitly defining the same partitioner in every client is a good one).
From what I'm seeing in the Samza source, if an OutgoingMessageEnvelope is
instantiated with the most verbose constructor and the partitionKey is set
explicitly to null, Samza's partitioner should be bypassed and the message
handed to the Kafka producer without an explicit partition[0]. At that
point, the key is passed to Kafka's partitioner, which should be the
default murmur2-based one[1][2][3][4]. Does that seem right?

Sorry about all of the GitHub links; I'm just trying to be specific about
my reasoning!

Cheers,
Malcolm McFarland
Cavulus

[0]
https://github.com/apache/samza/blob/1.7.0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala#L97-L101
[1]
https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L895
[2]
https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java#L44-L47
[3]
https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1268-L1271
[4]
https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L52-L60
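
To make the target behaviour concrete, here is a Python sketch of the
calculation that the default partitioner in [4] is understood to perform for
a non-null key: a 32-bit murmur2 hash of the serialized key bytes, masked to
a non-negative value, modulo the partition count. The function names are
hypothetical, and the sketch assumes the key reaches Kafka as the same bytes
in every client (for example UTF-8, as in [2]).

```python
def murmur2(data: bytes) -> int:
    """32-bit MurmurHash2 with the seed used by Kafka; returns a signed int."""
    mask = 0xFFFFFFFF
    m = 0x5BD1E995
    r = 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask

    # Body: consume the input four bytes at a time, little-endian.
    for i in range(0, length - length % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k

    # Tail: fold in the remaining 1 to 3 bytes, if any.
    tail = length % 4
    base = length - tail
    if tail == 3:
        h ^= data[base + 2] << 16
    if tail >= 2:
        h ^= data[base + 1] << 8
    if tail >= 1:
        h ^= data[base]
        h = (h * m) & mask

    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h - (1 << 32) if h & 0x80000000 else h


def kafka_default_partition(key_bytes: bytes, num_partitions: int) -> int:
    """Mask off the sign bit, then take the remainder."""
    return (murmur2(key_bytes) & 0x7FFFFFFF) % num_partitions


print(murmur2(b"foobar"))                     # -790332482
print(kafka_default_partition(b"foobar", 8))  # 6
```

If Samza really does defer to Kafka when partitionKey is null, then a record
keyed "foobar" and produced by Samza should land on the same partition that
this calculation gives for the topic's partition count.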

On Wed, Dec 14, 2022 at 11:45 PM Stuart Perks <st...@gmail.com>
wrote:

> Hi Malcolm,
>
> You should be able to override the following producer config for
> *partitioner.class*:
> https://kafka.apache.org/24/documentation.html#producerconfigs
>
> This can be done as follows via Samza config systems.system-name.producer.* :
> https://samza.apache.org/learn/documentation/latest/jobs/samza-configurations.html#kafka
>
> Caveat: I haven’t tried this, but it should work according to the docs.
>
> But I would say ideally to be safe you should rekey/repartition on your
> consumer to protect against future producers that differ or producers
> accidentally changing the partitioner.
>
> Hope it helps,
>
> Stuart

Re: Samza partition hashing relative to other clients

Posted by Stuart Perks <st...@gmail.com>.
Hi Malcolm, 

You should be able to override the following producer config for partitioner.class: https://kafka.apache.org/24/documentation.html#producerconfigs

This can be done via the Samza config systems.system-name.producer.*: https://samza.apache.org/learn/documentation/latest/jobs/samza-configurations.html#kafka

Caveat: I haven’t tried this, but it should work according to the docs.

That said, to be safe you should ideally rekey/repartition on the consumer side, to protect against future producers that partition differently or producers whose partitioner is accidentally changed.

Hope it helps, 

Stuart
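
As an illustration of the suggestion above, the override might look like the following in a Samza job's properties file. This is an untested sketch: "kafka" stands in for the actual system-name, and the class shown is Kafka 2.4's default partitioner. Per the other reply in this thread, the setting may have no effect on messages for which Samza computes the partition itself.

```properties
# Hypothetical system name "kafka"; substitute the system-name used by the job.
systems.kafka.producer.partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
```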



