Posted to commits@kafka.apache.org by gu...@apache.org on 2018/10/09 22:30:18 UTC

[kafka] branch 2.1 updated: KAFKA-7483: Allow streams to pass headers through Serializer. (#5751)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 712ed98  KAFKA-7483: Allow streams to pass headers through Serializer. (#5751)
712ed98 is described below

commit 712ed98376b4a19ec3e0c6613a91306bfb5aa1ff
Author: Kamal Chandraprakash <ka...@gmail.com>
AuthorDate: Wed Oct 10 03:59:04 2018 +0530

    KAFKA-7483: Allow streams to pass headers through Serializer. (#5751)
    
    Reviewers: Satish Duggana <sd...@hortonworks.com>, Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../processor/internals/RecordCollectorImpl.java   |  4 +-
 .../processor/internals/RecordCollectorTest.java   | 60 +++++++++++++++++++---
 .../streams/state/KeyValueStoreTestDriver.java     |  4 +-
 .../state/internals/RocksDBWindowStoreTest.java    |  4 +-
 .../org/apache/kafka/test/MockRestoreConsumer.java |  5 +-
 .../kafka/streams/test/ConsumerRecordFactory.java  |  4 +-
 6 files changed, 65 insertions(+), 16 deletions(-)
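
In short: Streams' produce path now calls the three-argument Serializer#serialize(topic, headers, data) overload, so a serializer can inspect or add record headers before the bytes are handed to the producer. As an illustration (the class name and header key below are made up for this note, not part of the commit), a headers-aware serializer might look like:

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Illustrative sketch: stamps a tracing header on every record it serializes.
    public class TracingStringSerializer extends StringSerializer {
        @Override
        public byte[] serialize(final String topic, final Headers headers, final String data) {
            // With this change, Streams invokes this overload, so the header
            // is attached to the outgoing record instead of being ignored.
            headers.add("trace-origin", "streams".getBytes(StandardCharsets.UTF_8));
            return serialize(topic, data);
        }
    }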

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 7e19297..5df14ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -153,8 +153,8 @@ public class RecordCollectorImpl implements RecordCollector {
                             final Serializer<K> keySerializer,
                             final Serializer<V> valueSerializer) {
         checkForException();
-        final byte[] keyBytes = keySerializer.serialize(topic, key);
-        final byte[] valBytes = valueSerializer.serialize(topic, value);
+        final byte[] keyBytes = keySerializer.serialize(topic, headers, key);
+        final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
 
         final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
 
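Note on compatibility: the three-argument overload is a default method on Serializer (introduced with the header-aware serde work in this release line) that delegates to the classic two-argument form, so serializers that do not override it behave exactly as before. Roughly (paraphrased for this note, not part of this diff):

    default byte[] serialize(String topic, Headers headers, T data) {
        // Headers are ignored unless a serializer overrides this method.
        return serialize(topic, data);
    }
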
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 4f89a1e..c4e58be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -65,17 +65,14 @@ public class RecordCollectorTest {
     );
 
     private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos,
-        Collections.<String>emptySet(), Collections.<String>emptySet());
+        Collections.emptySet(), Collections.emptySet());
 
 
     private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
     private final StringSerializer stringSerializer = new StringSerializer();
 
-    private final StreamPartitioner<String, Object> streamPartitioner = new StreamPartitioner<String, Object>() {
-        @Override
-        public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
-            return Integer.parseInt(key) % numPartitions;
-        }
+    private final StreamPartitioner<String, Object> streamPartitioner = (topic, key, value, numPartitions) -> {
+        return Integer.parseInt(key) % numPartitions;
     };
 
     @Test
@@ -362,4 +359,55 @@ public class RecordCollectorTest {
         });
         collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
     }
+
+    @Test
+    public void testRecordHeaderPassThroughSerializer() {
+        final CustomStringSerializer keySerializer = new CustomStringSerializer();
+        final CustomStringSerializer valueSerializer = new CustomStringSerializer();
+        keySerializer.configure(Collections.EMPTY_MAP, true);
+
+        final RecordCollectorImpl collector = new RecordCollectorImpl(
+                "test",
+                logContext,
+                new DefaultProductionExceptionHandler(),
+                new Metrics().sensor("skipped-records")
+        );
+        final MockProducer<byte[], byte[]> mockProducer = new MockProducer<>(cluster, true, new DefaultPartitioner(),
+                byteArraySerializer, byteArraySerializer);
+        collector.init(mockProducer);
+
+        collector.send("topic1", "3", "0", new RecordHeaders(), null, keySerializer, valueSerializer, streamPartitioner);
+
+        final List<ProducerRecord<byte[], byte[]>> recordHistory = mockProducer.history();
+        for (final ProducerRecord<byte[], byte[]> sentRecord : recordHistory) {
+            final Headers headers = sentRecord.headers();
+            assertEquals(2, headers.toArray().length);
+            assertEquals(new RecordHeader("key", "key".getBytes()), headers.lastHeader("key"));
+            assertEquals(new RecordHeader("value", "value".getBytes()), headers.lastHeader("value"));
+        }
+    }
+
+    private static class CustomStringSerializer extends StringSerializer {
+
+        private boolean isKey;
+
+        private CustomStringSerializer() {
+        }
+
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean isKey) {
+            this.isKey = isKey;
+            super.configure(configs, isKey);
+        }
+
+        @Override
+        public byte[] serialize(final String topic, final Headers headers, final String data) {
+            if (isKey) {
+                headers.add(new RecordHeader("key", "key".getBytes()));
+            } else {
+                headers.add(new RecordHeader("value", "value".getBytes()));
+            }
+            return serialize(topic, data);
+        }
+    }
 }
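
The test above uses MockProducer.history() to observe the produced records; the headers added by CustomStringSerializer travel with each record, so a downstream consumer can read them back. A hypothetical consumer-side check (variable names are illustrative; Header is org.apache.kafka.common.header.Header):

    // record is an org.apache.kafka.clients.consumer.ConsumerRecord<byte[], byte[]>
    final Header keyHeader = record.headers().lastHeader("key");
    if (keyHeader != null) {
        System.out.println(new String(keyHeader.value(), StandardCharsets.UTF_8));
    }
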
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 6999633..096f792 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -206,8 +206,8 @@ public class KeyValueStoreTestDriver<K, V> {
                                       final Serializer<V1> valueSerializer) {
                 // for byte arrays we need to wrap it for comparison
 
-                final K keyTest = serdes.keyFrom(keySerializer.serialize(topic, key));
-                final V valueTest = serdes.valueFrom(valueSerializer.serialize(topic, value));
+                final K keyTest = serdes.keyFrom(keySerializer.serialize(topic, headers, key));
+                final V valueTest = serdes.valueFrom(valueSerializer.serialize(topic, headers, value));
 
                 recordFlushed(keyTest, valueTest);
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 08f019f..cd0f49a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -96,8 +96,8 @@ public class RocksDBWindowStoreTest {
                                   final Serializer<K1> keySerializer,
                                   final Serializer<V1> valueSerializer) {
             changeLog.add(new KeyValue<>(
-                keySerializer.serialize(topic, key),
-                valueSerializer.serialize(topic, value))
+                keySerializer.serialize(topic, headers, key),
+                valueSerializer.serialize(topic, headers, value))
             );
         }
     };
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
index bc918ea..244b35f 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
@@ -61,8 +61,9 @@ public class MockRestoreConsumer<K, V> extends MockConsumer<byte[], byte[]> {
         recordBuffer.add(
             new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), record.timestamp(),
                                  record.timestampType(), 0L, 0, 0,
-                                 keySerializer.serialize(record.topic(), record.key()),
-                                 valueSerializer.serialize(record.topic(), record.value())));
+                                 keySerializer.serialize(record.topic(), record.headers(), record.key()),
+                                 valueSerializer.serialize(record.topic(), record.headers(), record.value()),
+                                 record.headers()));
         endOffset = record.offset();
 
         super.updateEndOffsets(Collections.singletonMap(assignedPartition, endOffset));
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
index 108dafd..87ec7c1 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
@@ -180,8 +180,8 @@ public class ConsumerRecordFactory<K, V> {
                                                  final long timestampMs) {
         Objects.requireNonNull(topicName, "topicName cannot be null.");
         Objects.requireNonNull(headers, "headers cannot be null.");
-        final byte[] serializedKey = keySerializer.serialize(topicName, key);
-        final byte[] serializedValue = valueSerializer.serialize(topicName, value);
+        final byte[] serializedKey = keySerializer.serialize(topicName, headers, key);
+        final byte[] serializedValue = valueSerializer.serialize(topicName, headers, value);
         return new ConsumerRecord<>(
             topicName,
             -1,