You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/02/03 00:05:35 UTC

[kafka] branch trunk updated: KAFKA-12841: Fix producer callback handling when partition is missing (#11689)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 319732d  KAFKA-12841: Fix producer callback handling when partition is missing (#11689)
319732d is described below

commit 319732dbeb08aa7926175cbaf4dfa8a05c52ac18
Author: Philip Nee <pn...@uber.com>
AuthorDate: Wed Feb 2 16:03:32 2022 -0800

    KAFKA-12841: Fix producer callback handling when partition is missing (#11689)
    
    Sometimes, the Kafka producer encounters an error prior to selecting a topic partition. In this case, we
    would like to acknowledge the failure in the producer interceptors, if any are configured. We should also
    pass a non-null Metadata object to the producer callback, if there is one. This PR implements that
    behavior. It also updates the JavaDoc to clarify that if a partition cannot be selected, we will pass
    back a partition id of -1 in the metadata. This is in keeping with KAFKA-3303.
    
    Co-authors: Kirk True <ki...@mustardgrain.com>
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../apache/kafka/clients/producer/Callback.java    |  5 ++-
 .../kafka/clients/producer/KafkaProducer.java      | 13 ++++++-
 .../producer/internals/ProducerInterceptors.java   |  7 +++-
 .../kafka/clients/producer/KafkaProducerTest.java  | 43 ++++++++++++++++++++++
 4 files changed, 62 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
index ee0610e..236d04b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
@@ -25,10 +25,11 @@ public interface Callback {
     /**
      * A callback method the user can implement to provide asynchronous handling of request completion. This method will
      * be called when the record sent to the server has been acknowledged. When exception is not null in the callback,
-     * metadata will contain the special -1 value for all fields except for topicPartition, which will be valid.
+     * metadata will contain the special -1 value for all fields. If topicPartition cannot be
+     * chosen, a -1 value will be assigned.
      *
      * @param metadata The metadata for the record that was sent (i.e. the partition and offset). An empty metadata
-     *                 with -1 value for all fields except for topicPartition will be returned if an error occurred.
+     *                 with -1 value for all fields will be returned if an error occurred.
      * @param exception The exception thrown during processing of this record. Null if no error occurred.
      *                  Possible thrown exceptions include:
      *
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index bedd627..7660967 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -993,8 +993,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             // for other exceptions throw directly
         } catch (ApiException e) {
             log.debug("Exception occurred during message send:", e);
-            if (callback != null)
-                callback.onCompletion(null, e);
+            // producer callback will make sure to call both 'callback' and interceptor callback
+            if (tp == null) {
+                // no partition was chosen: use the record's own partition, or -1 if it has none
+                tp = ProducerInterceptors.extractTopicPartition(record);
+            }
+
+            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
+
+            // The onCompletion callback does expect a non-null metadata, but one will be created inside
+            // the interceptor's onCompletion implementation before the user's callback is invoked.
+            interceptCallback.onCompletion(null, e);
             this.errors.record();
             this.interceptors.onSendError(record, tp, e);
             if (transactionManager != null) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
index ceec552..dd72409 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
@@ -110,8 +110,7 @@ public class ProducerInterceptors<K, V> implements Closeable {
                     interceptor.onAcknowledgement(null, exception);
                 } else {
                     if (interceptTopicPartition == null) {
-                        interceptTopicPartition = new TopicPartition(record.topic(),
-                                record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
+                        interceptTopicPartition = extractTopicPartition(record);
                     }
                     interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
                                     RecordBatch.NO_TIMESTAMP, -1, -1), exception);
@@ -123,6 +122,10 @@ public class ProducerInterceptors<K, V> implements Closeable {
         }
     }
 
+    public static <K, V> TopicPartition extractTopicPartition(ProducerRecord<K, V> record) {
+        return new TopicPartition(record.topic(), record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
+    }
+
     /**
      * Closes every interceptor in a container.
      */
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 76e920b..2d668f4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -51,6 +51,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
 import org.apache.kafka.common.requests.EndTxnResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
@@ -58,6 +59,7 @@ import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
@@ -1694,6 +1696,47 @@ public class KafkaProducerTest {
             "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8)));
     }
 
+    @Test
+    public void testCallbackHandlesError() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+        Time time = new MockTime();
+        ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, producerMetadata);
+
+        String invalidTopicName = "topic abc"; // Invalid topic name due to space
+
+        try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(),
+                producerMetadata, client, null, time)) {
+            ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+            // Here's the important piece of the test. Let's make sure that the RecordMetadata we get
+            // is non-null and adheres to the onCompletion contract.
+            Callback callBack = (recordMetadata, exception) -> {
+                assertNotNull(exception);
+                assertNotNull(recordMetadata);
+
+                assertNotNull(recordMetadata.topic(), "Topic name should be valid even on send failure");
+                assertEquals(invalidTopicName, recordMetadata.topic());
+                assertNotNull(recordMetadata.partition(), "Partition should be valid even on send failure");
+
+                assertFalse(recordMetadata.hasOffset());
+                assertEquals(ProduceResponse.INVALID_OFFSET, recordMetadata.offset());
+
+                assertFalse(recordMetadata.hasTimestamp());
+                assertEquals(RecordBatch.NO_TIMESTAMP, recordMetadata.timestamp());
+
+                assertEquals(-1, recordMetadata.serializedKeySize());
+                assertEquals(-1, recordMetadata.serializedValueSize());
+                assertEquals(-1, recordMetadata.partition());
+            };
+
+            producer.send(record, callBack);
+        }
+    }
+
     private static final List<String> CLIENT_IDS = new ArrayList<>();
 
     public static class SerializerForClientId implements Serializer<byte[]> {