Posted to issues@flink.apache.org by "Liam (Jira)" <ji...@apache.org> on 2023/04/11 00:21:00 UTC

[jira] [Updated] (FLINK-31762) Subscribe to multiple Kafka topics may cause partition assignment skew

     [ https://issues.apache.org/jira/browse/FLINK-31762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liam updated FLINK-31762:
-------------------------
    Summary: Subscribe to multiple Kafka topics may cause partition assignment skew  (was: Subscribe multiple Kafka topics may cause partition assignment skew)

> Subscribe to multiple Kafka topics may cause partition assignment skew
> ----------------------------------------------------------------------
>
>                 Key: FLINK-31762
>                 URL: https://issues.apache.org/jira/browse/FLINK-31762
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.0, 1.18.0
>            Reporter: Liam
>            Priority: Major
>         Attachments: image-2023-04-11-08-00-16-054.png, image-2023-04-11-08-12-24-115.png
>
>
> To simplify the demonstration, let us assume that there are two topics, and each topic has four partitions. We have set the parallelism to eight to consume these two topics. However, the current assignment method picks a start subtask for each topic independently from the hash of the topic name, so the ranges of different topics can overlap: some subtasks are assigned two partitions while others are left with none.
> !image-2023-04-11-08-00-16-054.png|width=500,height=143!
> In my case, the situation is even worse as I have ten topics, each with 100 partitions. If I set the parallelism to 1000, some slots may be assigned seven partitions while others receive none.
> To address this issue, I propose a new partition assignment solution in which round-robin assignment runs across the partitions of all topics together, rather than within each topic separately.
> For example, the ideal assignment for the case mentioned above is presented below:
>  
> !https://imgr.whimsical.com/object/A4jSJwgQNrc5mgpGddhghq|width=513,height=134!
> This new solution can also handle cases where each topic has more partitions.
> !image-2023-04-11-08-12-24-115.png|width=444,height=127!
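
One way to read the proposal is sketched below in Java. This is only an illustration under stated assumptions, not the Flink implementation and not an agreed design: the class `GlobalRoundRobinSketch` and its methods are hypothetical, and the sketch assumes that every subtask sees the same topic list, in the same sorted order, with the same partition counts. Each partition gets a global index (the partitions of all preceding topics plus its own id), and that index is taken modulo the parallelism.

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch of round-robin assignment across all topics.
// Assumes identical, sorted topic metadata on every subtask.
final class GlobalRoundRobinSketch {

    // Maps (topic, partition) to a subtask using one global partition index.
    static int assign(
            SortedMap<String, Integer> partitionsPerTopic,
            String topic,
            int partition,
            int numParallelSubtasks) {
        int offset = 0; // number of partitions in topics ordered before this one
        for (Map.Entry<String, Integer> entry : partitionsPerTopic.entrySet()) {
            if (entry.getKey().equals(topic)) {
                return (offset + partition) % numParallelSubtasks;
            }
            offset += entry.getValue();
        }
        throw new IllegalArgumentException("Unknown topic: " + topic);
    }

    // Counts how many partitions each subtask would receive.
    static int[] countPerSubtask(
            SortedMap<String, Integer> partitionsPerTopic, int numParallelSubtasks) {
        int[] counts = new int[numParallelSubtasks];
        for (Map.Entry<String, Integer> entry : partitionsPerTopic.entrySet()) {
            for (int p = 0; p < entry.getValue(); p++) {
                counts[assign(partitionsPerTopic, entry.getKey(), p, numParallelSubtasks)]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two hypothetical topics with four partitions each, parallelism eight.
        SortedMap<String, Integer> topics = new TreeMap<>();
        topics.put("a", 4);
        topics.put("b", 4);
        System.out.println(java.util.Arrays.toString(countPerSubtask(topics, 8)));
    }
}
```

With these inputs every subtask receives exactly one partition. The sketch does not address what happens when topics or partitions are added after startup, which shifts the global indices of later topics.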
> Let us work together to reach a consensus on this proposal. Thank you!
>  
> FYI: how partitions are currently assigned
> {code:java}
> public class KafkaTopicPartitionAssigner {
>     public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
>         return assign(partition.getTopic(), partition.getPartition(), numParallelSubtasks);
>     }
>
>     public static int assign(String topic, int partition, int numParallelSubtasks) {
>         int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
>
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise from the
>         // start index
>         return (startIndex + partition) % numParallelSubtasks;
>     }
> }
> {code}
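
The skew described in the issue can be reproduced with a small standalone program. The sketch below copies only the arithmetic from the snippet quoted above; the class `AssignmentSkewDemo`, the helper `countPerSubtask`, and the topic names "a" and "b" are hypothetical and chosen purely for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: counts partitions per subtask using the quoted formula.
final class AssignmentSkewDemo {

    // Same arithmetic as the assign(String, int, int) method quoted in the issue.
    static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    // Counts how many partitions land on each subtask when every topic
    // has the same number of partitions.
    static int[] countPerSubtask(List<String> topics, int partitionsPerTopic, int parallelism) {
        int[] counts = new int[parallelism];
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                counts[assign(topic, p, parallelism)]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two hypothetical topics, four partitions each, parallelism eight.
        int[] counts = countPerSubtask(Arrays.asList("a", "b"), 4, 8);
        System.out.println(Arrays.toString(counts)); // prints [2, 2, 1, 0, 0, 0, 1, 2]
    }
}
```

For these two topic names the start indices are 7 and 6, so the two ranges of four subtasks overlap on subtasks 7, 0 and 1, and subtasks 3 to 5 receive nothing. Other topic names give other start indices, so the degree of skew depends on the names.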
>  
>  


