You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/01/26 10:53:10 UTC
[kafka] branch trunk updated: KAFKA-9279: Fail producer transactions for asynchronously-reported, synchronously-encountered ApiExceptions (#11508)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 000ba03 KAFKA-9279: Fail producer transactions for asynchronously-reported, synchronously-encountered ApiExceptions (#11508)
000ba03 is described below
commit 000ba031c3d1135cb0e1fe6438d1e464ff10b6b0
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Wed Jan 26 05:51:10 2022 -0500
KAFKA-9279: Fail producer transactions for asynchronously-reported, synchronously-encountered ApiExceptions (#11508)
Reviewers: Mickael Maison <mi...@gmail.com>
---
.../kafka/clients/producer/KafkaProducer.java | 3 +
.../producer/internals/TransactionManager.java | 2 +-
.../kafka/clients/producer/KafkaProducerTest.java | 148 ++++++++++++++++++++-
3 files changed, 151 insertions(+), 2 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ced0d7d..bedd627 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -997,6 +997,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
+ if (transactionManager != null) {
+ transactionManager.maybeTransitionToErrorState(e);
+ }
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 521d5da..be881a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -712,7 +712,7 @@ public class TransactionManager {
removeInFlightBatch(batch);
}
- private void maybeTransitionToErrorState(RuntimeException exception) {
+ public synchronized void maybeTransitionToErrorState(RuntimeException exception) {
if (exception instanceof ClusterAuthorizationException
|| exception instanceof TransactionalIdAuthorizationException
|| exception instanceof ProducerFencedException
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 96a3034..76e920b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -97,6 +98,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
@@ -549,7 +552,7 @@ public class KafkaProducerTest {
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
- // Create a record with a partition higher than the initial (outdated) partition range
+ // Create a record for a not-yet-created topic
ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");
ProducerMetadata metadata = mock(ProducerMetadata.class);
@@ -1062,6 +1065,149 @@ public class KafkaProducerTest {
}
@Test
+ public void testCommitTransactionWithRecordTooLargeException() throws Exception {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1000);
+ Time time = new MockTime(1);
+ MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ ProducerMetadata metadata = mock(ProducerMetadata.class);
+ MockClient client = new MockClient(time, metadata);
+ client.updateMetadata(initialUpdateResponse);
+ client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+ client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+ when(metadata.fetch()).thenReturn(onePartitionCluster);
+
+ String largeString = IntStream.range(0, 1000).mapToObj(i -> "*").collect(Collectors.joining());
+ ProducerRecord<String, String> largeRecord = new ProducerRecord<>(topic, "large string", largeString);
+
+ try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+ new StringSerializer(), metadata, client, null, time)) {
+ producer.initTransactions();
+
+ client.prepareResponse(endTxnResponse(Errors.NONE));
+ producer.beginTransaction();
+ TestUtils.assertFutureError(producer.send(largeRecord), RecordTooLargeException.class);
+ assertThrows(KafkaException.class, producer::commitTransaction);
+ }
+ }
+
+ @Test
+ public void testCommitTransactionWithMetadataTimeoutForMissingTopic() throws Exception {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+
+ // Create a record for a not-yet-created topic
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
+ ProducerMetadata metadata = mock(ProducerMetadata.class);
+
+ MockTime mockTime = new MockTime();
+
+ MockClient client = new MockClient(mockTime, metadata);
+ client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+ client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+ AtomicInteger invocationCount = new AtomicInteger(0);
+ when(metadata.fetch()).then(invocation -> {
+ invocationCount.incrementAndGet();
+ if (invocationCount.get() > 5) {
+ mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000);
+ }
+
+ return emptyCluster;
+ });
+
+ try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+ new StringSerializer(), metadata, client, null, mockTime)) {
+ producer.initTransactions();
+ producer.beginTransaction();
+
+ TestUtils.assertFutureError(producer.send(record), TimeoutException.class);
+ assertThrows(KafkaException.class, producer::commitTransaction);
+ }
+ }
+
+ @Test
+ public void testCommitTransactionWithMetadataTimeoutForPartitionOutOfRange() throws Exception {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+
+ // Create a record with a partition higher than the initial (outdated) partition range
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");
+ ProducerMetadata metadata = mock(ProducerMetadata.class);
+
+ MockTime mockTime = new MockTime();
+
+ MockClient client = new MockClient(mockTime, metadata);
+ client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+ client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+ AtomicInteger invocationCount = new AtomicInteger(0);
+ when(metadata.fetch()).then(invocation -> {
+ invocationCount.incrementAndGet();
+ if (invocationCount.get() > 5) {
+ mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000);
+ }
+
+ return onePartitionCluster;
+ });
+
+ try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+ new StringSerializer(), metadata, client, null, mockTime)) {
+ producer.initTransactions();
+ producer.beginTransaction();
+
+ TestUtils.assertFutureError(producer.send(record), TimeoutException.class);
+ assertThrows(KafkaException.class, producer::commitTransaction);
+ }
+ }
+
+ @Test
+ public void testCommitTransactionWithSendToInvalidTopic() throws Exception {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
+
+ Time time = new MockTime();
+ MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, emptyMap());
+ ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+ metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
+
+ MockClient client = new MockClient(time, metadata);
+ client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+ client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+ String invalidTopicName = "topic abc"; // Invalid topic name due to space
+ ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+ List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
+ topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
+ invalidTopicName, false, Collections.emptyList()));
+ MetadataResponse updateResponse = RequestTestUtils.metadataResponse(
+ new ArrayList<>(initialUpdateResponse.brokers()),
+ initialUpdateResponse.clusterId(),
+ initialUpdateResponse.controller().id(),
+ topicMetadata);
+ client.prepareMetadataUpdate(updateResponse);
+
+ try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+ new StringSerializer(), metadata, client, null, time)) {
+ producer.initTransactions();
+ producer.beginTransaction();
+
+ TestUtils.assertFutureError(producer.send(record), InvalidTopicException.class);
+ assertThrows(KafkaException.class, producer::commitTransaction);
+ }
+ }
+
+ @Test
public void testSendTxnOffsetsWithGroupId() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");