Posted to commits@kafka.apache.org by mi...@apache.org on 2022/01/26 10:53:10 UTC

[kafka] branch trunk updated: KAFKA-9279: Fail producer transactions for asynchronously-reported, synchronously-encountered ApiExceptions (#11508)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 000ba03  KAFKA-9279: Fail producer transactions for asynchronously-reported, synchronously-encountered ApiExceptions (#11508)
000ba03 is described below

commit 000ba031c3d1135cb0e1fe6438d1e464ff10b6b0
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Wed Jan 26 05:51:10 2022 -0500

    KAFKA-9279: Fail producer transactions for asynchronously-reported, synchronously-encountered ApiExceptions (#11508)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>
---
 .../kafka/clients/producer/KafkaProducer.java      |   3 +
 .../producer/internals/TransactionManager.java     |   2 +-
 .../kafka/clients/producer/KafkaProducerTest.java  | 148 ++++++++++++++++++++-
 3 files changed, 151 insertions(+), 2 deletions(-)
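
For context, a minimal sketch of the user-facing behavior the new tests pin
down (broker address, transactional id, topic name, and class name below are
illustrative, not taken from the commit). Before this change, a send() that
failed synchronously inside the producer only reported the error through the
returned future; the transaction manager was never notified, so a later
commitTransaction() could succeed with the record silently missing:

    import java.util.Properties;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class Kafka9279Demo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo.id");
            props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1000);

            // A value comfortably larger than max.request.size
            String largeValue = new String(new char[2000]).replace('\0', '*');

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(
                    props, new StringSerializer(), new StringSerializer())) {
                producer.initTransactions();
                producer.beginTransaction();

                // send() does not throw here; the RecordTooLargeException is only
                // reported through the returned, already-failed future.
                Future<RecordMetadata> result =
                        producer.send(new ProducerRecord<>("demo-topic", largeValue));

                // Before this commit: could succeed despite the dropped record.
                // After this commit: throws KafkaException, because the failed
                // send moved the transaction into an abortable error state.
                producer.commitTransaction();
            }
        }
    }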

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ced0d7d..bedd627 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -997,6 +997,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 callback.onCompletion(null, e);
             this.errors.record();
             this.interceptors.onSendError(record, tp, e);
+            if (transactionManager != null) {
+                transactionManager.maybeTransitionToErrorState(e);
+            }
             return new FutureFailure(e);
         } catch (InterruptedException e) {
             this.errors.record();
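
The hunk above sits in KafkaProducer#doSend()'s catch (ApiException e) block,
i.e. the path where send() swallows the exception and returns a pre-failed
future instead of throwing. A sketch of the caller-side view (producer and
tooLargeRecord are assumed to exist; the usual producer imports apply):

    static void demoFailedFuture(KafkaProducer<String, String> producer,
                                 ProducerRecord<String, String> tooLargeRecord)
            throws InterruptedException {
        Future<RecordMetadata> future = producer.send(tooLargeRecord);
        try {
            future.get();
        } catch (java.util.concurrent.ExecutionException e) {
            // e.getCause() is the ApiException (e.g. RecordTooLargeException)
            // the catch block above intercepted; with this commit it is also
            // handed to TransactionManager#maybeTransitionToErrorState before
            // the failed future is returned.
        }
    }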
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 521d5da..be881a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -712,7 +712,7 @@ public class TransactionManager {
         removeInFlightBatch(batch);
     }
 
-    private void maybeTransitionToErrorState(RuntimeException exception) {
+    public synchronized void maybeTransitionToErrorState(RuntimeException exception) {
         if (exception instanceof ClusterAuthorizationException
                 || exception instanceof TransactionalIdAuthorizationException
                 || exception instanceof ProducerFencedException
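
The signature change is what lets the new call site in KafkaProducer, running
on an application thread, reach this method; synchronized presumably guards
the state transition now that it is no longer reached only through
TransactionManager's own locked entry points on the sender path. At the
application level, an abortable error surfaces at commit time, where the
usual handling looks roughly like this (a sketch following the KafkaProducer
javadoc pattern, not code from this commit):

    static void commitOrRecover(KafkaProducer<String, String> producer) {
        try {
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException e) {
            // Fatal: this producer can no longer be used for transactions.
            producer.close();
        } catch (KafkaException e) {
            // Abortable (the state chosen for errors like RecordTooLargeException):
            // roll back, then retry from beginTransaction().
            producer.abortTransaction();
        }
    }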
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 96a3034..76e920b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -97,6 +98,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static java.util.Collections.emptyMap;
@@ -549,7 +552,7 @@ public class KafkaProducerTest {
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
 
-        // Create a record with a partition higher than the initial (outdated) partition range
+        // Create a record for a not-yet-created topic
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");
         ProducerMetadata metadata = mock(ProducerMetadata.class);
 
@@ -1062,6 +1065,149 @@ public class KafkaProducerTest {
     }
 
     @Test
+    public void testCommitTransactionWithRecordTooLargeException() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1000);
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = mock(ProducerMetadata.class);
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        when(metadata.fetch()).thenReturn(onePartitionCluster);
+
+        String largeString = IntStream.range(0, 1000).mapToObj(i -> "*").collect(Collectors.joining());
+        ProducerRecord<String, String> largeRecord = new ProducerRecord<>(topic, "large string", largeString);
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions();
+
+            client.prepareResponse(endTxnResponse(Errors.NONE));
+            producer.beginTransaction();
+            TestUtils.assertFutureError(producer.send(largeRecord), RecordTooLargeException.class);
+            assertThrows(KafkaException.class, producer::commitTransaction);
+        }
+    }
+
+    @Test
+    public void testCommitTransactionWithMetadataTimeoutForMissingTopic() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+
+        // Create a record for a not-yet-created topic
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
+        ProducerMetadata metadata = mock(ProducerMetadata.class);
+
+        MockTime mockTime = new MockTime();
+
+        MockClient client = new MockClient(mockTime, metadata);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        AtomicInteger invocationCount = new AtomicInteger(0);
+        when(metadata.fetch()).then(invocation -> {
+            invocationCount.incrementAndGet();
+            if (invocationCount.get() > 5) {
+                mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000);
+            }
+
+            return emptyCluster;
+        });
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, mockTime)) {
+            producer.initTransactions();
+            producer.beginTransaction();
+
+            TestUtils.assertFutureError(producer.send(record), TimeoutException.class);
+            assertThrows(KafkaException.class, producer::commitTransaction);
+        }
+    }
+
+    @Test
+    public void testCommitTransactionWithMetadataTimeoutForPartitionOutOfRange() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+
+        // Create a record with a partition higher than the initial (outdated) partition range
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");
+        ProducerMetadata metadata = mock(ProducerMetadata.class);
+
+        MockTime mockTime = new MockTime();
+
+        MockClient client = new MockClient(mockTime, metadata);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        AtomicInteger invocationCount = new AtomicInteger(0);
+        when(metadata.fetch()).then(invocation -> {
+            invocationCount.incrementAndGet();
+            if (invocationCount.get() > 5) {
+                mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000);
+            }
+
+            return onePartitionCluster;
+        });
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, mockTime)) {
+            producer.initTransactions();
+            producer.beginTransaction();
+
+            TestUtils.assertFutureError(producer.send(record), TimeoutException.class);
+            assertThrows(KafkaException.class, producer::commitTransaction);
+        }
+    }
+
+    @Test
+    public void testCommitTransactionWithSendToInvalidTopic() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
+
+        Time time = new MockTime();
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, emptyMap());
+        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        String invalidTopicName = "topic abc"; // Invalid topic name due to space
+        ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+        List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
+        topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
+                invalidTopicName, false, Collections.emptyList()));
+        MetadataResponse updateResponse =  RequestTestUtils.metadataResponse(
+                new ArrayList<>(initialUpdateResponse.brokers()),
+                initialUpdateResponse.clusterId(),
+                initialUpdateResponse.controller().id(),
+                topicMetadata);
+        client.prepareMetadataUpdate(updateResponse);
+
+        try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions();
+            producer.beginTransaction();
+
+            TestUtils.assertFutureError(producer.send(record), InvalidTopicException.class);
+            assertThrows(KafkaException.class, producer::commitTransaction);
+        }
+    }
+
+    @Test
     public void testSendTxnOffsetsWithGroupId() {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");