You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2021/02/13 17:49:52 UTC

[kafka] branch trunk updated: KAFKA-12297: Make MockProducer return RecordMetadata with values as per contract

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b3313b8  KAFKA-12297: Make MockProducer return RecordMetadata with values as per contract
b3313b8 is described below

commit b3313b8636f8b6abad6389c99aa6d62a8976482a
Author: Akhilesh Dubey <ad...@confluent.io>
AuthorDate: Sat Feb 13 23:18:10 2021 +0530

    KAFKA-12297: Make MockProducer return RecordMetadata with values as per contract
    
    This is a simple change to MockProducer as per request in KAFKA-12297.
    MockProducer currently passes a null RecordMetadata to the send callback when a request completes with an exception. The fix makes MockProducer pass a non-null RecordMetadata instead, as per the specification.
    
    This only impacts clients which use send with a custom callback and then try to use the RecordMetadata in spite of getting an exception. This should mostly impact customer unit and integration tests, as the mock endpoint was never intended for use in a real Kafka cluster.
    
    Author: Akhilesh Dubey <ad...@confluent.io>
    Author: Manikumar Reddy <ma...@gmail.com>
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
    
    Closes #10110 from aadubey/trunk
---
 .../kafka/clients/producer/MockProducer.java       |  9 ++++++---
 .../kafka/clients/producer/MockProducerTest.java   | 22 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 03c17d5..6eefb89 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -314,7 +314,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
                 0L, 0, 0, Time.SYSTEM);
         long offset = nextOffset(topicPartition);
         Completion completion = new Completion(offset, new RecordMetadata(topicPartition, 0, offset,
-                RecordBatch.NO_TIMESTAMP, 0L, 0, 0), result, callback);
+                RecordBatch.NO_TIMESTAMP, 0L, 0, 0), result, callback, topicPartition);
 
         if (!this.transactionInFlight)
             this.sent.add(record);
@@ -512,15 +512,18 @@ public class MockProducer<K, V> implements Producer<K, V> {
         private final RecordMetadata metadata;
         private final ProduceRequestResult result;
         private final Callback callback;
+        private final TopicPartition tp;
 
         public Completion(long offset,
                           RecordMetadata metadata,
                           ProduceRequestResult result,
-                          Callback callback) {
+                          Callback callback,
+                          TopicPartition tp) {
             this.metadata = metadata;
             this.offset = offset;
             this.result = result;
             this.callback = callback;
+            this.tp = tp;
         }
 
         public void complete(RuntimeException e) {
@@ -529,7 +532,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
                 if (e == null)
                     callback.onCompletion(metadata, null);
                 else
-                    callback.onCompletion(null, e);
+                    callback.onCompletion(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1L, -1, -1), e);
             }
             result.done();
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 08dc8a7..27b6787 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.test.MockSerializer;
@@ -42,6 +43,7 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -735,6 +737,26 @@ public class MockProducerTest {
         assertTrue(producer.flushed());
     }
 
+    @Test
+    public void testMetadataOnException() throws InterruptedException {
+        buildMockProducer(false);
+        Future<RecordMetadata> metadata = producer.send(record2, (md, exception) -> {
+            assertNotNull(md);
+            assertEquals(md.offset(), -1L, "Invalid offset");
+            assertEquals(md.timestamp(), RecordBatch.NO_TIMESTAMP, "Invalid timestamp");
+            assertEquals(md.serializedKeySize(), -1L, "Invalid Serialized Key size");
+            assertEquals(md.serializedValueSize(), -1L, "Invalid Serialized value size");
+        });
+        IllegalArgumentException e = new IllegalArgumentException("dummy exception");
+        assertTrue(producer.errorNext(e), "Complete the second request with an error");
+        try {
+            metadata.get();
+            fail("Something went wrong, expected an error");
+        } catch (ExecutionException err) {
+            assertEquals(e, err.getCause());
+        }
+    }
+
     private boolean isError(Future<?> future) {
         try {
             future.get();