You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2022/04/25 22:59:53 UTC
[kafka] branch trunk updated: KAFKA-12841: Remove an additional call of onAcknowledgement (#12064)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b020819ac4 KAFKA-12841: Remove an additional call of onAcknowledgement (#12064)
b020819ac4 is described below
commit b020819ac415976bd09a8e5bfdfb43642837c64d
Author: Philip Nee <ph...@gmail.com>
AuthorDate: Mon Apr 25 15:59:45 2022 -0700
KAFKA-12841: Remove an additional call of onAcknowledgement (#12064)
A bug was introduced in #11689: an additional call to onAcknowledgement was made through the InterceptorCallback class. This is undesirable because onSendError subsequently calls onAcknowledgement as well, so the interceptor is invoked twice for the same send error.
Reviewers: Jun Rao <ju...@gmail.com>
---
.../java/org/apache/kafka/clients/producer/KafkaProducer.java | 9 ++++-----
.../org/apache/kafka/clients/producer/KafkaProducerTest.java | 11 +++++++++--
.../java/org/apache/kafka/test/MockProducerInterceptor.java | 2 ++
3 files changed, 15 insertions(+), 7 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 96d9125831..f36db02a9d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1019,11 +1019,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
tp = ProducerInterceptors.extractTopicPartition(record);
}
- Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
-
- // The onCompletion callback does expect a non-null metadata, but one will be created inside
- // the interceptor's onCompletion implementation before the user's callback is invoked.
- interceptCallback.onCompletion(null, e);
+ if (callback != null) {
+ RecordMetadata nullMetadata = new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
+ callback.onCompletion(nullMetadata, e);
+ }
this.errors.record();
this.interceptors.onSendError(record, tp, e);
if (transactionManager != null) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index bc91340a7b..ce01620803 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -1901,10 +1901,13 @@ public class KafkaProducerTest {
}
@Test
- public void testCallbackHandlesError() throws Exception {
+ public void testCallbackAndInterceptorHandleError() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+ configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName());
+ configs.put(MockProducerInterceptor.APPEND_STRING_PROP, "something");
+
Time time = new MockTime();
ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
@@ -1912,8 +1915,11 @@ public class KafkaProducerTest {
String invalidTopicName = "topic abc"; // Invalid topic name due to space
+ ProducerInterceptors<String, String> producerInterceptors =
+ new ProducerInterceptors<>(Arrays.asList(new MockProducerInterceptor()));
+
try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(),
- producerMetadata, client, null, time)) {
+ producerMetadata, client, producerInterceptors, time)) {
ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
// Here's the important piece of the test. Let's make sure that the RecordMetadata we get
@@ -1938,6 +1944,7 @@ public class KafkaProducerTest {
};
producer.send(record, callBack);
+ assertEquals(1, MockProducerInterceptor.ON_ACKNOWLEDGEMENT_COUNT.intValue());
}
}
diff --git a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
index 133ff567d4..eedc3bdaec 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
@@ -35,6 +35,7 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce
public static final AtomicInteger ON_SUCCESS_COUNT = new AtomicInteger(0);
public static final AtomicInteger ON_ERROR_COUNT = new AtomicInteger(0);
public static final AtomicInteger ON_ERROR_WITH_METADATA_COUNT = new AtomicInteger(0);
+ public static final AtomicInteger ON_ACKNOWLEDGEMENT_COUNT = new AtomicInteger(0);
public static final AtomicReference<ClusterResource> CLUSTER_META = new AtomicReference<>();
public static final ClusterResource NO_CLUSTER_ID = new ClusterResource("no_cluster_id");
public static final AtomicReference<ClusterResource> CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT = new AtomicReference<>(NO_CLUSTER_ID);
@@ -69,6 +70,7 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
+ ON_ACKNOWLEDGEMENT_COUNT.incrementAndGet();
// This will ensure that we get the cluster metadata when onAcknowledgement is called for the first time
// as subsequent compareAndSet operations will fail.
CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT.compareAndSet(NO_CLUSTER_ID, CLUSTER_META.get());