You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/04/05 17:12:40 UTC
[kafka] branch 3.0 updated: KAFKA-13794; Fix comparator of `inflightBatchesBySequence` in `TransactionManager` (#11991)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 151251ce51 KAFKA-13794; Fix comparator of `inflightBatchesBySequence` in `TransactionManager` (#11991)
151251ce51 is described below
commit 151251ce51e1836f28e83b510dfacb00decb10fe
Author: Xiaoyue Xue <28...@qq.com>
AuthorDate: Wed Apr 6 01:03:33 2022 +0800
KAFKA-13794; Fix comparator of `inflightBatchesBySequence` in `TransactionManager` (#11991)
Fixes a bug in the comparator used to sort a producer's inflight batches for a topic partition. The old comparator compared only base sequence numbers, so the sorted set `inflightBatchesBySequence` treated two distinct batches with the same base sequence as the same element. This can cause batches in `inflightBatchesBySequence` to be removed incorrectly: i.e. removing one batch may instead remove a different batch with the same sequence number. This leads to an `IllegalStateException` when the inflight request finally returns. This patch fixes the comparator to check equality of the `ProducerBatch` instances if the base sequences match.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../producer/internals/TransactionManager.java | 10 +++-
.../producer/internals/TransactionManagerTest.java | 62 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 2 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 2de31a03c5..8b3dacad23 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -184,16 +184,22 @@ public class TransactionManager {
// responses which are due to the retention period elapsing, and those which are due to actual lost data.
private long lastAckedOffset;
+ private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR = (b1, b2) -> {
+ if (b1.baseSequence() < b2.baseSequence()) return -1;
+ else if (b1.baseSequence() > b2.baseSequence()) return 1;
+ else return b1.equals(b2) ? 0 : 1;
+ };
+
TopicPartitionEntry() {
this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
this.nextSequence = 0;
this.lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
this.lastAckedOffset = ProduceResponse.INVALID_OFFSET;
- this.inflightBatchesBySequence = new TreeSet<>(Comparator.comparingInt(ProducerBatch::baseSequence));
+ this.inflightBatchesBySequence = new TreeSet<>(PRODUCER_BATCH_COMPARATOR);
}
void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) {
- TreeSet<ProducerBatch> newInflights = new TreeSet<>(Comparator.comparingInt(ProducerBatch::baseSequence));
+ TreeSet<ProducerBatch> newInflights = new TreeSet<>(PRODUCER_BATCH_COMPARATOR);
for (ProducerBatch inflightBatch : inflightBatchesBySequence) {
resetSequence.accept(inflightBatch);
newInflights.add(inflightBatch);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 6c1e2fdf1e..8d55721917 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -691,6 +691,68 @@ public class TransactionManagerTest {
assertNull(transactionManager.nextBatchBySequence(tp0));
}
+ @Test
+ public void testDuplicateSequenceAfterProducerReset() throws Exception {
+ initializeTransactionManager(Optional.empty());
+ initializeIdempotentProducerId(producerId, epoch);
+
+ Metrics metrics = new Metrics(time);
+ final int requestTimeout = 10000;
+ final int deliveryTimeout = 15000;
+
+ RecordAccumulator accumulator = new RecordAccumulator(logContext, 16 * 1024, CompressionType.NONE, 0, 0L,
+ deliveryTimeout, metrics, "", time, apiVersions, transactionManager,
+ new BufferPool(1024 * 1024, 16 * 1024, metrics, time, ""));
+
+ Sender sender = new Sender(logContext, this.client, this.metadata, accumulator, false,
+ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, requestTimeout,
+ 0, transactionManager, apiVersions);
+
+ assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
+
+ Future<RecordMetadata> responseFuture1 = accumulator.append(tp0, time.milliseconds(), "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS,
+ null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
+ sender.runOnce();
+ assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+
+ time.sleep(requestTimeout);
+ sender.runOnce();
+ assertEquals(0, client.inFlightRequestCount());
+ assertTrue(transactionManager.hasInflightBatches(tp0));
+ assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+ sender.runOnce(); // retry
+ assertEquals(1, client.inFlightRequestCount());
+ assertTrue(transactionManager.hasInflightBatches(tp0));
+ assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+
+ time.sleep(5000); // delivery time out
+ sender.runOnce();
+
+ // The retried request will remain inflight until the request timeout
+ // is reached even though the delivery timeout has expired and the
+ // future has completed exceptionally.
+ assertTrue(responseFuture1.isDone());
+ TestUtils.assertFutureThrows(responseFuture1, TimeoutException.class);
+ assertFalse(transactionManager.hasInFlightRequest());
+ assertEquals(1, client.inFlightRequestCount());
+
+ sender.runOnce(); // bump the epoch
+ assertEquals(epoch + 1, transactionManager.producerIdAndEpoch().epoch);
+ assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
+
+ Future<RecordMetadata> responseFuture2 = accumulator.append(tp0, time.milliseconds(), "2".getBytes(), "2".getBytes(), Record.EMPTY_HEADERS,
+ null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
+ sender.runOnce();
+ sender.runOnce();
+ assertEquals(0, transactionManager.firstInFlightSequence(tp0));
+ assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+
+ time.sleep(5000); // request time out again
+ sender.runOnce();
+ assertTrue(transactionManager.hasInflightBatches(tp0)); // the latter batch failed and retried
+ assertFalse(responseFuture2.isDone());
+ }
+
private ProducerBatch writeIdempotentBatchWithValue(TransactionManager manager,
TopicPartition tp,
String value) {