Posted to commits@kafka.apache.org by jg...@apache.org on 2022/04/05 17:29:34 UTC

[kafka] branch 3.2 updated: KAFKA-13782; Ensure correct partition added to txn after abort on full batch (#11995)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new df0385435e KAFKA-13782; Ensure correct partition added to txn after abort on full batch (#11995)
df0385435e is described below

commit df0385435e06faca646a8374d57c6a5b15810283
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Apr 5 09:48:21 2022 -0700

    KAFKA-13782; Ensure correct partition added to txn after abort on full batch (#11995)
    
    Fixes a regression introduced in https://github.com/apache/kafka/pull/11452. Following [KIP-480](https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner), the `Partitioner` will receive a callback when a batch has been completed so that it can choose another partition. Because of this, we have to wait until the batch has been successfully appended to the accumulator before adding the partition in `TransactionManager.maybeAddPartition`. This is still safe because the `Sender` will not dequeue batches from the accumulator until they have been added to the transaction.
    
    Reviewers: Artem Livshits <84...@users.noreply.github.com>, David Jacot <dj...@confluent.io>, Tom Bentley <tb...@redhat.com>
---
 .../kafka/clients/producer/KafkaProducer.java      |  50 +++-
 .../producer/internals/RecordAccumulator.java      |   2 +-
 .../kafka/clients/producer/KafkaProducerTest.java  | 290 ++++++++++++++++++++-
 3 files changed, 333 insertions(+), 9 deletions(-)
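
For context, here is the ordering problem this commit removes. The timeline below is an illustrative sketch (the step names follow the producer internals changed in the diff; it is not text from the commit itself):

    // Before this fix, KafkaProducer.send() did roughly the following for a
    // transactional producer using the sticky partitioner (KIP-480):
    //
    //   1. partitioner.partition(...)                 -> picks partition 0 (tp0)
    //   2. transactionManager.maybeAddPartition(tp0)  -> tp0 is added to the txn
    //   3. accumulator.append(tp0, ..., abortOnNewBatch=true)
    //      -> the current batch for tp0 is full, so the append aborts and
    //         returns abortForNewBatch=true instead of creating a new batch
    //   4. partitioner.onNewBatch(...)                -> sticky partitioner
    //                                                    switches to partition 1 (tp1)
    //   5. accumulator.append(tp1, ..., abortOnNewBatch=false)
    //
    // The record ends up in tp1, but only tp0 was added to the transaction.
    // After this fix, maybeAddPartition runs after the final successful append,
    // so the partition that actually holds the record is the one registered
    // with the transaction.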

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ef686de7ce..da749c2f12 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -440,6 +440,43 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
+    // visible for testing
+    KafkaProducer(ProducerConfig config,
+                  LogContext logContext,
+                  Metrics metrics,
+                  Serializer<K> keySerializer,
+                  Serializer<V> valueSerializer,
+                  ProducerMetadata metadata,
+                  RecordAccumulator accumulator,
+                  TransactionManager transactionManager,
+                  Sender sender,
+                  ProducerInterceptors<K, V> interceptors,
+                  Partitioner partitioner,
+                  Time time,
+                  KafkaThread ioThread) {
+        this.producerConfig = config;
+        this.time = time;
+        this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
+        this.log = logContext.logger(KafkaProducer.class);
+        this.metrics = metrics;
+        this.producerMetrics = new KafkaProducerMetrics(metrics);
+        this.partitioner = partitioner;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        this.interceptors = interceptors;
+        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
+        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
+        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
+        this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
+        this.apiVersions = new ApiVersions();
+        this.transactionManager = transactionManager;
+        this.accumulator = accumulator;
+        this.errors = this.metrics.sensor("errors");
+        this.metadata = metadata;
+        this.sender = sender;
+        this.ioThread = ioThread;
+    }
+
     // visible for testing
     Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
         int maxInflightRequests = producerConfig.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
@@ -934,10 +971,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             // producer callback will make sure to call both 'callback' and interceptor callback
             Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
 
-            if (transactionManager != null) {
-                transactionManager.maybeAddPartition(tp);
-            }
-
             RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                     serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
 
@@ -956,6 +989,15 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
             }
 
+            // Add the partition to the transaction (if in progress) after it has been successfully
+            // appended to the accumulator. We cannot do it before because the initially selected
+            // partition may be changed when the batch is closed (as indicated by `abortForNewBatch`).
+            // Note that the `Sender` will refuse to dequeue batches from the accumulator until they
+            // have been added to the transaction.
+            if (transactionManager != null) {
+                transactionManager.maybeAddPartition(tp);
+            }
+
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();
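
From the application side, the regression mattered for any transactional producer sending keyless records, since the sticky partitioner can switch partitions whenever a batch fills. A minimal sketch of such a workload (broker address, transactional id, and topic name are illustrative placeholders, not taken from the commit):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TransactionalStickyExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "sticky-txn-demo");

            try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
                producer.initTransactions();
                producer.beginTransaction();
                // Null keys let the sticky partitioner pick the partition; once a
                // batch fills, it may move to a new partition mid-transaction,
                // which is exactly the code path fixed above.
                for (int i = 0; i < 100_000; i++) {
                    producer.send(new ProducerRecord<>("foo", null, "value-" + i));
                }
                producer.commitTransaction();
            }
        }
    }

With the old ordering, a batch that filled between maybeAddPartition and the append could leave the transaction tracking a partition the record never reached; moving the call after the append closes that window.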
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index a47c9d38ad..7724237e45 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -63,7 +63,7 @@ import org.slf4j.Logger;
  * The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
  * this behavior is explicitly disabled.
  */
-public final class RecordAccumulator {
+public class RecordAccumulator {
 
     private final Logger log;
     private volatile boolean closed;
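
The final modifier is dropped so that unit tests can mock the accumulator: Mockito's default (subclass-based) mock maker cannot mock final classes. The new tests below rely on this, stubbing append along these lines (a sketch of the pattern used in the tests, assuming the usual static imports from org.mockito.Mockito and org.mockito.ArgumentMatchers; not new production code):

    RecordAccumulator accumulator = mock(RecordAccumulator.class);
    when(accumulator.append(any(TopicPartition.class), anyLong(), any(), any(), any(),
            any(Callback.class), anyLong(), anyBoolean(), anyLong()))
        .thenReturn(new RecordAccumulator.RecordAppendResult(null, false, false, true));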
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 81eb6e3f2d..bc91340a7b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -23,9 +23,13 @@ import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
+import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
 import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.clients.producer.internals.ProducerMetadata;
+import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.Sender;
+import org.apache.kafka.clients.producer.internals.TransactionManager;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -51,6 +55,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
 import org.apache.kafka.common.requests.EndTxnResponse;
@@ -66,6 +71,7 @@ import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -74,7 +80,9 @@ import org.apache.kafka.test.MockPartitioner;
 import org.apache.kafka.test.MockProducerInterceptor;
 import org.apache.kafka.test.MockSerializer;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -123,6 +131,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.notNull;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -151,6 +160,8 @@ public class KafkaProducerTest {
                     new PartitionInfo(topic, 2, null, null, null)),
             Collections.emptySet(),
             Collections.emptySet());
+    private TestInfo testInfo;
+
     private static final int DEFAULT_METADATA_IDLE_MS = 5 * 60 * 1000;
     private static final Node NODE = new Node(0, "host1", 1000);
 
@@ -161,8 +172,13 @@ public class KafkaProducerTest {
                   KafkaClient kafkaClient,
                   ProducerInterceptors<K, V> interceptors,
                   Time time) {
-        return new KafkaProducer<>(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)),
-                keySerializer, valueSerializer, metadata, kafkaClient, interceptors, time);
+        return new KafkaProducer<K, V>(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)),
+            keySerializer, valueSerializer, metadata, kafkaClient, interceptors, time);
+    }
+
+    @BeforeEach
+    public void setup(TestInfo testInfo) {
+        this.testInfo = testInfo;
     }
 
     @Test
@@ -636,7 +652,7 @@ public class KafkaProducerTest {
 
     private static KafkaProducer<String, String> producerWithOverrideNewSender(Map<String, Object> configs,
                                                                                ProducerMetadata metadata,
-                                                                               Time timer) {
+                                                                               Time time) {
         // let mockClient#leastLoadedNode return the node directly so that we can isolate Metadata calls from KafkaProducer for idempotent producer
         MockClient mockClient = new MockClient(Time.SYSTEM, metadata) {
             @Override
@@ -647,7 +663,7 @@ public class KafkaProducerTest {
 
         return new KafkaProducer<String, String>(
                 new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, new StringSerializer(), new StringSerializer())),
-                new StringSerializer(), new StringSerializer(), metadata, mockClient, null, timer) {
+                new StringSerializer(), new StringSerializer(), metadata, mockClient, null, time) {
             @Override
             Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
                 // give Sender its own Metadata instance so that we can isolate Metadata calls from KafkaProducer
@@ -1944,6 +1960,180 @@ public class KafkaProducerTest {
         }
     }
 
+    @Test
+    public void testPartitionAddedToTransaction() throws Exception {
+        StringSerializer serializer = new StringSerializer();
+        KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
+
+        String topic = "foo";
+        TopicPartition topicPartition = new TopicPartition(topic, 0);
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+
+        when(ctx.sender.isRunning()).thenReturn(true);
+        when(ctx.metadata.fetch()).thenReturn(cluster);
+
+        long timestamp = ctx.time.milliseconds();
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, "key", "value");
+        FutureRecordMetadata future = expectAppend(ctx, record, topicPartition, cluster);
+
+        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
+            assertEquals(future, producer.send(record));
+            assertFalse(future.isDone());
+            verify(ctx.transactionManager).maybeAddPartition(topicPartition);
+        }
+    }
+
+    @Test
+    public void testPartitionAddedToTransactionAfterFullBatchRetry() throws Exception {
+        StringSerializer serializer = new StringSerializer();
+        KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
+
+        String topic = "foo";
+        TopicPartition topicPartition0 = new TopicPartition(topic, 0);
+        TopicPartition topicPartition1 = new TopicPartition(topic, 1);
+        Cluster cluster = TestUtils.singletonCluster(topic, 2);
+
+        when(ctx.sender.isRunning()).thenReturn(true);
+        when(ctx.metadata.fetch()).thenReturn(cluster);
+
+        long timestamp = ctx.time.milliseconds();
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, "key", "value");
+
+        FutureRecordMetadata future = expectAppendWithAbortForNewBatch(
+            ctx,
+            record,
+            topicPartition0,
+            topicPartition1,
+            cluster
+        );
+
+        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
+            assertEquals(future, producer.send(record));
+            assertFalse(future.isDone());
+            verify(ctx.partitioner).onNewBatch(topic, cluster, 0);
+            verify(ctx.transactionManager, never()).maybeAddPartition(topicPartition0);
+            verify(ctx.transactionManager).maybeAddPartition(topicPartition1);
+        }
+    }
+
+    private <T> FutureRecordMetadata expectAppend(
+        KafkaProducerTestContext<T> ctx,
+        ProducerRecord<T, T> record,
+        TopicPartition initialSelectedPartition,
+        Cluster cluster
+    ) throws InterruptedException {
+        byte[] serializedKey = ctx.serializer.serialize(topic, record.key());
+        byte[] serializedValue = ctx.serializer.serialize(topic, record.value());
+        long timestamp = record.timestamp() == null ? ctx.time.milliseconds() : record.timestamp();
+
+        ProduceRequestResult requestResult = new ProduceRequestResult(initialSelectedPartition);
+        FutureRecordMetadata futureRecordMetadata = new FutureRecordMetadata(
+            requestResult,
+            5,
+            timestamp,
+            serializedKey.length,
+            serializedValue.length,
+            ctx.time
+        );
+
+        when(ctx.partitioner.partition(
+            initialSelectedPartition.topic(),
+            record.key(),
+            serializedKey,
+            record.value(),
+            serializedValue,
+            cluster
+        )).thenReturn(initialSelectedPartition.partition());
+
+        when(ctx.accumulator.append(
+            eq(initialSelectedPartition),
+            eq(timestamp),
+            eq(serializedKey),
+            eq(serializedValue),
+            eq(Record.EMPTY_HEADERS),
+            any(Callback.class),
+            anyLong(),
+            eq(true), // abortOnNewBatch
+            anyLong()
+        )).thenReturn(new RecordAccumulator.RecordAppendResult(
+            futureRecordMetadata,
+            false, // batchIsFull
+            false, // newBatchCreated
+            false  // abortForNewBatch
+        ));
+
+        return futureRecordMetadata;
+    }
+
+    private <T> FutureRecordMetadata expectAppendWithAbortForNewBatch(
+        KafkaProducerTestContext<T> ctx,
+        ProducerRecord<T, T> record,
+        TopicPartition initialSelectedPartition,
+        TopicPartition retrySelectedPartition,
+        Cluster cluster
+    ) throws InterruptedException {
+        byte[] serializedKey = ctx.serializer.serialize(topic, record.key());
+        byte[] serializedValue = ctx.serializer.serialize(topic, record.value());
+        long timestamp = record.timestamp() == null ? ctx.time.milliseconds() : record.timestamp();
+
+        ProduceRequestResult requestResult = new ProduceRequestResult(retrySelectedPartition);
+        FutureRecordMetadata futureRecordMetadata = new FutureRecordMetadata(
+            requestResult,
+            0,
+            timestamp,
+            serializedKey.length,
+            serializedValue.length,
+            ctx.time
+        );
+
+        when(ctx.partitioner.partition(
+            initialSelectedPartition.topic(),
+            record.key(),
+            serializedKey,
+            record.value(),
+            serializedValue,
+            cluster
+        )).thenReturn(initialSelectedPartition.partition())
+          .thenReturn(retrySelectedPartition.partition());
+
+        when(ctx.accumulator.append(
+            eq(initialSelectedPartition),
+            eq(timestamp),
+            eq(serializedKey),
+            eq(serializedValue),
+            eq(Record.EMPTY_HEADERS),
+            any(Callback.class),
+            anyLong(),
+            eq(true), // abortOnNewBatch
+            anyLong()
+        )).thenReturn(new RecordAccumulator.RecordAppendResult(
+            null,
+            false, // batchIsFull
+            false, // newBatchCreated
+            true   // abortForNewBatch: forces send() to retry with a new partition
+        ));
+
+        when(ctx.accumulator.append(
+            eq(retrySelectedPartition),
+            eq(timestamp),
+            eq(serializedKey),
+            eq(serializedValue),
+            eq(Record.EMPTY_HEADERS),
+            any(Callback.class),
+            anyLong(),
+            eq(false), // abortOnNewBatch
+            anyLong()
+        )).thenReturn(new RecordAccumulator.RecordAppendResult(
+            futureRecordMetadata,
+            false, // batchIsFull
+            true,  // newBatchCreated
+            false  // abortForNewBatch
+        ));
+
+        return futureRecordMetadata;
+    }
+
+
     private static final List<String> CLIENT_IDS = new ArrayList<>();
 
     public static class SerializerForClientId implements Serializer<byte[]> {
@@ -2012,4 +2202,96 @@ public class KafkaProducerTest {
         public void configure(Map<String, ?> configs) {
         }
     }
+
+    private static class KafkaProducerTestContext<T> {
+        private final TestInfo testInfo;
+        private final Map<String, Object> configs;
+        private final Serializer<T> serializer;
+        private ProducerMetadata metadata = mock(ProducerMetadata.class);
+        private RecordAccumulator accumulator = mock(RecordAccumulator.class);
+        private Sender sender = mock(Sender.class);
+        private TransactionManager transactionManager = mock(TransactionManager.class);
+        private Partitioner partitioner = mock(Partitioner.class);
+        private KafkaThread ioThread = mock(KafkaThread.class);
+        private Time time = new MockTime();
+        private Metrics metrics = new Metrics(time);
+        private List<ProducerInterceptor<T, T>> interceptors = new ArrayList<>();
+
+        public KafkaProducerTestContext(
+            TestInfo testInfo,
+            Serializer<T> serializer
+        ) {
+            this(testInfo, new HashMap<>(), serializer);
+        }
+
+        public KafkaProducerTestContext(
+            TestInfo testInfo,
+            Map<String, Object> configs,
+            Serializer<T> serializer
+        ) {
+            this.testInfo = testInfo;
+            this.configs = configs;
+            this.serializer = serializer;
+
+            if (!configs.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+                configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            }
+        }
+
+        public KafkaProducerTestContext<T> setProducerMetadata(ProducerMetadata metadata) {
+            this.metadata = metadata;
+            return this;
+        }
+
+        public KafkaProducerTestContext<T> setAccumulator(RecordAccumulator accumulator) {
+            this.accumulator = accumulator;
+            return this;
+        }
+
+        public KafkaProducerTestContext<T> setSender(Sender sender) {
+            this.sender = sender;
+            return this;
+        }
+
+        public KafkaProducerTestContext<T> setTransactionManager(TransactionManager transactionManager) {
+            this.transactionManager = transactionManager;
+            return this;
+        }
+
+        public KafkaProducerTestContext<T> addInterceptor(ProducerInterceptor<T, T> interceptor) {
+            this.interceptors.add(interceptor);
+            return this;
+        }
+
+        public KafkaProducerTestContext<T> setTime(Time time) {
+            this.time = time;
+            return this;
+        }
+
+        public KafkaProducer<T, T> newKafkaProducer() {
+            LogContext logContext = new LogContext("[Producer test=" + testInfo.getDisplayName() + "] ");
+
+            ProducerConfig producerConfig = new ProducerConfig(
+                ProducerConfig.appendSerializerToConfig(configs, serializer, serializer));
+
+            ProducerInterceptors<T, T> interceptors = new ProducerInterceptors<>(this.interceptors);
+
+            return new KafkaProducer<>(
+                producerConfig,
+                logContext,
+                metrics,
+                serializer,
+                serializer,
+                metadata,
+                accumulator,
+                transactionManager,
+                sender,
+                interceptors,
+                partitioner,
+                time,
+                ioThread
+            );
+        }
+    }
+
 }
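
To run just the updated test class, a standard Gradle invocation works (the module path and test filter assume the usual Kafka build layout):

    ./gradlew clients:test --tests org.apache.kafka.clients.producer.KafkaProducerTest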