Posted to commits@kafka.apache.org by ij...@apache.org on 2022/04/28 13:21:52 UTC

[kafka] branch 3.1 updated (f6395115a0 -> b068124aef)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


 discard f6395115a0 KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionsManager (round 3) (#12096)
     new b068124aef KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionsManager (round 3) (#12096)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (f6395115a0)
            \
             N -- N -- N   refs/heads/3.1 (b068124aef)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kafka/clients/producer/internals/TransactionManager.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[kafka] 01/01: KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionsManager (round 3) (#12096)

Posted by ij...@apache.org.

ijuma pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit b068124aef261848c0bb0af0b903c798499015ef
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu Apr 28 06:13:23 2022 -0700

    KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionsManager (round 3) (#12096)
    
    Conceptually, the ordering is defined by the producer id, producer epoch
    and the sequence number. This set should generally only have entries
    for the same producer id and epoch, but there is one case where
    we can have conflicting `remove` calls and hence we add this as
    a temporary safe fix.
    
    We'll follow up with a fix that ensures the original intended invariant.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, David Jacot
    <dj...@confluent.io>, Luke Chen <sh...@gmail.com>
---
 .../clients/producer/internals/TransactionManager.java      | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 7086e593a8..c032cabe98 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -184,11 +184,14 @@ public class TransactionManager {
         // responses which are due to the retention period elapsing, and those which are due to actual lost data.
         private long lastAckedOffset;
 
-        private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR = (b1, b2) -> {
-            if (b1.baseSequence() < b2.baseSequence()) return -1;
-            else if (b1.baseSequence() > b2.baseSequence()) return 1;
-            else return Integer.compare(b1.hashCode(), b2.hashCode());
-        };
+        // `inflightBatchesBySequence` should only have batches with the same producer id and producer
+        // epoch, but there is an edge case where we may remove the wrong batch if the comparator
+        // only takes `baseSequence` into account.
+        // See https://github.com/apache/kafka/pull/12096#pullrequestreview-955554191 for details.
+        private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR =
+            Comparator.comparingLong(ProducerBatch::producerId)
+                .thenComparingInt(ProducerBatch::producerEpoch)
+                .thenComparingInt(ProducerBatch::baseSequence);
 
         TopicPartitionEntry() {
             this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
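
The edge case named in the new code comment can be illustrated with a small, self-contained sketch. It is not Kafka code: `Batch` is a hypothetical stand-in for `ProducerBatch` that models only the three fields used for ordering, and the "stale batch from an older epoch" scenario is an assumption chosen for illustration (the linked PR review describes the real case). The sketch contrasts a comparator that looks only at `baseSequence`, as the comment describes, with the composite comparator introduced by this commit. A `TreeSet` decides equality through its comparator, so with the sequence-only ordering a `remove` call for one batch can evict a different batch that shares its sequence number.

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

public class ComparatorSketch {
    // Hypothetical stand-in for ProducerBatch; only the ordering fields are modeled.
    static final class Batch {
        private final long producerId;
        private final int producerEpoch;
        private final int baseSequence;

        Batch(long producerId, int producerEpoch, int baseSequence) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
            this.baseSequence = baseSequence;
        }

        long producerId() { return producerId; }
        int producerEpoch() { return producerEpoch; }
        int baseSequence() { return baseSequence; }
    }

    // Ordering that only takes baseSequence into account (the problematic shape
    // described in the code comment, not the exact lines removed by the diff).
    static final Comparator<Batch> BY_SEQUENCE_ONLY =
        Comparator.comparingInt(Batch::baseSequence);

    // Same shape as the comparator added by this commit.
    static final Comparator<Batch> BY_ID_EPOCH_SEQUENCE =
        Comparator.comparingLong(Batch::producerId)
            .thenComparingInt(Batch::producerEpoch)
            .thenComparingInt(Batch::baseSequence);

    // Returns true if removing `stale` from a set that holds only `current`
    // ends up evicting `current`.
    static boolean removeEvictsOtherBatch(Comparator<Batch> comparator, Batch current, Batch stale) {
        SortedSet<Batch> inflight = new TreeSet<>(comparator);
        inflight.add(current);
        inflight.remove(stale); // TreeSet matches elements via the comparator, not equals()
        return inflight.isEmpty();
    }

    public static void main(String[] args) {
        Batch current = new Batch(1L, 1, 0); // in-flight batch, epoch 1, sequence 0
        Batch stale = new Batch(1L, 0, 0);   // assumed older batch, epoch 0, same sequence

        System.out.println("sequence-only evicts current: "
            + removeEvictsOtherBatch(BY_SEQUENCE_ONLY, current, stale));
        System.out.println("composite evicts current:     "
            + removeEvictsOtherBatch(BY_ID_EPOCH_SEQUENCE, current, stale));
    }
}
```

With the composite ordering, the two batches compare as different elements, so removing the stale one leaves the current one in the set.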