You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/04/05 17:12:40 UTC

[kafka] branch 3.0 updated: KAFKA-13794; Fix comparator of `inflightBatchesBySequence` in `TransactionManager` (#11991)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 151251ce51 KAFKA-13794; Fix comparator of `inflightBatchesBySequence` in `TransactionManager` (#11991)
151251ce51 is described below

commit 151251ce51e1836f28e83b510dfacb00decb10fe
Author: Xiaoyue Xue <28...@qq.com>
AuthorDate: Wed Apr 6 01:03:33 2022 +0800

    KAFKA-13794; Fix comparator of `inflightBatchesBySequence` in `TransactionManager` (#11991)
    
    Fixes a bug in the comparator used to sort a topic partition's in-flight producer batches. Because the old comparator treated any two batches with the same base sequence as equal, batches in the sorted set `inflightBatchesBySequence` could be removed incorrectly: removing one batch could remove a different batch that shares its sequence number. This leads to an `IllegalStateException` when the in-flight request finally returns. This patch changes the comparator to check equality of the `ProducerBatch` instances when the base sequences match.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../producer/internals/TransactionManager.java     | 10 +++-
 .../producer/internals/TransactionManagerTest.java | 62 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 2de31a03c5..8b3dacad23 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -184,16 +184,22 @@ public class TransactionManager {
         // responses which are due to the retention period elapsing, and those which are due to actual lost data.
         private long lastAckedOffset;
 
+        private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR = (b1, b2) -> {
+            if (b1.baseSequence() < b2.baseSequence()) return -1;
+            else if (b1.baseSequence() > b2.baseSequence()) return 1;
+            else return b1.equals(b2) ? 0 : 1;
+        };
+
         TopicPartitionEntry() {
             this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
             this.nextSequence = 0;
             this.lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
             this.lastAckedOffset = ProduceResponse.INVALID_OFFSET;
-            this.inflightBatchesBySequence = new TreeSet<>(Comparator.comparingInt(ProducerBatch::baseSequence));
+            this.inflightBatchesBySequence = new TreeSet<>(PRODUCER_BATCH_COMPARATOR);
         }
 
         void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) {
-            TreeSet<ProducerBatch> newInflights = new TreeSet<>(Comparator.comparingInt(ProducerBatch::baseSequence));
+            TreeSet<ProducerBatch> newInflights = new TreeSet<>(PRODUCER_BATCH_COMPARATOR);
             for (ProducerBatch inflightBatch : inflightBatchesBySequence) {
                 resetSequence.accept(inflightBatch);
                 newInflights.add(inflightBatch);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 6c1e2fdf1e..8d55721917 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -691,6 +691,68 @@ public class TransactionManagerTest {
         assertNull(transactionManager.nextBatchBySequence(tp0));
     }
 
+    @Test
+    public void testDuplicateSequenceAfterProducerReset() throws Exception {
+        initializeTransactionManager(Optional.empty());
+        initializeIdempotentProducerId(producerId, epoch);
+
+        Metrics metrics = new Metrics(time);
+        final int requestTimeout = 10000;
+        final int deliveryTimeout = 15000;
+
+        RecordAccumulator accumulator = new RecordAccumulator(logContext, 16 * 1024, CompressionType.NONE, 0, 0L,
+                deliveryTimeout, metrics, "", time, apiVersions, transactionManager,
+                new BufferPool(1024 * 1024, 16 * 1024, metrics, time, ""));
+
+        Sender sender = new Sender(logContext, this.client, this.metadata, accumulator, false,
+                MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, requestTimeout,
+                0, transactionManager, apiVersions);
+
+        assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
+
+        Future<RecordMetadata> responseFuture1 = accumulator.append(tp0, time.milliseconds(), "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS,
+                null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
+        sender.runOnce();
+        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+
+        time.sleep(requestTimeout);
+        sender.runOnce();
+        assertEquals(0, client.inFlightRequestCount());
+        assertTrue(transactionManager.hasInflightBatches(tp0));
+        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+        sender.runOnce(); // retry
+        assertEquals(1, client.inFlightRequestCount());
+        assertTrue(transactionManager.hasInflightBatches(tp0));
+        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+
+        time.sleep(5000); // delivery time out
+        sender.runOnce();
+
+        // The retried request will remain inflight until the request timeout
+        // is reached even though the delivery timeout has expired and the
+        // future has completed exceptionally.
+        assertTrue(responseFuture1.isDone());
+        TestUtils.assertFutureThrows(responseFuture1, TimeoutException.class);
+        assertFalse(transactionManager.hasInFlightRequest());
+        assertEquals(1, client.inFlightRequestCount());
+
+        sender.runOnce(); // bump the epoch
+        assertEquals(epoch + 1, transactionManager.producerIdAndEpoch().epoch);
+        assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
+
+        Future<RecordMetadata> responseFuture2 = accumulator.append(tp0, time.milliseconds(), "2".getBytes(), "2".getBytes(), Record.EMPTY_HEADERS,
+                null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
+        sender.runOnce();
+        sender.runOnce();
+        assertEquals(0, transactionManager.firstInFlightSequence(tp0));
+        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+
+        time.sleep(5000); // request time out again
+        sender.runOnce();
+        assertTrue(transactionManager.hasInflightBatches(tp0)); // the latter batch failed and retried
+        assertFalse(responseFuture2.isDone());
+    }
+
     private ProducerBatch writeIdempotentBatchWithValue(TransactionManager manager,
                                                         TopicPartition tp,
                                                         String value) {