Posted to dev@kafka.apache.org by "Suriya Vijayaraghavan (Jira)" <ji...@apache.org> on 2021/08/09 13:45:00 UTC

[jira] [Created] (KAFKA-13180) Data Distribution among partitions not working as Expected

Suriya Vijayaraghavan created KAFKA-13180:
---------------------------------------------

             Summary: Data Distribution among partitions not working as Expected
                 Key: KAFKA-13180
                 URL: https://issues.apache.org/jira/browse/KAFKA-13180
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 2.8.0
            Reporter: Suriya Vijayaraghavan


Hi team, we are facing a weird issue; we are not sure if anyone else has faced the same. However, we were able to identify the flow.

Issue
 Using the RoundRobin partitioner with an even number of partitions n results in always producing to only n/2 of the partitions.

Is Reproducible: yes

Scenario: We have a Kafka topic with 6 partitions (0,1,2,3,4,5). We are producing to this topic with the RoundRobin partitioner.
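
For reference, the producer is pointed at the RoundRobin partitioner via partitioner.class. A minimal configuration sketch (the broker address, topic name, and payloads below are placeholders, not our actual values):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

public class RoundRobinProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Replace the default partitioner with the round-robin one.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 12; i++) {
                // Keys are null, so the partitioner alone decides where each record goes.
                producer.send(new ProducerRecord<>("six-partition-topic", null, "message-" + i));
            }
        }
    }
}
{code}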

The RoundRobin partitioner works based on the index into an ArrayList of partition info. For our case, let's assume the order of the partitions in the ArrayList is populated as below.

{1,2,3,4,5,0}
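
A simplified sketch of the selection logic as we understand it (our own illustration, not the upstream RoundRobinPartitioner source): a per-topic counter indexes into that list modulo its size, so the list position, not the partition id, drives the rotation.

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of counter-based round-robin selection: each call takes the
// next counter value and picks the list entry at (counter % list size).
public class RoundRobinSelectionSketch {
    private final AtomicInteger counter = new AtomicInteger(0);
    private final List<Integer> partitionOrder;

    RoundRobinSelectionSketch(List<Integer> partitionOrder) {
        this.partitionOrder = partitionOrder;
    }

    int nextPartition() {
        int index = counter.getAndIncrement() % partitionOrder.size();
        return partitionOrder.get(index);
    }

    public static void main(String[] args) {
        // Same ordering as assumed above: index 0 holds partition 1, ..., index 5 holds partition 0.
        RoundRobinSelectionSketch sketch = new RoundRobinSelectionSketch(Arrays.asList(1, 2, 3, 4, 5, 0));
        for (int i = 0; i < 6; i++) {
            System.out.print(sketch.nextPartition() + " "); // prints: 1 2 3 4 5 0
        }
    }
}
{code}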

Expected flow: Even distribution to 6 partitions

How it worked: Data was produced only to partitions 2, 4, and 0.

Why:
 On debugging further into the producer flow, we noticed the calls highlighted below in the doSend method of KafkaProducer.
{quote}int partition = *partition*(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
.....
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, *true*, nowMs);
if (result.abortForNewBatch) {
    int prevPartition = partition;
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    partition = *partition*(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    .....
    result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs, *false*, nowMs);
{quote}
Here, in accumulator.append, true is passed for abortOnNewBatch. The Deque looked up inside RecordAccumulator.append will always be empty for the first message to a partition as well, so a new batch would have to be created, and the append is aborted instead.

Because the append was aborted for a new batch, partition() is invoked again and a new TopicPartition object is created, which in our example will be partition 2. In this second flow abortOnNewBatch is passed as false, so the record gets added to the Deque for that topic-partition.
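
Put differently, whenever the first pick lands on a partition whose Deque has no batch yet, the partitioner is asked twice for the same record and only the second answer is used. A minimal model of that per-record sequence (our own names; as far as we can tell RoundRobinPartitioner does not override onNewBatch, so the hint does not change its state):

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

// Minimal model of one record whose first pick has no open batch: partition() is
// called, the append is aborted because a new batch would be needed, and partition()
// is called again; only the second pick actually receives the record.
public class AbortedAppendSketch {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(0);
        int numPartitions = 6;

        int firstPick = counter.getAndIncrement() % numPartitions;  // append with abortOnNewBatch = true, aborted
        int secondPick = counter.getAndIncrement() % numPartitions; // append with abortOnNewBatch = false, succeeds
        System.out.println("list index " + firstPick + " is skipped; the record lands at list index " + secondPick);
    }
}
{code}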

However, the data gets distributed properly if the total number of partitions is odd, as the addition of the first record to a partition only succeeds when abortOnNewBatch is passed as false (let's call that the second invocation).

The order of the invocations (first invocation - second invocation) will be as follows for an odd and for an even number of partitions.

ODD: \{1,2,3,4,0}
 Iteration set until all partitions get populated:
 1 - 2
 3 - 4
 0 - 1
 2 - 3
 4 - 0

Partitions whose Deque gets populated = \{2,4,1,3,0}

EVEN: \{1,2,3,4,5,0}

Iteration set until all partitions get populated:
 1 - 2
 3 - 4
 5 - 0
 1 - 2
 3 - 4
 5 - 0
 1 - 2
 3 - 4
5 - 0.........

Partitions whose Deque gets populated = \{2,4,0}

This will go on continuously, as batches for the remaining partitions never get initiated.
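
To make the parity effect concrete, below is a small simulation of our own (not producer code) of the counter advancing twice per record while every first pick lands on a partition with no open batch:

{code:java}
import java.util.LinkedHashSet;
import java.util.Set;

// Simulates the effect described above: the counter advances twice per produced
// record, so with an even partition count only half of the list positions ever
// receive data, while with an odd count every position is eventually hit.
public class ParityCoverageSimulation {
    static Set<Integer> coveredIndexes(int numPartitions, int records) {
        Set<Integer> covered = new LinkedHashSet<>();
        int counter = 0;
        for (int i = 0; i < records; i++) {
            counter++;                              // first partition() call: that append is aborted for a new batch
            covered.add(counter++ % numPartitions); // second partition() call: the record actually lands here
        }
        return covered;
    }

    public static void main(String[] args) {
        System.out.println("6 partitions -> covered list indexes " + coveredIndexes(6, 100)); // [1, 3, 5]
        System.out.println("5 partitions -> covered list indexes " + coveredIndexes(5, 100)); // [1, 3, 0, 2, 4]
    }
}
{code}

Mapping the covered indexes 1, 3, and 5 through the \{1,2,3,4,5,0} ordering gives exactly the partitions 2, 4, and 0 we observed.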



--
This message was sent by Atlassian Jira
(v8.3.4#803005)