Posted to users@kafka.apache.org by Upesh Desai <ud...@itrsgroup.com> on 2022/11/19 00:07:10 UTC

Kafka Streams possible partitioner bug

Hello all,

We have been working on implementing a custom partitioner for the producer within a simple Streams application, to partition records by a member field when sending them to the output topic. Looking at the contract of the partition() method in the Partitioner interface, it would seem that the value Object should arrive in its deserialized form when this method is called:

/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes The serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes The serialized value to partition on or null
* @param cluster The current cluster metadata
*/
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
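
For illustration, the partitioner we had in mind looks roughly like this (MemberRecord and getMemberId() are our own types, sketched here only to show the idea):

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MemberPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Assumes value arrives as the deserialized domain object; as noted
        // below, in a Streams app it is actually the serialized byte[].
        MemberRecord record = (MemberRecord) value;
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Mask the sign bit so the result is a valid partition index.
        return (record.getMemberId().hashCode() & 0x7fffffff) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}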

For a regular, directly instantiated producer this works correctly. However, within the RecordCollectorImpl class, we found that in a Streams app the record key and value are serialized before being sent, as seen below:


// keyBytes and valBytes are already serialized here, so any Partitioner
// configured on the producer only ever sees byte[] for key and value:
final ProducerRecord<byte[], byte[]> serializedRecord =
        new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);

streamsProducer.send(serializedRecord, (metadata, exception) -> {

We didn’t want to have to deserialize the value object again within the custom partitioner, so is there another way around this? Or is this a bug within the streams producer code?

Thanks in advance!
Upesh Desai

Upesh Desai
Senior Software Developer
udesai@itrsgroup.com
www.itrsgroup.com

Re: Kafka Streams possible partitioner bug

Posted by Upesh Desai <ud...@itrsgroup.com>.
Yes, we tried to plug the custom partitioner in via the `partitioner.class` ProducerConfig, as you thought.

We confirmed that implementing the StreamPartitioner interface and passing it to our Topology definition does indeed work. Thank you for your help!
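
For anyone who finds this thread later, the working setup looks roughly like this (MemberRecord and memberSerde again stand in for our actual types):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class MemberStreamPartitioner implements StreamPartitioner<String, MemberRecord> {
    @Override
    public Integer partition(String topic, String key, MemberRecord value, int numPartitions) {
        // Unlike a producer Partitioner running inside Streams, the value
        // here is the deserialized domain object -- no second deserialization.
        return (value.getMemberId().hashCode() & 0x7fffffff) % numPartitions;
    }
}

// Passed in at the sink of the topology (stream is a KStream<String, MemberRecord>):
stream.to("output-topic",
        Produced.with(Serdes.String(), memberSerde)
                .withStreamPartitioner(new MemberStreamPartitioner()));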

Agreed, updating the documentation will help for now, and we will look out for the new config when it comes out.

Thanks for the quick help,
Upesh


Upesh Desai | Senior Software Developer | udesai@itrsgroup.com
www.itrsgroup.com
From: Sophie Blee-Goldman <so...@confluent.io.INVALID>
Date: Friday, November 18, 2022 at 7:35 PM
To: users@kafka.apache.org <us...@kafka.apache.org>
Subject: Re: Kafka Streams possible partitioner bug
Hey Upesh, are you trying to plug in the custom partitioner via the `partitioner.class` ProducerConfig? That won't work in Streams for the exact reason you highlighted, which is why Streams has its own version of the interface, called StreamPartitioner -- this is what you need to implement instead.

Unfortunately there is currently no config for Streams that will be applied across the application, so you will have to plug in the custom partitioner by passing it directly to the operators. If you look at the various APIs of the DSL you'll notice many have an overload which takes in this parameter (e.g. see "Produced").

As it turns out, however, I am currently working on a KIP for a default.stream.partitioner config that you will be able to set once, rather than carefully passing it in across the topology. I'll take this as good evidence of the usefulness of this feature -- unfortunately you'll have to wait a bit, as it will most likely not be available until version 3.5.

Anyways, trying to use the Producer config is an honest mistake, and we don't seem to include it in the documented list of client configs that can't be set in Streams. I've filed tickets to fix up the docs and also to explicitly log a warning if any of these are set, instead of silently ignoring them or flat out breaking as in this case:

https://issues.apache.org/jira/browse/KAFKA-14404
https://issues.apache.org/jira/browse/KAFKA-14405


Re: Kafka Streams possible partitioner bug

Posted by Sophie Blee-Goldman <so...@confluent.io.INVALID>.
Hey Upesh, are you trying to plug in the custom partitioner via the `partitioner.class` ProducerConfig? That won't work in Streams for the exact reason you highlighted, which is why Streams has its own version of the interface, called StreamPartitioner -- this is what you need to implement instead.

Unfortunately there is currently no config for Streams that will be applied across the application, so you will have to plug in the custom partitioner by passing it directly to the operators. If you look at the various APIs of the DSL you'll notice many have an overload which takes in this parameter (e.g. see "Produced").
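
For example, wiring a partitioner in at a sink could look something like this (just a sketch -- keySerde, valueSerde, and myPartitioner are placeholders for your own types):

stream.to("output-topic", Produced.with(keySerde, valueSerde, myPartitioner));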

As it turns out, however, I am currently working on a KIP for a default.stream.partitioner config that you will be able to set once, rather than carefully passing it in across the topology. I'll take this as good evidence of the usefulness of this feature -- unfortunately you'll have to wait a bit, as it will most likely not be available until version 3.5.

Anyways, trying to use the Producer config is an honest mistake, and we don't seem to include it in the documented list of client configs that can't be set in Streams. I've filed tickets to fix up the docs and also to explicitly log a warning if any of these are set, instead of silently ignoring them or flat out breaking as in this case:

https://issues.apache.org/jira/browse/KAFKA-14404
https://issues.apache.org/jira/browse/KAFKA-14405
