You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2022/04/25 22:59:53 UTC

[kafka] branch trunk updated: KAFKA-12841: Remove an additional call of onAcknowledgement (#12064)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b020819ac4 KAFKA-12841: Remove an additional call of onAcknowledgement (#12064)
b020819ac4 is described below

commit b020819ac415976bd09a8e5bfdfb43642837c64d
Author: Philip Nee <ph...@gmail.com>
AuthorDate: Mon Apr 25 15:59:45 2022 -0700

    KAFKA-12841: Remove an additional call of onAcknowledgement (#12064)
    
    The bug, introduced in #11689, was that an additional onAcknowledgement call was made via the InterceptorCallback class. This is undesirable because onSendError will attempt to call onAcknowledgement once more.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 .../java/org/apache/kafka/clients/producer/KafkaProducer.java |  9 ++++-----
 .../org/apache/kafka/clients/producer/KafkaProducerTest.java  | 11 +++++++++--
 .../java/org/apache/kafka/test/MockProducerInterceptor.java   |  2 ++
 3 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 96d9125831..f36db02a9d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1019,11 +1019,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 tp = ProducerInterceptors.extractTopicPartition(record);
             }
 
-            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
-
-            // The onCompletion callback does expect a non-null metadata, but one will be created inside
-            // the interceptor's onCompletion implementation before the user's callback is invoked.
-            interceptCallback.onCompletion(null, e);
+            if (callback != null) {
+                RecordMetadata nullMetadata = new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
+                callback.onCompletion(nullMetadata, e);
+            }
             this.errors.record();
             this.interceptors.onSendError(record, tp, e);
             if (transactionManager != null) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index bc91340a7b..ce01620803 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -1901,10 +1901,13 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void testCallbackHandlesError() throws Exception {
+    public void testCallbackAndInterceptorHandleError() throws Exception {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
         configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+        configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName());
+        configs.put(MockProducerInterceptor.APPEND_STRING_PROP, "something");
+
 
         Time time = new MockTime();
         ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
@@ -1912,8 +1915,11 @@ public class KafkaProducerTest {
 
         String invalidTopicName = "topic abc"; // Invalid topic name due to space
 
+        ProducerInterceptors<String, String> producerInterceptors =
+                new ProducerInterceptors<>(Arrays.asList(new MockProducerInterceptor()));
+
         try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(),
-                producerMetadata, client, null, time)) {
+                producerMetadata, client, producerInterceptors, time)) {
             ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
 
             // Here's the important piece of the test. Let's make sure that the RecordMetadata we get
@@ -1938,6 +1944,7 @@ public class KafkaProducerTest {
             };
 
             producer.send(record, callBack);
+            assertEquals(1, MockProducerInterceptor.ON_ACKNOWLEDGEMENT_COUNT.intValue());
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
index 133ff567d4..eedc3bdaec 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
@@ -35,6 +35,7 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce
     public static final AtomicInteger ON_SUCCESS_COUNT = new AtomicInteger(0);
     public static final AtomicInteger ON_ERROR_COUNT = new AtomicInteger(0);
     public static final AtomicInteger ON_ERROR_WITH_METADATA_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger ON_ACKNOWLEDGEMENT_COUNT = new AtomicInteger(0);
     public static final AtomicReference<ClusterResource> CLUSTER_META = new AtomicReference<>();
     public static final ClusterResource NO_CLUSTER_ID = new ClusterResource("no_cluster_id");
     public static final AtomicReference<ClusterResource> CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT = new AtomicReference<>(NO_CLUSTER_ID);
@@ -69,6 +70,7 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce
 
     @Override
     public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
+        ON_ACKNOWLEDGEMENT_COUNT.incrementAndGet();
         // This will ensure that we get the cluster metadata when onAcknowledgement is called for the first time
         // as subsequent compareAndSet operations will fail.
         CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT.compareAndSet(NO_CLUSTER_ID, CLUSTER_META.get());