Posted to users@kafka.apache.org by Hemant Bairwa <ba...@gmail.com> on 2020/06/18 04:38:33 UTC

Uneven distribution of messages in topic's partitions

Hello All

I have a single producer service that queues messages into a topic with,
let's say, 12 partitions. I want to distribute the messages evenly across
all the partitions in a round-robin fashion.
Even with the default partitioner and a NULL key, the messages are not
being distributed evenly. Rather, some partitions get none of the
messages while others get several.
One reason I found for this behaviour, somewhere, is that if there are
fewer producers than partitions, the producer distributes the messages to
fewer partitions in order to limit the number of open sockets.
However, I have achieved even distribution in code by first getting the
total number of partitions and then passing a partition number, in
incrementing order, along with each message in the producer record. Once
the partition number reaches the last partition, the next partition
number is reset to zero.

Query:
1. Are there any downsides to the approach above?
2. If yes, how can even distribution of messages be achieved in an optimized way?
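For illustration, the wrap-and-reset logic described above can be sketched as follows. This is a minimal sketch, not the actual service code: `nextPartition` is a hypothetical helper, and in the real producer its result would be passed as the partition argument of each `ProducerRecord`:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSketch {

    private static final AtomicInteger counter = new AtomicInteger(0);

    // Hypothetical helper: returns partitions 0..numPartitions-1 in order,
    // wrapping back to zero after the last one.
    static int nextPartition(int numPartitions) {
        // floorMod keeps the result non-negative even if the counter overflows
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 12;
        int[] counts = new int[numPartitions];
        for (int i = 0; i < 120; i++) {
            counts[nextPartition(numPartitions)]++;
        }
        // With 120 messages and 12 partitions, every partition gets exactly 10
        for (int p = 0; p < numPartitions; p++) {
            System.out.println("partition " + p + ": " + counts[p]);
        }
    }
}
```

With the `RoundRobinPartitioner` discussed in the replies, the producer does this internally, so the counter does not have to live in application code.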

Re: Uneven distribution of messages in topic's partitions

Posted by Ricardo Ferreira <ri...@riferrei.com>.
Nag,

Technically, the `DefaultPartitioner` uses Murmur2, as you can see in the
implementation code in Kafka's trunk:

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java

The `RoundRobinPartitioner` should be used if the behavior you want is
uniform distribution of data across the partitions. The difference
between the `UniformStickyPartitioner` and the `RoundRobinPartitioner`
is that the former sticks to a given partition to allow better batching,
but that also means the data distribution may not be even.

And yes, the `UniformStickyPartitioner` is aimed at better latency.
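For reference, switching between the two is just a producer configuration change. A minimal sketch (assuming AK 2.4+, where both classes live in the public `org.apache.kafka.clients.producer` package):

```java
import java.util.Properties;

public class PartitionerChoice {

    // Build producer properties selecting one of the two partitioners.
    static Properties producerProps(boolean strictlyEven) {
        Properties props = new Properties();
        props.put("partitioner.class", strictlyEven
                // Even distribution, at the cost of less effective batching
                ? "org.apache.kafka.clients.producer.RoundRobinPartitioner"
                // Sticks to one partition per batch for better batching/latency
                : "org.apache.kafka.clients.producer.UniformStickyPartitioner");
        return props;
    }

    public static void main(String[] args) {
        // prints org.apache.kafka.clients.producer.RoundRobinPartitioner
        System.out.println(producerProps(true).getProperty("partitioner.class"));
    }
}
```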

Thanks,

-- Ricardo

Re: Uneven distribution of messages in topic's partitions

Posted by Nag Y <an...@gmail.com>.
Hi Ricardo,
Just a follow-up question to add: I believe the DefaultPartitioner uses
Murmur3 by default.
     Should the RoundRobinPartitioner class be used, instead of the
default partitioner, to get as even a distribution as possible?
     Is the StickyPartitioner (mentioned above) different from the
RoundRobinPartitioner, and does it provide better distribution?
     Also, I see from the KIP that the StickyPartitioner addresses
improvements needed to reduce latency.

Thanks,



Re: Uneven distribution of messages in topic's partitions

Posted by Ricardo Ferreira <ri...@riferrei.com>.
Hi Hemant,

Being able to look up specific records by key is not possible in Kafka
alone. As a distributed streaming platform based on the concept of a
commit log, Kafka organizes data sequentially, where each record has an
offset that uniquely identifies not what the record is but where within
the log it is positioned.

To implement record lookup by key you would need to use Kafka Streams or
ksqlDB. I would recommend ksqlDB, since you can easily create a stream
out of your existing topic and then have that stream transformed into a
table. Note only that ksqlDB currently requires that each table serving
pull queries (i.e., queries that serve requests given a key) be created
using an aggregation construct. So you might need to work that out to
achieve the behavior you want.
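As a rough sketch of what that could look like in ksqlDB (the topic,
column, and object names here are hypothetical, and the exact syntax
depends on the ksqlDB version):

```sql
-- Wrap the existing topic in a stream (assumes the item id is the record key)
CREATE STREAM workflow_states (item VARCHAR KEY, state VARCHAR)
  WITH (KAFKA_TOPIC = 'workflow-states', VALUE_FORMAT = 'JSON');

-- Aggregate into a table so it can serve pull queries by key
CREATE TABLE states_by_item AS
  SELECT item, COLLECT_LIST(state) AS states
  FROM workflow_states
  GROUP BY item
  EMIT CHANGES;

-- Pull query: all recorded states for item 'xyz'
SELECT states FROM states_by_item WHERE item = 'xyz';
```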

Thanks,

-- Ricardo


Re: Uneven distribution of messages in topic's partitions

Posted by Hemant Bairwa <ba...@gmail.com>.
Thanks Ricardo.

I need some information on one more use case.
In my application I need to use Kafka to maintain the different workflow
states of message items as they pass through different processes.
For example, in my application all messages transit from Process A to
Process Z, and I need to maintain all the states an item has passed
through. So for item xyz there should be a total of 26 entries in the
Kafka topic:
xyz, A
xyz, B ... and so on.

A user should be able to retrieve all the messages for any specific key,
as many times as needed. That is, a DB-type feature is required.

1. Is Kafka alone able to cater to this requirement?
2. Or do I need to use ksqlDB to meet this requirement? I did some
research around it, but I don't want to run a separate ksqlDB server.
3. Any other suggestions?

Regards,




Re: Uneven distribution of messages in topic's partitions

Posted by Ricardo Ferreira <ri...@riferrei.com>.
Hemant,

This behavior might be the result of the version of AK (Apache Kafka)
that you are using. Before AK 2.4, the default behavior of the
DefaultPartitioner was to load-balance data production across the
partitions as you described. But it was found that this behavior caused
performance problems for the batching strategy that each producer uses.
Therefore, AK 2.4 introduced a new behavior into the DefaultPartitioner
called sticky partitioning. You can follow up on this change by reading
the KIP that was created for it: *KIP-480
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner>*.

The only downside that I see in your workaround is that you are handling
the assignment of partitions programmatically. That makes your code
fragile, because if the number of partitions for the topic changes, your
code would not know it. Instead, just use the RoundRobinPartitioner
<https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/RoundRobinPartitioner.html>
explicitly in your producer:

```
configs.put("partitioner.class",
    "org.apache.kafka.clients.producer.RoundRobinPartitioner");
```

Thanks,

-- Ricardo
