You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/05 09:30:23 UTC

[GitHub] [kafka] sudeshwasnik commented on pull request #12462: [KAFKA-9965] Fix accumulator tryAppend, so that fresh new producerBatch is created

sudeshwasnik commented on PR #12462:
URL: https://github.com/apache/kafka/pull/12462#issuecomment-1236760679

   hey @artemlivshits , thanks for pointing out that if we just check if partition's producerBatches are empty -> we may get stuck selecting the same partition if producerBatches keep getting cleared up by sender during drain. 
   
   Here is a new approach - 
   1. Check if DQ was created in topicInfo.batches for this partition. 
   2. There are two cases when topicinfo.batches is empty 
       * this partition is encountered for the first time. 
       * this partition was recently cleared while either drain or batches-expiry cleanup. 
       In the first case, it is fine to skip this partition (but create an ArrayDeque for it in topicinfo.batches) as this is current behaviour. Next time this partition is checked for, we find DQ for this partition, and the append is not aborted. ie partition will not get skipped here-on. 
       In the second case, the partition will be skipped since partition entry from topicinfo.batches was removed. (but create an ArrayDeque for it in topicinfo.batches). After this, this partition won't get skipped and records will be added into it. 
   3. Always clear DQ when its entries are cleared up while drain or expiry. 
   
   example - 
   1. When producer is new. First partition is skipped. But this will call append again (with abortOnNewBatch false) -> thus onNewBatch will be called, new partition will get assigned and next partition won't get skipped. 
   2. When drain happens for a partition - immediate next append will get aborted. And append will be called again with abortOnNewBatch false, onNewBatch will be called and new partition is assigned. 
   
   
       
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org