You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/02/03 00:05:35 UTC
[kafka] branch trunk updated: KAFKA-12841: Fix producer callback handling when partition is missing (#11689)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 319732d KAFKA-12841: Fix producer callback handling when partition is missing (#11689)
319732d is described below
commit 319732dbeb08aa7926175cbaf4dfa8a05c52ac18
Author: Philip Nee <pn...@uber.com>
AuthorDate: Wed Feb 2 16:03:32 2022 -0800
KAFKA-12841: Fix producer callback handling when partition is missing (#11689)
Sometimes, the Kafka producer encounters an error prior to selecting a topic partition. In this case, we
would like to acknowledge the failure in the producer interceptors, if any are configured. We should also
pass a non-null Metadata object to the producer callback, if there is one. This PR implements that
behavior. It also updates the JavaDoc to clarify that if a partition cannot be selected, we will pass
back a partition id of -1 in the metadata. This is in keeping with KAFKA-3303.
Co-authors: Kirk True <ki...@mustardgrain.com>
Reviewers: Colin P. McCabe <cm...@apache.org>
---
.../apache/kafka/clients/producer/Callback.java | 5 ++-
.../kafka/clients/producer/KafkaProducer.java | 13 ++++++-
.../producer/internals/ProducerInterceptors.java | 7 +++-
.../kafka/clients/producer/KafkaProducerTest.java | 43 ++++++++++++++++++++++
4 files changed, 62 insertions(+), 6 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
index ee0610e..236d04b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
@@ -25,10 +25,11 @@ public interface Callback {
/**
* A callback method the user can implement to provide asynchronous handling of request completion. This method will
* be called when the record sent to the server has been acknowledged. When exception is not null in the callback,
- * metadata will contain the special -1 value for all fields except for topicPartition, which will be valid.
+ * metadata will contain the special -1 value for all fields. If topicPartition cannot be
+ * choosen, a -1 value will be assigned.
*
* @param metadata The metadata for the record that was sent (i.e. the partition and offset). An empty metadata
- * with -1 value for all fields except for topicPartition will be returned if an error occurred.
+ * with -1 value for all fields will be returned if an error occurred.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
* Possible thrown exceptions include:
*
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index bedd627..7660967 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -993,8 +993,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
- if (callback != null)
- callback.onCompletion(null, e);
+ // producer callback will make sure to call both 'callback' and interceptor callback
+ if (tp == null) {
+ // set topicPartition to -1 when null
+ tp = ProducerInterceptors.extractTopicPartition(record);
+ }
+
+ Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
+
+ // The onCompletion callback does expect a non-null metadata, but one will be created inside
+ // the interceptor's onCompletion implementation before the user's callback is invoked.
+ interceptCallback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
if (transactionManager != null) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
index ceec552..dd72409 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
@@ -110,8 +110,7 @@ public class ProducerInterceptors<K, V> implements Closeable {
interceptor.onAcknowledgement(null, exception);
} else {
if (interceptTopicPartition == null) {
- interceptTopicPartition = new TopicPartition(record.topic(),
- record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
+ interceptTopicPartition = extractTopicPartition(record);
}
interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
RecordBatch.NO_TIMESTAMP, -1, -1), exception);
@@ -123,6 +122,10 @@ public class ProducerInterceptors<K, V> implements Closeable {
}
}
+ public static <K, V> TopicPartition extractTopicPartition(ProducerRecord<K, V> record) {
+ return new TopicPartition(record.topic(), record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
+ }
+
/**
* Closes every interceptor in a container.
*/
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 76e920b..2d668f4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -51,6 +51,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
@@ -58,6 +59,7 @@ import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
@@ -1694,6 +1696,47 @@ public class KafkaProducerTest {
"key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8)));
}
+ @Test
+ public void testCallbackHandlesError() throws Exception {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+ Time time = new MockTime();
+ ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+ MockClient client = new MockClient(time, producerMetadata);
+
+ String invalidTopicName = "topic abc"; // Invalid topic name due to space
+
+ try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(),
+ producerMetadata, client, null, time)) {
+ ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+ // Here's the important piece of the test. Let's make sure that the RecordMetadata we get
+ // is non-null and adheres to the onCompletion contract.
+ Callback callBack = (recordMetadata, exception) -> {
+ assertNotNull(exception);
+ assertNotNull(recordMetadata);
+
+ assertNotNull(recordMetadata.topic(), "Topic name should be valid even on send failure");
+ assertEquals(invalidTopicName, recordMetadata.topic());
+ assertNotNull(recordMetadata.partition(), "Partition should be valid even on send failure");
+
+ assertFalse(recordMetadata.hasOffset());
+ assertEquals(ProduceResponse.INVALID_OFFSET, recordMetadata.offset());
+
+ assertFalse(recordMetadata.hasTimestamp());
+ assertEquals(RecordBatch.NO_TIMESTAMP, recordMetadata.timestamp());
+
+ assertEquals(-1, recordMetadata.serializedKeySize());
+ assertEquals(-1, recordMetadata.serializedValueSize());
+ assertEquals(-1, recordMetadata.partition());
+ };
+
+ producer.send(record, callBack);
+ }
+ }
+
private static final List<String> CLIENT_IDS = new ArrayList<>();
public static class SerializerForClientId implements Serializer<byte[]> {