You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/06/04 23:17:18 UTC
[kafka] branch 2.6 updated: KAFKA-10066: TestOutputTopic should
pass record headers into deserializers (#8759)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 382bb28 KAFKA-10066: TestOutputTopic should pass record headers into deserializers (#8759)
382bb28 is described below
commit 382bb28ac960aafe9fa5eae791606cabd903c52c
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Thu Jun 4 16:00:59 2020 -0700
KAFKA-10066: TestOutputTopic should pass record headers into deserializers (#8759)
Reviewers: John Roesler <jo...@confluent.io>, Boyang Chen <bo...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>
---
.../apache/kafka/streams/TopologyTestDriver.java | 8 ++--
.../kafka/streams/TopologyTestDriverTest.java | 54 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 4 deletions(-)
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 923a980..db31012 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -802,8 +802,8 @@ public class TopologyTestDriver implements Closeable {
if (record == null) {
return null;
}
- final K key = keyDeserializer.deserialize(record.topic(), record.key());
- final V value = valueDeserializer.deserialize(record.topic(), record.value());
+ final K key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key());
+ final V value = valueDeserializer.deserialize(record.topic(), record.headers(), record.value());
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value, record.headers());
}
@@ -906,8 +906,8 @@ public class TopologyTestDriver implements Closeable {
if (record == null) {
throw new NoSuchElementException("Empty topic: " + topic);
}
- final K key = keyDeserializer.deserialize(record.topic(), record.key());
- final V value = valueDeserializer.deserialize(record.topic(), record.value());
+ final K key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key());
+ final V value = valueDeserializer.deserialize(record.topic(), record.headers(), record.value());
return new TestRecord<>(key, value, record.headers(), record.timestamp());
}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 793f907..9b7b554 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -22,10 +22,13 @@ import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.SystemTime;
@@ -70,6 +73,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -712,6 +716,56 @@ public class TopologyTestDriverTest {
}
@Test
+ public void shouldPassRecordHeadersIntoSerializersAndDeserializers() {
+ testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
+
+ final AtomicBoolean passedHeadersToKeySerializer = new AtomicBoolean(false);
+ final AtomicBoolean passedHeadersToValueSerializer = new AtomicBoolean(false);
+ final AtomicBoolean passedHeadersToKeyDeserializer = new AtomicBoolean(false);
+ final AtomicBoolean passedHeadersToValueDeserializer = new AtomicBoolean(false);
+
+ final Serializer<byte[]> keySerializer = new ByteArraySerializer() {
+ @Override
+ public byte[] serialize(final String topic, final Headers headers, final byte[] data) {
+ passedHeadersToKeySerializer.set(true);
+ return serialize(topic, data);
+ }
+ };
+ final Serializer<byte[]> valueSerializer = new ByteArraySerializer() {
+ @Override
+ public byte[] serialize(final String topic, final Headers headers, final byte[] data) {
+ passedHeadersToValueSerializer.set(true);
+ return serialize(topic, data);
+ }
+ };
+
+ final Deserializer<byte[]> keyDeserializer = new ByteArrayDeserializer() {
+ @Override
+ public byte[] deserialize(final String topic, final Headers headers, final byte[] data) {
+ passedHeadersToKeyDeserializer.set(true);
+ return deserialize(topic, data);
+ }
+ };
+ final Deserializer<byte[]> valueDeserializer = new ByteArrayDeserializer() {
+ @Override
+ public byte[] deserialize(final String topic, final Headers headers, final byte[] data) {
+ passedHeadersToValueDeserializer.set(true);
+ return deserialize(topic, data);
+ }
+ };
+
+ final TestInputTopic<byte[], byte[]> inputTopic = testDriver.createInputTopic(SOURCE_TOPIC_1, keySerializer, valueSerializer);
+ final TestOutputTopic<byte[], byte[]> outputTopic = testDriver.createOutputTopic(SINK_TOPIC_1, keyDeserializer, valueDeserializer);
+ inputTopic.pipeInput(testRecord1);
+ outputTopic.readRecord();
+
+ assertThat(passedHeadersToKeySerializer.get(), equalTo(true));
+ assertThat(passedHeadersToValueSerializer.get(), equalTo(true));
+ assertThat(passedHeadersToKeyDeserializer.get(), equalTo(true));
+ assertThat(passedHeadersToValueDeserializer.get(), equalTo(true));
+ }
+
+ @Test
public void shouldUseSinkSpecificSerializers() {
final Topology topology = new Topology();