You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/06/04 23:17:18 UTC

[kafka] branch 2.6 updated: KAFKA-10066: TestOutputTopic should pass record headers into deserializers (#8759)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 382bb28  KAFKA-10066: TestOutputTopic should pass record headers into deserializers (#8759)
382bb28 is described below

commit 382bb28ac960aafe9fa5eae791606cabd903c52c
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Thu Jun 4 16:00:59 2020 -0700

    KAFKA-10066: TestOutputTopic should pass record headers into deserializers (#8759)
    
    Reviewers: John Roesler <jo...@confluent.io>, Boyang Chen <bo...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>
---
 .../apache/kafka/streams/TopologyTestDriver.java   |  8 ++--
 .../kafka/streams/TopologyTestDriverTest.java      | 54 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 923a980..db31012 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -802,8 +802,8 @@ public class TopologyTestDriver implements Closeable {
         if (record == null) {
             return null;
         }
-        final K key = keyDeserializer.deserialize(record.topic(), record.key());
-        final V value = valueDeserializer.deserialize(record.topic(), record.value());
+        final K key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key());
+        final V value = valueDeserializer.deserialize(record.topic(), record.headers(), record.value());
         return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value, record.headers());
     }
 
@@ -906,8 +906,8 @@ public class TopologyTestDriver implements Closeable {
         if (record == null) {
             throw new NoSuchElementException("Empty topic: " + topic);
         }
-        final K key = keyDeserializer.deserialize(record.topic(), record.key());
-        final V value = valueDeserializer.deserialize(record.topic(), record.value());
+        final K key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key());
+        final V value = valueDeserializer.deserialize(record.topic(), record.headers(), record.value());
         return new TestRecord<>(key, value, record.headers(), record.timestamp());
     }
 
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 793f907..9b7b554 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -22,10 +22,13 @@ import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.SystemTime;
@@ -70,6 +73,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -712,6 +716,56 @@ public class TopologyTestDriverTest {
     }
 
     @Test
+    public void shouldPassRecordHeadersIntoSerializersAndDeserializers() {
+        testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
+
+        final AtomicBoolean passedHeadersToKeySerializer = new AtomicBoolean(false);
+        final AtomicBoolean passedHeadersToValueSerializer = new AtomicBoolean(false);
+        final AtomicBoolean passedHeadersToKeyDeserializer = new AtomicBoolean(false);
+        final AtomicBoolean passedHeadersToValueDeserializer = new AtomicBoolean(false);
+
+        final Serializer<byte[]> keySerializer = new ByteArraySerializer() {
+            @Override
+            public byte[] serialize(final String topic, final Headers headers, final byte[] data) {
+                passedHeadersToKeySerializer.set(true);
+                return serialize(topic, data);
+            }
+        };
+        final Serializer<byte[]> valueSerializer = new ByteArraySerializer() {
+            @Override
+            public byte[] serialize(final String topic, final Headers headers, final byte[] data) {
+                passedHeadersToValueSerializer.set(true);
+                return serialize(topic, data);
+            }
+        };
+
+        final Deserializer<byte[]> keyDeserializer = new ByteArrayDeserializer() {
+            @Override
+            public byte[] deserialize(final String topic, final Headers headers, final byte[] data) {
+                passedHeadersToKeyDeserializer.set(true);
+                return deserialize(topic, data);
+            }
+        };
+        final Deserializer<byte[]> valueDeserializer = new ByteArrayDeserializer() {
+            @Override
+            public byte[] deserialize(final String topic, final Headers headers, final byte[] data) {
+                passedHeadersToValueDeserializer.set(true);
+                return deserialize(topic, data);
+            }
+        };
+
+        final TestInputTopic<byte[], byte[]> inputTopic = testDriver.createInputTopic(SOURCE_TOPIC_1, keySerializer, valueSerializer);
+        final TestOutputTopic<byte[], byte[]> outputTopic = testDriver.createOutputTopic(SINK_TOPIC_1, keyDeserializer, valueDeserializer);
+        inputTopic.pipeInput(testRecord1);
+        outputTopic.readRecord();
+
+        assertThat(passedHeadersToKeySerializer.get(), equalTo(true));
+        assertThat(passedHeadersToValueSerializer.get(), equalTo(true));
+        assertThat(passedHeadersToKeyDeserializer.get(), equalTo(true));
+        assertThat(passedHeadersToValueDeserializer.get(), equalTo(true));
+    }
+
+    @Test
     public void shouldUseSinkSpecificSerializers() {
         final Topology topology = new Topology();