Posted to dev@kafka.apache.org by "Suriya Vijayaraghavan (Jira)" <ji...@apache.org> on 2021/08/09 13:45:00 UTC
[jira] [Created] (KAFKA-13180) Data Distribution among partitions not working as Expected
Suriya Vijayaraghavan created KAFKA-13180:
---------------------------------------------
Summary: Data Distribution among partitions not working as Expected
Key: KAFKA-13180
URL: https://issues.apache.org/jira/browse/KAFKA-13180
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.8.0
Reporter: Suriya Vijayaraghavan
Hi team, we are facing a strange issue. We are not sure whether anyone else has run into it, but we were able to identify the flow that causes it.
Issue
Using the RoundRobin partitioner with an even number of partitions n results in records always being produced to only n/2 of the partitions.
Is Reproducible: yes
Scenario: We have a Kafka topic with 6 partitions (0,1,2,3,4,5) and produce to it with the RoundRobin partitioner.
The RoundRobin partitioner works from an index into an ArrayList of partition info. For our case, let's assume the partitions are populated in the array list in the order below.
{1,2,3,4,5,0}
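To make the index-based selection concrete, here is a minimal sketch of a round-robin picker over that list. It is a simplified stand-in and not the Kafka RoundRobinPartitioner class itself: the class name RoundRobinModel and its single counter are assumptions for illustration, and it ignores topics and partition availability.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of a round-robin partitioner (not the Kafka class).
class RoundRobinModel {
    private final List<Integer> partitions;                      // partition ids in list order
    private final AtomicInteger counter = new AtomicInteger(0);  // advances on every call

    RoundRobinModel(List<Integer> partitions) {
        this.partitions = partitions;
    }

    // Every call advances the counter by one and returns the partition id
    // stored at index (counter % size) of the list.
    int partition() {
        int next = counter.getAndIncrement();
        int index = (next & 0x7fffffff) % partitions.size();     // keep the index non-negative
        return partitions.get(index);
    }

    public static void main(String[] args) {
        RoundRobinModel model = new RoundRobinModel(Arrays.asList(1, 2, 3, 4, 5, 0));
        List<Integer> picked = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            picked.add(model.partition());
        }
        System.out.println(picked); // [1, 2, 3, 4, 5, 0]
    }
}
```

When the picker is called exactly once per record, every partition in the list is visited. The important detail for this report is that each call moves the counter forward, whether or not the returned partition ends up being used.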
Expected flow: Even distribution to 6 partitions
How it worked: Data was produced only to partitions 2, 4 and 0.
Why:
On debugging the producer flow further, we noticed the method highlighted below in the doSend method of KafkaProducer.
{quote}int partition = *partition*(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
.....
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, *true*, nowMs);
if (result.abortForNewBatch) {
int prevPartition = partition;
partitioner.onNewBatch(record.topic(), cluster, prevPartition);
partition = *partition*(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
.....
result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, *false*, nowMs);
{quote}
Here, in the first accumulator.append call, true is passed for abortOnNewBatch. The Deque obtained in the RecordAccumulator.append method is always empty for the first message, so the append would have to create a new batch, and it returns with abortForNewBatch set instead.
For the new batch, the partition method is invoked a second time and a new TopicPartition object is created, which will have partition 2. In this second call, abortOnNewBatch is passed as false, so the record is added to the Deque for this TopicPartition.
However, records are distributed properly if the total number of partitions is odd, because a record is only added successfully when abortOnNewBatch is passed as false (let's call this the second invocation).
The order of invocations for an odd and for an even number of partitions is as follows.
ODD: {1,2,3,4,0}
Iteration set until all partitions get populated (first invocation - second invocation):
1 - 2
3 - 4
0 - 1
2 - 3
4 - 0
Partitions with a populated Deque = {2,4,1,3,0}
EVEN: {1,2,3,4,5,0}
Iteration set until all partitions get populated:
1 - 2
3 - 4
5 - 0
1 - 2
3 - 4
5 - 0
1 - 2
3 - 4
5 - 0.........
Partitions with a populated Deque = {2,4,0}
This goes on continuously, so the remaining partitions are never initialized.
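The two traces above can be reproduced with a small simulation. This is only a sketch under the simplification used in this report: every record is assumed to trigger two partitioner invocations, with the first result discarded (abortOnNewBatch is true) and the record landing on the second result. The class and method names are hypothetical and nothing here calls real Kafka code.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of the double invocation described in this report.
class DoubleInvokeSimulation {

    // Returns the partitions that actually receive records, in order of first use.
    static Set<Integer> populated(List<Integer> partitions, int records) {
        Set<Integer> used = new LinkedHashSet<>();
        int counter = 0; // shared round-robin counter, advanced on every invocation
        for (int r = 0; r < records; r++) {
            // First invocation: the chosen partition is discarded because the
            // append is aborted, but the counter still moves forward.
            counter++;
            // Second invocation: this partition receives the record.
            int index = counter % partitions.size();
            counter++;
            used.add(partitions.get(index));
        }
        return used;
    }

    public static void main(String[] args) {
        // Odd number of partitions: every partition is eventually used.
        System.out.println(populated(Arrays.asList(1, 2, 3, 4, 0), 10));    // [2, 4, 1, 3, 0]
        // Even number of partitions: only every second list entry is ever used.
        System.out.println(populated(Arrays.asList(1, 2, 3, 4, 5, 0), 60)); // [2, 4, 0]
    }
}
```

Because the counter advances by two per record, only every second list position is reachable when the list size is even, while an odd size lets the step of two cycle through all positions.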