Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/06 22:57:31 UTC

[GitHub] [kafka] junrao commented on a diff in pull request #12462: [KAFKA-9965] Fix accumulator tryAppend, so that fresh new producerBatch is created

junrao commented on code in PR #12462:
URL: https://github.com/apache/kafka/pull/12462#discussion_r1063864650


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -287,12 +291,16 @@ public RecordAppendResult append(String topic,
                     }
                 }
 
-                // we don't have an in-progress record batch try to allocate a new batch
-                if (abortOnNewBatch) {
-                    // Return a result that will cause another call to append.
+                // noDqForPartition is true either when 1. partition was encountered for first time so no Deque existed previously.
+                // 2. DQ was removed due to - all batches were cleared due to expiration or sender cleared batches after draining.
+                // if so, abort and look to call partitioner -> onNewBatch and select other partition.
+                // This prevents a single partition getting re-selected after recent drain.
+                if (abortOnNewBatch && noDqForPartition) {

Review Comment:
   I am not sure if this completely addresses the issue. For example, if the producer adds records relatively slowly, it's possible that by the time the RoundRobinPartitioner adds the record to the same partition again, the batches for that partition have already been drained.
   
   Here is another possibility. We could change how `abortOnNewBatch` is computed in the following code in KafkaProducer. If a partitioner is defined but doesn't override the default implementation of `onNewBatch` (which is the case for RoundRobinPartitioner), we could also set abortOnNewBatch to false. We could use reflection to get the method and do the `isDefault` test.
   
   ```
               // A custom partitioner may take advantage on the onNewBatch callback.
               boolean abortOnNewBatch = partitioner != null;
   
   ```
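   
   A minimal sketch of what the `isDefault` test could look like. To keep it self-contained it uses a simplified, hypothetical stand-in for the `Partitioner` interface (the `onNewBatch` signature here omits the cluster argument), and the names `OnNewBatchCheck`, `RoundRobinLike`, `StickyLike`, `usesDefaultOnNewBatch` and `abortOnNewBatch(...)` are illustrative only, not existing Kafka code:
   
   ```java
   import java.lang.reflect.Method;

   final class OnNewBatchCheck {

       // Hypothetical, simplified stand-in for the producer's Partitioner interface.
       // The real onNewBatch also takes a cluster argument; it is omitted here.
       interface Partitioner {
           int partition(String topic, int numPartitions);

           // Default no-op callback, invoked when a new batch is about to be created.
           default void onNewBatch(String topic, int prevPartition) { }
       }

       // Does NOT override onNewBatch, like RoundRobinPartitioner.
       static final class RoundRobinLike implements Partitioner {
           private int next = 0;

           @Override
           public int partition(String topic, int numPartitions) {
               return Math.floorMod(next++, numPartitions);
           }
       }

       // Overrides onNewBatch, so it relies on the abort-and-retry path.
       static final class StickyLike implements Partitioner {
           private int current = 0;

           @Override
           public int partition(String topic, int numPartitions) {
               return Math.floorMod(current, numPartitions);
           }

           @Override
           public void onNewBatch(String topic, int prevPartition) {
               current = prevPartition + 1;
           }
       }

       // True when the partitioner's class inherits the interface's default onNewBatch.
       static boolean usesDefaultOnNewBatch(Partitioner partitioner) {
           try {
               // getMethod resolves to the most specific public method, so an
               // override in the class hides the interface's default method.
               Method m = partitioner.getClass()
                       .getMethod("onNewBatch", String.class, int.class);
               return m.isDefault();
           } catch (NoSuchMethodException e) {
               // Cannot happen for a Partitioner; the interface declares the method.
               throw new IllegalStateException(e);
           }
       }

       // Proposed computation: abort only if a partitioner is set AND it
       // actually overrides onNewBatch.
       static boolean abortOnNewBatch(Partitioner partitioner) {
           return partitioner != null && !usesDefaultOnNewBatch(partitioner);
       }
   }
   ```
   
   With this, a partitioner that only inherits the default `onNewBatch` would yield `abortOnNewBatch == false`, so the first `append` call would not be aborted for it. The reflection lookup could be done once when the producer is constructed rather than per record.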
   
   What do you think @jolshan ?


