You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/05/22 22:44:43 UTC
[kafka] branch trunk updated: KAFKA-6850: Add Record Header support to Kafka Streams Processor API (KIP-244) (#4955)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 133108c KAFKA-6850: Add Record Header support to Kafka Streams Processor API (KIP-244) (#4955)
133108c is described below
commit 133108cdacf7ee1cc4569e797f2cdf9ec60f7fdd
Author: Jorge Quilcate Otoya <qu...@gmail.com>
AuthorDate: Wed May 23 00:44:37 2018 +0200
KAFKA-6850: Add Record Header support to Kafka Streams Processor API (KIP-244) (#4955)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../kafka/streams/processor/ProcessorContext.java | 7 +
.../internals/AbstractProcessorContext.java | 10 ++
.../ForwardingDisabledProcessorContext.java | 6 +
.../processor/internals/GlobalStateUpdateTask.java | 3 +-
.../internals/ProcessorRecordContext.java | 17 +-
.../processor/internals/RecordCollector.java | 3 +
.../processor/internals/RecordCollectorImpl.java | 7 +-
.../streams/processor/internals/RecordContext.java | 7 +
.../processor/internals/RecordDeserializer.java | 2 +-
.../streams/processor/internals/SinkNode.java | 2 +-
.../streams/processor/internals/StampedRecord.java | 5 +
.../processor/internals/StandbyContextImpl.java | 3 +
.../streams/processor/internals/StreamTask.java | 8 +-
.../state/internals/CachingKeyValueStore.java | 13 +-
.../state/internals/CachingSessionStore.java | 11 +-
.../state/internals/CachingWindowStore.java | 11 +-
.../streams/state/internals/LRUCacheEntry.java | 18 +-
.../streams/state/internals/StoreChangeLogger.java | 3 +-
.../integration/utils/IntegrationTestUtils.java | 92 +++++++++-
.../kstream/internals/KGroupedStreamImplTest.java | 2 +-
.../internals/KStreamGlobalKTableJoinTest.java | 2 +-
.../internals/KStreamGlobalKTableLeftJoinTest.java | 2 +-
.../kstream/internals/KStreamKTableJoinTest.java | 4 +-
.../internals/KStreamKTableLeftJoinTest.java | 2 +-
...KStreamSessionWindowAggregateProcessorTest.java | 2 +-
.../internals/KTableKTableInnerJoinTest.java | 2 +-
.../internals/KTableKTableLeftJoinTest.java | 2 +-
.../internals/KTableKTableOuterJoinTest.java | 2 +-
.../internals/KTableKTableRightJoinTest.java | 2 +-
.../internals/KTableTransformValuesTest.java | 4 +-
.../internals/AbstractProcessorContextTest.java | 28 ++-
.../processor/internals/ProcessorTopologyTest.java | 65 ++++++-
.../processor/internals/RecordCollectorTest.java | 68 +++----
.../processor/internals/RecordContextStub.java | 21 ++-
.../internals/RecordDeserializerTest.java | 10 +-
.../streams/state/KeyValueStoreTestDriver.java | 3 +
.../state/internals/CachingKeyValueStoreTest.java | 3 +-
.../state/internals/CachingSessionStoreTest.java | 2 +-
.../state/internals/CachingWindowStoreTest.java | 2 +-
.../ChangeLoggingKeyValueBytesStoreTest.java | 2 +
.../ChangeLoggingSessionBytesStoreTest.java | 2 +
.../ChangeLoggingWindowBytesStoreTest.java | 2 +
.../streams/state/internals/NamedCacheTest.java | 32 ++--
.../state/internals/RocksDBWindowStoreTest.java | 4 +-
.../state/internals/StoreChangeLoggerTest.java | 13 ++
.../streams/state/internals/ThreadCacheTest.java | 8 +-
.../kafka/test/InternalMockProcessorContext.java | 12 +-
.../org/apache/kafka/test/KStreamTestDriver.java | 7 +-
.../org/apache/kafka/test/NoOpRecordCollector.java | 3 +
.../apache/kafka/streams/TopologyTestDriver.java | 20 ++-
.../streams/processor/MockProcessorContext.java | 13 +-
.../kafka/streams/test/ConsumerRecordFactory.java | 170 ++++++++++++++++--
.../apache/kafka/streams/test/OutputVerifier.java | 199 +++++++++++++++++++++
.../kafka/streams/MockProcessorContextTest.java | 2 +-
.../kafka/streams/TopologyTestDriverTest.java | 19 +-
.../streams/test/ConsumerRecordFactoryTest.java | 7 +-
56 files changed, 839 insertions(+), 132 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 79d191c..f2a9f64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
@@ -198,6 +199,12 @@ public interface ProcessorContext {
long offset();
/**
+ * Returns the headers of the current input record
+ * @return the headers
+ */
+ Headers headers();
+
+ /**
* Returns the current timestamp.
*
* If it is triggered while processing a record streamed from the source processor, timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 9687477..3338669 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -142,6 +143,15 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
return recordContext.offset();
}
+ @Override
+ public Headers headers() {
+ if (recordContext == null) {
+ throw new IllegalStateException("This should not happen as headers() should only be called while a record is processed");
+ }
+
+ return recordContext.headers();
+ }
+
/**
* @throws IllegalStateException if timestamp is null
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
index 35a0a7e..7e2610c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
@@ -133,6 +134,11 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex
}
@Override
+ public Headers headers() {
+ return delegate.headers();
+ }
+
+ @Override
public long timestamp() {
return delegate.timestamp();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 26bf493..d387713 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -89,7 +89,8 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
new ProcessorRecordContext(deserialized.timestamp(),
deserialized.offset(),
deserialized.partition(),
- deserialized.topic());
+ deserialized.topic(),
+ deserialized.headers());
processorContext.setRecordContext(recordContext);
processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
sourceNodeAndDeserializer.sourceNode().process(deserialized.key(), deserialized.value());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 92acfc9..c071525 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.common.header.Headers;
+
import java.util.Objects;
public class ProcessorRecordContext implements RecordContext {
@@ -24,16 +26,19 @@ public class ProcessorRecordContext implements RecordContext {
private final long offset;
private final String topic;
private final int partition;
+ private final Headers headers;
public ProcessorRecordContext(final long timestamp,
final long offset,
final int partition,
- final String topic) {
+ final String topic,
+ final Headers headers) {
this.timestamp = timestamp;
this.offset = offset;
this.topic = topic;
this.partition = partition;
+ this.headers = headers;
}
public long offset() {
@@ -59,6 +64,11 @@ public class ProcessorRecordContext implements RecordContext {
}
@Override
+ public Headers headers() {
+ return headers;
+ }
+
+ @Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
@@ -66,11 +76,12 @@ public class ProcessorRecordContext implements RecordContext {
return timestamp == that.timestamp &&
offset == that.offset &&
partition == that.partition &&
- Objects.equals(topic, that.topic);
+ Objects.equals(topic, that.topic) &&
+ Objects.equals(headers, that.headers);
}
@Override
public int hashCode() {
- return Objects.hash(timestamp, offset, topic, partition);
+ return Objects.hash(timestamp, offset, topic, partition, headers);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index b083869..bf10da2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -28,6 +29,7 @@ public interface RecordCollector {
<K, V> void send(final String topic,
final K key,
final V value,
+ final Headers headers,
final Integer partition,
final Long timestamp,
final Serializer<K> keySerializer,
@@ -36,6 +38,7 @@ public interface RecordCollector {
<K, V> void send(final String topic,
final K key,
final V value,
+ final Headers headers,
final Long timestamp,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 8167539..1c8b0a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
@@ -77,6 +78,7 @@ public class RecordCollectorImpl implements RecordCollector {
public <K, V> void send(final String topic,
final K key,
final V value,
+ final Headers headers,
final Long timestamp,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
@@ -93,7 +95,7 @@ public class RecordCollectorImpl implements RecordCollector {
}
}
- send(topic, key, value, partition, timestamp, keySerializer, valueSerializer);
+ send(topic, key, value, headers, partition, timestamp, keySerializer, valueSerializer);
}
private boolean productionExceptionIsFatal(final Exception exception) {
@@ -142,6 +144,7 @@ public class RecordCollectorImpl implements RecordCollector {
public <K, V> void send(final String topic,
final K key,
final V value,
+ final Headers headers,
final Integer partition,
final Long timestamp,
final Serializer<K> keySerializer,
@@ -150,7 +153,7 @@ public class RecordCollectorImpl implements RecordCollector {
final byte[] keyBytes = keySerializer.serialize(topic, key);
final byte[] valBytes = valueSerializer.serialize(topic, value);
- final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
+ final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
try {
producer.send(serializedRecord, new Callback() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
index dd58f4c..15add71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.Processor;
/**
@@ -47,4 +48,10 @@ public interface RecordContext {
* @return The partition the record was received on
*/
int partition();
+
+ /**
+ * @return The headers from the record received from Kafka
+ */
+ Headers headers();
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 36e2c9a..ade9664 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -63,7 +63,7 @@ class RecordDeserializer {
rawRecord.serializedKeySize(),
rawRecord.serializedValueSize(),
sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key()),
- sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()));
+ sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()), rawRecord.headers());
} catch (final Exception deserializationException) {
final DeserializationExceptionHandler.DeserializationHandlerResponse response;
try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 0fbd6dc..7711905 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -84,7 +84,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
}
try {
- collector.send(topic, key, value, timestamp, keySerializer, valSerializer, partitioner);
+ collector.send(topic, key, value, context.headers(), timestamp, keySerializer, valSerializer, partitioner);
} catch (final ClassCastException e) {
final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
index 243c41a..aa9b79d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
public class StampedRecord extends Stamped<ConsumerRecord<Object, Object>> {
@@ -44,6 +45,10 @@ public class StampedRecord extends Stamped<ConsumerRecord<Object, Object>> {
return value.offset();
}
+ public Headers headers() {
+ return value.headers();
+ }
+
@Override
public String toString() {
return value.toString() + ", timestamp = " + timestamp;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 6aeca44..14f986c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
@@ -40,6 +41,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
public <K, V> void send(final String topic,
final K key,
final V value,
+ final Headers headers,
final Integer partition,
final Long timestamp,
final Serializer<K> keySerializer,
@@ -50,6 +52,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
public <K, V> void send(final String topic,
final K key,
final V value,
+ final Headers headers,
final Long timestamp,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 633e7ad..e2be3e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -343,7 +343,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
- processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
+ processorContext.setRecordContext(
+ new ProcessorRecordContext(
+ record.timestamp,
+ record.offset(),
+ record.partition(),
+ record.topic(),
+ record.headers()));
processorContext.setCurrentNode(currNode);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 525e92d..285bde5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -224,8 +224,17 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
}
private void putInternal(final Bytes key, final byte[] value) {
- cache.put(cacheName, key, new LRUCacheEntry(value, true, context.offset(),
- context.timestamp(), context.partition(), context.topic()));
+ cache.put(
+ cacheName,
+ key,
+ new LRUCacheEntry(
+ value,
+ context.headers(),
+ true,
+ context.offset(),
+ context.timestamp(),
+ context.partition(),
+ context.topic()));
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 1bb2ea7..c099faf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -141,8 +141,15 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
public void put(final Windowed<Bytes> key, byte[] value) {
validateStoreOpen();
final Bytes binaryKey = Bytes.wrap(SessionKeySchema.toBinary(key));
- final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(),
- key.window().end(), context.partition(), context.topic());
+ final LRUCacheEntry entry =
+ new LRUCacheEntry(
+ value,
+ context.headers(),
+ true,
+ context.offset(),
+ key.window().end(),
+ context.partition(),
+ context.topic());
cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 7e58b68..ca24ffd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -150,8 +150,15 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
validateStoreOpen();
final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0);
- final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(),
- timestamp, context.partition(), context.topic());
+ final LRUCacheEntry entry =
+ new LRUCacheEntry(
+ value,
+ context.headers(),
+ true,
+ context.offset(),
+ timestamp,
+ context.partition(),
+ context.topic());
cache.put(name, cacheFunction.cacheKey(keyBytes), entry);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index af7059b..78c0331 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.internals.RecordContext;
/**
@@ -24,6 +25,7 @@ import org.apache.kafka.streams.processor.internals.RecordContext;
class LRUCacheEntry implements RecordContext {
public final byte[] value;
+ private final Headers headers;
private final long offset;
private final String topic;
private final int partition;
@@ -33,13 +35,18 @@ class LRUCacheEntry implements RecordContext {
private boolean isDirty;
LRUCacheEntry(final byte[] value) {
- this(value, false, -1, -1, -1, "");
+ this(value, null, false, -1, -1, -1, "");
}
- LRUCacheEntry(final byte[] value, final boolean isDirty,
- final long offset, final long timestamp, final int partition,
+ LRUCacheEntry(final byte[] value,
+ final Headers headers,
+ final boolean isDirty,
+ final long offset,
+ final long timestamp,
+ final int partition,
final String topic) {
this.value = value;
+ this.headers = headers;
this.partition = partition;
this.topic = topic;
this.offset = offset;
@@ -78,6 +85,11 @@ class LRUCacheEntry implements RecordContext {
return partition;
}
+ @Override
+ public Headers headers() {
+ return headers;
+ }
+
void markClean() {
isDirty = false;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 1055df5..a8a04c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -55,7 +55,8 @@ class StoreChangeLogger<K, V> {
if (collector != null) {
final Serializer<K> keySerializer = serialization.keySerializer();
final Serializer<V> valueSerializer = serialization.valueSerializer();
- collector.send(this.topic, key, value, this.partition, context.timestamp(), keySerializer, valueSerializer);
+ // Sending null headers to changelog topics (KIP-244)
+ collector.send(this.topic, key, value, null, this.partition, context.timestamp(), keySerializer, valueSerializer);
}
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index d306ee4..fe897c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -91,6 +92,12 @@ public class IntegrationTestUtils {
IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, time, false);
}
+ public static <K, V> void produceKeyValuesSynchronously(
+ final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Headers headers, final Time time)
+ throws ExecutionException, InterruptedException {
+ IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, headers, time, false);
+ }
+
/**
* @param topic Kafka topic to write the data records to
* @param records Data records to write to Kafka
@@ -102,10 +109,21 @@ public class IntegrationTestUtils {
public static <K, V> void produceKeyValuesSynchronously(
final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Time time, final boolean enableTransactions)
throws ExecutionException, InterruptedException {
+ IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, null, time, enableTransactions);
+ }
+
+ public static <K, V> void produceKeyValuesSynchronously(final String topic,
+ final Collection<KeyValue<K, V>> records,
+ final Properties producerConfig,
+ final Headers headers,
+ final Time time,
+ final boolean enableTransactions)
+ throws ExecutionException, InterruptedException {
for (final KeyValue<K, V> record : records) {
produceKeyValuesSynchronouslyWithTimestamp(topic,
Collections.singleton(record),
producerConfig,
+ headers,
time.milliseconds(),
enableTransactions);
time.sleep(1L);
@@ -123,20 +141,39 @@ public class IntegrationTestUtils {
public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
final Collection<KeyValue<K, V>> records,
final Properties producerConfig,
+ final Headers headers,
+ final Long timestamp)
+ throws ExecutionException, InterruptedException {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, headers, timestamp, false);
+ }
+
+ public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
+ final Collection<KeyValue<K, V>> records,
+ final Properties producerConfig,
final Long timestamp,
final boolean enableTransactions)
throws ExecutionException, InterruptedException {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, null, timestamp, enableTransactions);
+ }
+
+ public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
+ final Collection<KeyValue<K, V>> records,
+ final Properties producerConfig,
+ final Headers headers,
+ final Long timestamp,
+ final boolean enabledTransactions)
+ throws ExecutionException, InterruptedException {
try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
- if (enableTransactions) {
+ if (enabledTransactions) {
producer.initTransactions();
producer.beginTransaction();
}
for (final KeyValue<K, V> record : records) {
final Future<RecordMetadata> f = producer.send(
- new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
+ new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
f.get();
}
- if (enableTransactions) {
+ if (enabledTransactions) {
producer.commitTransaction();
}
producer.flush();
@@ -194,6 +231,12 @@ public class IntegrationTestUtils {
}
}
+ public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig,
+ final String topic,
+ final int expectedNumRecords) throws InterruptedException {
+ return waitUntilMinRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
+ }
+
public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
final String topic,
final int expectedNumRecords) throws InterruptedException {
@@ -232,6 +275,27 @@ public class IntegrationTestUtils {
return accumData;
}
+ public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig,
+ final String topic,
+ final int expectedNumRecords,
+ final long waitTime) throws InterruptedException {
+ final List<ConsumerRecord<K, V>> accumData = new ArrayList<>();
+ try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+ final TestCondition valuesRead = new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ final List<ConsumerRecord<K, V>> readData =
+ readRecords(topic, consumer, waitTime, expectedNumRecords);
+ accumData.addAll(readData);
+ return accumData.size() >= expectedNumRecords;
+ }
+ };
+ final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
+ TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+ }
+ return accumData;
+ }
+
public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
final String topic,
final int expectedNumRecords) throws InterruptedException {
@@ -380,21 +444,33 @@ public class IntegrationTestUtils {
final Consumer<K, V> consumer,
final long waitTime,
final int maxMessages) {
- final List<KeyValue<K, V>> consumedValues;
+ final List<KeyValue<K, V>> consumedValues = new ArrayList<>();
+ final List<ConsumerRecord<K, V>> records = readRecords(topic, consumer, waitTime, maxMessages);
+ for (final ConsumerRecord<K, V> record : records) {
+ consumedValues.add(new KeyValue<>(record.key(), record.value()));
+ }
+ return consumedValues;
+ }
+
+ private static <K, V> List<ConsumerRecord<K, V>> readRecords(final String topic,
+ final Consumer<K, V> consumer,
+ final long waitTime,
+ final int maxMessages) {
+ final List<ConsumerRecord<K, V>> consumerRecords;
consumer.subscribe(Collections.singletonList(topic));
final int pollIntervalMs = 100;
- consumedValues = new ArrayList<>();
+ consumerRecords = new ArrayList<>();
int totalPollTimeMs = 0;
while (totalPollTimeMs < waitTime &&
- continueConsuming(consumedValues.size(), maxMessages)) {
+ continueConsuming(consumerRecords.size(), maxMessages)) {
totalPollTimeMs += pollIntervalMs;
final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
for (final ConsumerRecord<K, V> record : records) {
- consumedValues.add(new KeyValue<>(record.key(), record.value()));
+ consumerRecords.add(record);
}
}
- return consumedValues;
+ return consumerRecords;
}
private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index e5acd01..1517f0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -531,7 +531,7 @@ public class KGroupedStreamImplTest {
driver.pipeInput(recordFactory.create(TOPIC, "1", "D"));
driver.pipeInput(recordFactory.create(TOPIC, "3", "E"));
driver.pipeInput(recordFactory.create(TOPIC, "3", "F"));
- driver.pipeInput(recordFactory.create(TOPIC, "3", null));
+ driver.pipeInput(recordFactory.create(TOPIC, "3", (String) null));
}
private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index ae76362..d5c5a54 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -106,7 +106,7 @@ public class KStreamGlobalKTableJoinTest {
private void pushNullValueToGlobalTable(final int messageCount) {
final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
for (int i = 0; i < messageCount; i++) {
- driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], null));
+ driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], (String) null));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index 95fe8b9..248c3ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -108,7 +108,7 @@ public class KStreamGlobalKTableLeftJoinTest {
private void pushNullValueToGlobalTable(final int messageCount) {
final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
for (int i = 0; i < messageCount; i++) {
- driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], null));
+ driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], (String) null));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 43eaf3b..6ffce04 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -97,7 +97,7 @@ public class KStreamKTableJoinTest {
private void pushNullValueToTable() {
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], null));
+ driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], (String) null));
}
}
@@ -205,7 +205,7 @@ public class KStreamKTableJoinTest {
@Test
public void shouldLogAndMeterWhenSkippingNullLeftValue() {
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
- driver.pipeInput(recordFactory.create(streamTopic, 1, null));
+ driver.pipeInput(recordFactory.create(streamTopic, 1, (String) null));
LogCaptureAppender.unregister(appender);
assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 251e58e..1c3e027 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -93,7 +93,7 @@ public class KStreamKTableLeftJoinTest {
private void pushNullValueToTable(final int messageCount) {
for (int i = 0; i < messageCount; i++) {
- driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], null));
+ driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], (String) null));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index afc9be1..6b5e577 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -308,7 +308,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
public void shouldLogAndMeterWhenSkippingNullKey() {
initStore(false);
processor.init(context);
- context.setRecordContext(new ProcessorRecordContext(-1, -2, -3, "topic"));
+ context.setRecordContext(new ProcessorRecordContext(-1, -2, -3, "topic", null));
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
processor.process(null, "1");
LogCaptureAppender.unregister(appender);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 0c56531..cd29b50 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -361,7 +361,7 @@ public class KTableKTableInnerJoinTest {
).get();
final MockProcessorContext context = new MockProcessorContext();
- context.setRecordMetadata("left", -1, -2, -3);
+ context.setRecordMetadata("left", -1, -2, null, -3);
join.init(context);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
join.process(null, new Change<>("new", "old"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index ef64f75..9be6189 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -424,7 +424,7 @@ public class KTableKTableLeftJoinTest {
).get();
final MockProcessorContext context = new MockProcessorContext();
- context.setRecordMetadata("left", -1, -2, -3);
+ context.setRecordMetadata("left", -1, -2, null, -3);
join.init(context);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
join.process(null, new Change<>("new", "old"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index e897ec3..3995fcf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -366,7 +366,7 @@ public class KTableKTableOuterJoinTest {
).get();
final MockProcessorContext context = new MockProcessorContext();
- context.setRecordMetadata("left", -1, -2, -3);
+ context.setRecordMetadata("left", -1, -2, null, -3);
join.init(context);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
join.process(null, new Change<>("new", "old"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
index d7411cb..d4805a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
@@ -41,7 +41,7 @@ public class KTableKTableRightJoinTest {
).get();
final MockProcessorContext context = new MockProcessorContext();
- context.setRecordMetadata("left", -1, -2, -3);
+ context.setRecordMetadata("left", -1, -2, null, -3);
join.init(context);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
join.process(null, new Change<>("new", "old"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 7c12dad..be20c86 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -325,7 +325,7 @@ public class KTableTransformValuesTest {
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 0L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b", 0L));
- driver.pipeInput(recordFactory.create(INPUT_TOPIC, "D", null, 0L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC, "D", (String) null, 0L));
assertThat(output(), hasItems("A:A->a!", "B:B->b!", "D:D->null!"));
assertThat("Store should not be materialized", driver.getKeyValueStore(QUERYABLE_NAME), is(nullValue()));
@@ -349,7 +349,7 @@ public class KTableTransformValuesTest {
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 0L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b", 0L));
- driver.pipeInput(recordFactory.create(INPUT_TOPIC, "C", null, 0L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC, "C", (String) null, 0L));
assertThat(output(), hasItems("A:A->a!", "B:B->b!", "C:C->null!"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 86806b2..9aaa8a6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -16,6 +16,10 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
@@ -44,7 +48,8 @@ public class AbstractProcessorContextTest {
private final MockStreamsMetrics metrics = new MockStreamsMetrics(new Metrics());
private final AbstractProcessorContext context = new TestProcessorContext(metrics);
private final MockStateStore stateStore = new MockStateStore("store", false);
- private final RecordContext recordContext = new RecordContextStub(10, System.currentTimeMillis(), 1, "foo");
+ private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
+ private final RecordContext recordContext = new RecordContextStub(10, System.currentTimeMillis(), 1, "foo", headers);
@Before
public void before() {
@@ -141,6 +146,27 @@ public class AbstractProcessorContextTest {
assertThat(context.timestamp(), equalTo(recordContext.timestamp()));
}
+ @Test
+ public void shouldReturnHeadersFromRecordContext() {
+ assertThat(context.headers(), equalTo(recordContext.headers()));
+ }
+
+ @Test
+ public void shouldReturnNullIfHeadersAreNotSet() {
+ context.setRecordContext(new RecordContextStub(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
+ assertThat(context.headers(), nullValue());
+ }
+
+ @Test
+ public void shouldThrowIllegalStateExceptionOnHeadersIfNoRecordContext() {
+ context.setRecordContext(null);
+ try {
+ context.headers();
+ } catch (final IllegalStateException e) {
+ // pass
+ }
+ }
+
@SuppressWarnings("unchecked")
@Test
public void appConfigsShouldReturnParsedValues() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index e247647..033c0e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -18,6 +18,10 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
@@ -64,6 +68,9 @@ public class ProcessorTopologyTest {
private static final String OUTPUT_TOPIC_2 = "output-topic-2";
private static final String THROUGH_TOPIC_1 = "through-topic-1";
+ private static final Header HEADER = new RecordHeader("key", "value".getBytes());
+ private static final Headers HEADERS = new RecordHeaders(new Header[]{HEADER});
+
private final TopologyWrapper topology = new TopologyWrapper();
private final MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER, 0L);
@@ -71,6 +78,7 @@ public class ProcessorTopologyTest {
private TopologyTestDriver driver;
private final Properties props = new Properties();
+
@Before
public void setup() {
// Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
@@ -338,10 +346,33 @@ public class ProcessorTopologyTest {
assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 40L);
}
+ @Test
+ public void shouldConsiderHeaders() {
+ final int partition = 10;
+ driver = new TopologyTestDriver(createSimpleTopology(partition), props);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", HEADERS, 10L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", HEADERS, 20L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", HEADERS, 30L));
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", HEADERS, partition, 10L);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", HEADERS, partition, 20L);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", HEADERS, partition, 30L);
+ }
+
+ @Test
+ public void shouldAddHeaders() {
+ driver = new TopologyTestDriver(createAddHeaderTopology(), props);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L));
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", HEADERS, 10L);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", HEADERS, 20L);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", HEADERS, 30L);
+ }
+
private void assertNextOutputRecord(final String topic,
final String key,
final String value) {
- assertNextOutputRecord(topic, key, value, null, 0L);
+ assertNextOutputRecord(topic, key, value, (Integer) null, 0L);
}
private void assertNextOutputRecord(final String topic,
@@ -354,6 +385,23 @@ public class ProcessorTopologyTest {
private void assertNextOutputRecord(final String topic,
final String key,
final String value,
+ final Headers headers,
+ final Long timestamp) {
+ assertNextOutputRecord(topic, key, value, headers, null, timestamp);
+ }
+
+ private void assertNextOutputRecord(final String topic,
+ final String key,
+ final String value,
+ final Integer partition,
+ final Long timestamp) {
+ assertNextOutputRecord(topic, key, value, new RecordHeaders(), partition, timestamp);
+ }
+
+ private void assertNextOutputRecord(final String topic,
+ final String key,
+ final String value,
+ final Headers headers,
final Integer partition,
final Long timestamp) {
final ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
@@ -362,6 +410,7 @@ public class ProcessorTopologyTest {
assertEquals(value, record.value());
assertEquals(partition, record.partition());
assertEquals(timestamp, record.timestamp());
+ assertEquals(headers, record.headers());
}
private void assertNoOutputRecord(final String topic) {
@@ -458,6 +507,12 @@ public class ProcessorTopologyTest {
.addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2");
}
+ private Topology createAddHeaderTopology() {
+ return topology.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+ .addProcessor("processor-1", define(new AddHeaderProcessor()), "source-1")
+ .addSink("sink-1", OUTPUT_TOPIC_1, "processor-1");
+ }
+
/**
* A processor that simply forwards all messages to all children.
*/
@@ -478,6 +533,14 @@ public class ProcessorTopologyTest {
}
}
+ protected static class AddHeaderProcessor extends AbstractProcessor<String, String> {
+ @Override
+ public void process(final String key, final String value) {
+ context().headers().add(HEADER);
+ context().forward(key, value);
+ }
+ }
+
/**
* A processor that removes custom timestamp information from messages and forwards modified messages to each child.
* A message contains custom timestamp information if the value is in ".*@[0-9]+" format.
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 8a2f171..e439372 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -27,6 +27,10 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Sum;
@@ -85,14 +89,16 @@ public class RecordCollectorTest {
new Metrics().sensor("skipped-records")
);
- collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
- collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
- collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
+ final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
- collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);
- collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);
+ collector.send("topic1", "999", "0", null, 0, null, stringSerializer, stringSerializer);
+ collector.send("topic1", "999", "0", null, 0, null, stringSerializer, stringSerializer);
+ collector.send("topic1", "999", "0", null, 0, null, stringSerializer, stringSerializer);
- collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer);
+ collector.send("topic1", "999", "0", headers, 1, null, stringSerializer, stringSerializer);
+ collector.send("topic1", "999", "0", headers, 1, null, stringSerializer, stringSerializer);
+
+ collector.send("topic1", "999", "0", headers, 2, null, stringSerializer, stringSerializer);
final Map<TopicPartition, Long> offsets = collector.offsets();
@@ -101,9 +107,9 @@ public class RecordCollectorTest {
assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));
// ignore StreamPartitioner
- collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
- collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);
- collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer);
+ collector.send("topic1", "999", "0", null, 0, null, stringSerializer, stringSerializer);
+ collector.send("topic1", "999", "0", null, 1, null, stringSerializer, stringSerializer);
+ collector.send("topic1", "999", "0", headers, 2, null, stringSerializer, stringSerializer);
assertEquals((Long) 3L, offsets.get(new TopicPartition("topic1", 0)));
assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1)));
@@ -121,17 +127,19 @@ public class RecordCollectorTest {
new Metrics().sensor("skipped-records")
);
- collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "9", "0", null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "27", "0", null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "81", "0", null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "243", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
+
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "9", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "27", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "81", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "243", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "28", "0", null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "82", "0", null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "244", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "28", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "82", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "244", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "245", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "245", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
final Map<TopicPartition, Long> offsets = collector.offsets();
@@ -155,7 +163,7 @@ public class RecordCollectorTest {
new DefaultProductionExceptionHandler(),
new Metrics().sensor("skipped-records"));
- collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
}
@SuppressWarnings("unchecked")
@@ -174,10 +182,10 @@ public class RecordCollectorTest {
new DefaultProductionExceptionHandler(),
new Metrics().sensor("skipped-records"));
- collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
try {
- collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
fail("Should have thrown StreamsException");
} catch (final StreamsException expected) { /* ok */ }
}
@@ -198,9 +206,9 @@ public class RecordCollectorTest {
new AlwaysContinueProductionExceptionHandler(),
new Metrics().sensor("skipped-records"));
- collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
}
@SuppressWarnings("unchecked")
@@ -223,7 +231,7 @@ public class RecordCollectorTest {
logContext,
new AlwaysContinueProductionExceptionHandler(),
sensor);
- collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
assertEquals(1.0, metrics.metrics().get(metricName).metricValue());
assertTrue(logCaptureAppender.getMessages().contains("test Error sending records (key=[3] value=[0] timestamp=[null]) to topic=[topic1] and partition=[0]; The exception handler chose to CONTINUE processing in spite of this error."));
LogCaptureAppender.unregister(logCaptureAppender);
@@ -245,7 +253,7 @@ public class RecordCollectorTest {
new DefaultProductionExceptionHandler(),
new Metrics().sensor("skipped-records"));
- collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
try {
collector.flush();
@@ -269,7 +277,7 @@ public class RecordCollectorTest {
new AlwaysContinueProductionExceptionHandler(),
new Metrics().sensor("skipped-records"));
- collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
collector.flush();
}
@@ -290,7 +298,7 @@ public class RecordCollectorTest {
new DefaultProductionExceptionHandler(),
new Metrics().sensor("skipped-records"));
- collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
try {
collector.close();
@@ -314,7 +322,7 @@ public class RecordCollectorTest {
new AlwaysContinueProductionExceptionHandler(),
new Metrics().sensor("skipped-records"));
- collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
collector.close();
}
@@ -334,7 +342,7 @@ public class RecordCollectorTest {
logContext,
new DefaultProductionExceptionHandler(),
new Metrics().sensor("skipped-records"));
- collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
}
@SuppressWarnings("unchecked")
@@ -352,6 +360,6 @@ public class RecordCollectorTest {
logContext,
new AlwaysContinueProductionExceptionHandler(),
new Metrics().sensor("skipped-records"));
- collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
index 0af5e17..7afd51e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
@@ -16,25 +16,37 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.common.header.Headers;
+
public class RecordContextStub implements RecordContext {
private final long offset;
private long timestamp;
private final int partition;
private final String topic;
+ private final Headers headers;
public RecordContextStub() {
- this(-1, -1, -1, "");
+ this(-1, -1, -1, "", null);
}
public RecordContextStub(final long offset,
final long timestamp,
final int partition,
- final String topic) {
+ final String topic,
+ final Headers headers) {
this.offset = offset;
this.timestamp = timestamp;
this.partition = partition;
this.topic = topic;
+ this.headers = headers;
+ }
+
+ public RecordContextStub(final long offset,
+ final long timestamp,
+ final int partition,
+ final String topic) {
+ this(offset, timestamp, partition, topic, null);
}
@Override
@@ -61,4 +73,9 @@ public class RecordContextStub implements RecordContext {
public int partition() {
return partition;
}
+
+ @Override
+ public Headers headers() {
+ return headers;
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index de8e17b..36988c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -17,7 +17,10 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
@@ -29,16 +32,18 @@ import static org.junit.Assert.assertEquals;
public class RecordDeserializerTest {
+ private final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())});
private final ConsumerRecord<byte[], byte[]> rawRecord = new ConsumerRecord<>("topic",
1,
1,
10,
TimestampType.LOG_APPEND_TIME,
- 5,
+ 5L,
3,
5,
new byte[0],
- new byte[0]);
+ new byte[0],
+ headers);
@SuppressWarnings("deprecation")
@@ -63,6 +68,7 @@ public class RecordDeserializerTest {
assertEquals("value", record.value());
assertEquals(rawRecord.timestamp(), record.timestamp());
assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+ assertEquals(rawRecord.headers(), record.headers());
}
static class TheSourceNode extends SourceNode<Object, Object> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 4e80fa7..4b9e6a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
@@ -198,6 +199,7 @@ public class KeyValueStoreTestDriver<K, V> {
public <K1, V1> void send(final String topic,
final K1 key,
final V1 value,
+ final Headers headers,
final Integer partition,
final Long timestamp,
final Serializer<K1> keySerializer,
@@ -214,6 +216,7 @@ public class KeyValueStoreTestDriver<K, V> {
public <K1, V1> void send(final String topic,
final K1 key,
final V1 value,
+ final Headers headers,
final Long timestamp,
final Serializer<K1> keySerializer,
final Serializer<V1> valueSerializer,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 3e0241e..2f6aac7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -75,7 +75,8 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
context = new InternalMockProcessorContext(null, null, null, (RecordCollector) null, cache);
topic = "topic";
- context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic));
+ context.setRecordContext(
+ new ProcessorRecordContext(10, 0, 0, topic, null));
store.init(context, null);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index b77f4e9..baa9ee4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -76,7 +76,7 @@ public class CachingSessionStoreTest {
);
cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
- context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic"));
+ context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic", null));
cachingStore.init(context, cachingStore);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index a87b2e4..b8808ca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -81,7 +81,7 @@ public class CachingWindowStoreTest {
cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
topic = "topic";
context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
- context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic));
+ context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic, null));
cachingStore.init(context, cachingStore);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 5bb0de7..7f5a08e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
@@ -57,6 +58,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
public <K, V> void send(final String topic,
K key,
V value,
+ Headers headers,
Integer partition,
Long timestamp,
Serializer<K> keySerializer,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index a658186..edcaa05 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
@@ -49,6 +50,7 @@ public class ChangeLoggingSessionBytesStoreTest {
public <K, V> void send(final String topic,
K key,
V value,
+ Headers headers,
Integer partition,
Long timestamp,
Serializer<K> keySerializer,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index 956172e..e56887e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
@@ -47,6 +48,7 @@ public class ChangeLoggingWindowBytesStoreTest {
public <K, V> void send(final String topic,
K key,
V value,
+ Headers headers,
Integer partition,
Long timestamp,
Serializer<K> keySerializer,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 9ae0feb..92653ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -16,6 +16,10 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
@@ -42,6 +46,7 @@ import static org.junit.Assert.assertSame;
public class NamedCacheTest {
+ private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
private NamedCache cache;
private StreamsMetricsImpl metrics;
private final String taskIDString = "0.0";
@@ -64,7 +69,7 @@ public class NamedCacheTest {
for (int i = 0; i < toInsert.size(); i++) {
byte[] key = toInsert.get(i).key.getBytes();
byte[] value = toInsert.get(i).value.getBytes();
- cache.put(Bytes.wrap(key), new LRUCacheEntry(value, true, 1, 1, 1, ""));
+ cache.put(Bytes.wrap(key), new LRUCacheEntry(value, null, true, 1, 1, 1, ""));
LRUCacheEntry head = cache.first();
LRUCacheEntry tail = cache.last();
assertEquals(new String(head.value), toInsert.get(i).value);
@@ -170,9 +175,9 @@ public class NamedCacheTest {
@Test
public void shouldFlushDirtEntriesOnEviction() {
final List<ThreadCache.DirtyEntry> flushed = new ArrayList<>();
- cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, ""));
+ cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, ""));
cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
- cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, true, 0, 0, 0, ""));
+ cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, headers, true, 0, 0, 0, ""));
cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
@Override
@@ -185,6 +190,7 @@ public class NamedCacheTest {
assertEquals(2, flushed.size());
assertEquals(Bytes.wrap(new byte[] {0}), flushed.get(0).key());
+ assertEquals(headers, flushed.get(0).recordContext().headers());
assertArrayEquals(new byte[] {10}, flushed.get(0).newValue());
assertEquals(Bytes.wrap(new byte[] {2}), flushed.get(1).key());
assertArrayEquals(new byte[] {30}, flushed.get(1).newValue());
@@ -193,9 +199,9 @@ public class NamedCacheTest {
@Test
public void shouldGetRangeIteratorOverKeys() {
- cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, ""));
+ cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, ""));
cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
- cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, true, 0, 0, 0, ""));
+ cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, null, true, 0, 0, 0, ""));
final Iterator<Bytes> iterator = cache.keyRange(Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{2}));
assertEquals(Bytes.wrap(new byte[]{1}), iterator.next());
@@ -205,9 +211,9 @@ public class NamedCacheTest {
@Test
public void shouldGetIteratorOverAllKeys() {
- cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, ""));
+ cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, ""));
cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
- cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, true, 0, 0, 0, ""));
+ cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, null, true, 0, 0, 0, ""));
final Iterator<Bytes> iterator = cache.allKeys();
assertEquals(Bytes.wrap(new byte[]{0}), iterator.next());
@@ -223,8 +229,8 @@ public class NamedCacheTest {
@Test(expected = IllegalStateException.class)
public void shouldThrowIllegalStateExceptionWhenTryingToOverwriteDirtyEntryWithCleanEntry() {
- cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, ""));
- cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, false, 0, 0, 0, ""));
+ cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, ""));
+ cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, null, false, 0, 0, 0, ""));
}
@Test
@@ -235,8 +241,8 @@ public class NamedCacheTest {
// no-op
}
});
- cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(null, true, 0, 0, 0, ""));
- cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, true, 0, 0, 0, ""));
+ cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(null, headers, true, 0, 0, 0, ""));
+ cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, null, true, 0, 0, 0, ""));
cache.flush();
assertEquals(1, cache.size());
assertNotNull(cache.get(Bytes.wrap(new byte[]{1})));
@@ -244,7 +250,7 @@ public class NamedCacheTest {
@Test
public void shouldBeReentrantAndNotBreakLRU() {
- final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, true, 0, 0, 0, "");
+ final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, null, true, 0, 0, 0, "");
final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
cache.put(Bytes.wrap(new byte[]{0}), dirty);
cache.put(Bytes.wrap(new byte[]{1}), clean);
@@ -290,7 +296,7 @@ public class NamedCacheTest {
@Test
public void shouldNotThrowIllegalArgumentAfterEvictingDirtyRecordAndThenPuttingNewRecordWithSameKey() {
- final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, true, 0, 0, 0, "");
+ final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, null, true, 0, 0, 0, "");
final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
final Bytes key = Bytes.wrap(new byte[] {3});
cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 92edbd8..be4ede8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
@@ -90,6 +91,7 @@ public class RocksDBWindowStoreTest {
public <K1, V1> void send(final String topic,
final K1 key,
final V1 value,
+ final Headers headers,
final Integer partition,
final Long timestamp,
final Serializer<K1> keySerializer,
@@ -160,7 +162,7 @@ public class RocksDBWindowStoreTest {
}
private ProcessorRecordContext createRecordContext(final long time) {
- return new ProcessorRecordContext(time, 0, 0, "topic");
+ return new ProcessorRecordContext(time, 0, 0, "topic", null);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 6bacd91..5afe14f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
@@ -38,6 +40,7 @@ public class StoreChangeLoggerTest {
private final String topic = "topic";
private final Map<Integer, String> logged = new HashMap<>();
+ private final Map<Integer, Headers> loggedHeaders = new HashMap<>();
private final InternalMockProcessorContext context = new InternalMockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")) {
@@ -45,17 +48,20 @@ public class StoreChangeLoggerTest {
public <K1, V1> void send(final String topic,
final K1 key,
final V1 value,
+ final Headers headers,
final Integer partition,
final Long timestamp,
final Serializer<K1> keySerializer,
final Serializer<V1> valueSerializer) {
logged.put((Integer) key, (String) value);
+ loggedHeaders.put((Integer) key, headers);
}
@Override
public <K1, V1> void send(final String topic,
final K1 key,
final V1 value,
+ final Headers headers,
final Long timestamp,
final Serializer<K1> keySerializer,
final Serializer<V1> valueSerializer,
@@ -80,6 +86,13 @@ public class StoreChangeLoggerTest {
changeLogger.logChange(0, null);
assertNull(logged.get(0));
+ }
+
+ @Test
+ public void shouldNotSendRecordHeadersToChangelogTopic() {
+ context.headers().add(new RecordHeader("key", "value".getBytes()));
+ changeLogger.logChange(0, "zero");
+ assertNull(loggedHeaders.get(0));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 164e71e..d100ae5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -59,7 +59,7 @@ public class ThreadCacheTest {
for (KeyValue<String, String> kvToInsert : toInsert) {
Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
byte[] value = kvToInsert.value.getBytes();
- cache.put(namespace, key, new LRUCacheEntry(value, true, 1L, 1L, 1, ""));
+ cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1L, 1L, 1, ""));
}
for (KeyValue<String, String> kvToInsert : toInsert) {
@@ -89,7 +89,7 @@ public class ThreadCacheTest {
String keyStr = "K" + i;
Bytes key = Bytes.wrap(keyStr.getBytes());
byte[] value = new byte[valueSizeBytes];
- cache.put(namespace, key, new LRUCacheEntry(value, true, 1L, 1L, 1, ""));
+ cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1L, 1L, 1, ""));
}
@@ -171,7 +171,7 @@ public class ThreadCacheTest {
for (KeyValue<String, String> kvToInsert : toInsert) {
final Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
final byte[] value = kvToInsert.value.getBytes();
- cache.put(namespace, key, new LRUCacheEntry(value, true, 1, 1, 1, ""));
+ cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1, 1, 1, ""));
}
for (int i = 0; i < expected.size(); i++) {
@@ -520,7 +520,7 @@ public class ThreadCacheTest {
}
private LRUCacheEntry dirtyEntry(final byte[] key) {
- return new LRUCacheEntry(key, true, -1, -1, -1, "");
+ return new LRUCacheEntry(key, null, true, -1, -1, -1, "");
}
private LRUCacheEntry cleanEntry(final byte[] key) {
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index eb72e13..e5571eb 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.test;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
@@ -242,7 +244,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
// and also not throwing exceptions if record context is not available.
public void setTime(final long timestamp) {
if (recordContext != null) {
- recordContext = new ProcessorRecordContext(timestamp, recordContext.offset(), recordContext.partition(), recordContext.topic());
+ recordContext = new ProcessorRecordContext(timestamp, recordContext.offset(), recordContext.partition(), recordContext.topic(), recordContext.headers());
}
this.timestamp = timestamp;
}
@@ -279,6 +281,14 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
return recordContext.offset();
}
+ @Override
+ public Headers headers() {
+ if (recordContext == null) {
+ return new RecordHeaders();
+ }
+ return recordContext.headers();
+ }
+
Map<String, StateStore> allStateStores() {
return Collections.unmodifiableMap(storeMap);
}
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 2c3461a..698cdc7 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.test;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -90,7 +91,7 @@ public class KStreamTestDriver extends ExternalResource {
final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics()));
context = new InternalMockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
- context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
+ context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic", null));
// init global topology first as it will add stores to the
// store map that are required for joins etc.
@@ -229,7 +230,7 @@ public class KStreamTestDriver extends ExternalResource {
}
private ProcessorRecordContext createRecordContext(final String topicName, final long timestamp) {
- return new ProcessorRecordContext(timestamp, -1, -1, topicName);
+ return new ProcessorRecordContext(timestamp, -1, -1, topicName, null);
}
private class MockRecordCollector extends RecordCollectorImpl {
@@ -241,6 +242,7 @@ public class KStreamTestDriver extends ExternalResource {
public <K, V> void send(final String topic,
final K key,
final V value,
+ final Headers headers,
final Long timestamp,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
@@ -255,6 +257,7 @@ public class KStreamTestDriver extends ExternalResource {
public <K, V> void send(final String topic,
final K key,
final V value,
+ final Headers headers,
final Integer partition,
final Long timestamp,
final Serializer<K> keySerializer,
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
index 66271a0..893d356 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
@@ -17,6 +17,7 @@
package org.apache.kafka.test;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -30,6 +31,7 @@ public class NoOpRecordCollector implements RecordCollector {
public <K, V> void send(final String topic,
final K key,
final V value,
+ final Headers headers,
final Integer partition,
final Long timestamp,
final Serializer<K> keySerializer,
@@ -39,6 +41,7 @@ public class NoOpRecordCollector implements RecordCollector {
public <K, V> void send(final String topic,
final K key,
final V value,
+ final Headers headers,
final Long timestamp,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index c237ca7..e46ec6a 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -227,7 +227,7 @@ public class TopologyTestDriver implements Closeable {
* @param config the configuration for the topology
*/
TopologyTestDriver(final InternalTopologyBuilder builder,
- final Properties config) {
+ final Properties config) {
this(builder, config, System.currentTimeMillis());
}
@@ -382,14 +382,16 @@ public class TopologyTestDriver implements Closeable {
offset,
consumerRecord.timestamp(),
consumerRecord.timestampType(),
- ConsumerRecord.NULL_CHECKSUM,
+ (long) ConsumerRecord.NULL_CHECKSUM,
consumerRecord.serializedKeySize(),
consumerRecord.serializedValueSize(),
consumerRecord.key(),
- consumerRecord.value())));
+ consumerRecord.value(),
+ consumerRecord.headers())));
// Process the record ...
- ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(consumerRecord.timestamp(), offset, topicPartition.partition(), topicName));
+ ((InternalProcessorContext) task.context()).setRecordContext(
+ new ProcessorRecordContext(consumerRecord.timestamp(), offset, topicPartition.partition(), topicName, consumerRecord.headers()));
task.process();
task.maybePunctuateStreamTime();
task.commit();
@@ -407,11 +409,12 @@ public class TopologyTestDriver implements Closeable {
offset,
consumerRecord.timestamp(),
consumerRecord.timestampType(),
- ConsumerRecord.NULL_CHECKSUM,
+ (long) ConsumerRecord.NULL_CHECKSUM,
consumerRecord.serializedKeySize(),
consumerRecord.serializedValueSize(),
consumerRecord.key(),
- consumerRecord.value()));
+ consumerRecord.value(),
+ consumerRecord.headers()));
globalStateTask.flushState();
}
}
@@ -467,7 +470,8 @@ public class TopologyTestDriver implements Closeable {
serializedKey == null ? 0 : serializedKey.length,
serializedValue == null ? 0 : serializedValue.length,
serializedKey,
- serializedValue));
+ serializedValue,
+ record.headers()));
}
}
}
@@ -536,7 +540,7 @@ public class TopologyTestDriver implements Closeable {
}
final K key = keyDeserializer.deserialize(record.topic(), record.key());
final V value = valueDeserializer.deserialize(record.topic(), record.value());
- return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value);
+ return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value, record.headers());
}
/**
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 3e29cde..b14a791 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
@@ -61,6 +62,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
private String topic;
private Integer partition;
private Long offset;
+ private Headers headers;
private Long timestamp;
// mocks ================================================
@@ -250,10 +252,11 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
* @param offset A record offset
* @param timestamp A record timestamp
*/
- public void setRecordMetadata(final String topic, final int partition, final long offset, final long timestamp) {
+ public void setRecordMetadata(final String topic, final int partition, final long offset, final Headers headers, final long timestamp) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
+ this.headers = headers;
this.timestamp = timestamp;
}
@@ -289,6 +292,9 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
this.offset = offset;
}
+ public void setHeaders(final Headers headers) {
+ this.headers = headers;
+ }
/**
* The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
@@ -325,6 +331,11 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
}
@Override
+ public Headers headers() {
+ return headers;
+ }
+
+ @Override
public long timestamp() {
if (timestamp == null) {
throw new IllegalStateException("Timestamp must be set before use via setRecordMetadata() or setTimestamp().");
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
index b0ccd61..507249d 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
@@ -18,6 +18,8 @@ package org.apache.kafka.streams.test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
@@ -153,20 +155,23 @@ public class ConsumerRecordFactory<K, V> {
}
/**
- * Create a {@link ConsumerRecord} with the given topic name, key, value, and timestamp.
+ * Create a {@link ConsumerRecord} with the given topic name, key, value, headers, and timestamp.
* Does not auto advance internally tracked time.
*
* @param topicName the topic name
* @param key the record key
* @param value the record value
+ * @param headers the record headers
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final K key,
final V value,
+ final Headers headers,
final long timestampMs) {
Objects.requireNonNull(topicName, "topicName cannot be null.");
+ Objects.requireNonNull(headers, "headers cannot be null.");
final byte[] serializedKey = keySerializer.serialize(topicName, key);
final byte[] serializedValue = valueSerializer.serialize(topicName, value);
return new ConsumerRecord<>(
@@ -175,11 +180,29 @@ public class ConsumerRecordFactory<K, V> {
-1L,
timestampMs,
TimestampType.CREATE_TIME,
- ConsumerRecord.NULL_CHECKSUM,
+ (long) ConsumerRecord.NULL_CHECKSUM,
serializedKey == null ? 0 : serializedKey.length,
serializedValue == null ? 0 : serializedValue.length,
serializedKey,
- serializedValue);
+ serializedValue,
+ headers);
+ }
+
+ /**
+ * Create a {@link ConsumerRecord} with the given topic name, key, value, and timestamp.
+ * Does not auto advance internally tracked time.
+ *
+ * @param topicName the topic name
+ * @param key the record key
+ * @param value the record value
+ * @param timestampMs the record timestamp
+ * @return the generated {@link ConsumerRecord}
+ */
+ public ConsumerRecord<byte[], byte[]> create(final String topicName,
+ final K key,
+ final V value,
+ final long timestampMs) {
+ return create(topicName, key, value, new RecordHeaders(), timestampMs);
}
/**
@@ -194,16 +217,33 @@ public class ConsumerRecordFactory<K, V> {
public ConsumerRecord<byte[], byte[]> create(final K key,
final V value,
final long timestampMs) {
+ return create(key, value, new RecordHeaders(), timestampMs);
+ }
+
+ /**
+ * Create a {@link ConsumerRecord} with default topic name and given key, value, headers, and timestamp.
+ * Does not auto advance internally tracked time.
+ *
+ * @param key the record key
+ * @param value the record value
+ * @param headers the record headers
+ * @param timestampMs the record timestamp
+ * @return the generated {@link ConsumerRecord}
+ */
+ public ConsumerRecord<byte[], byte[]> create(final K key,
+ final V value,
+ final Headers headers,
+ final long timestampMs) {
if (topicName == null) {
throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
"Use #create(String topicName, K key, V value, long timestampMs) instead.");
}
- return create(topicName, key, value, timestampMs);
+ return create(topicName, key, value, headers, timestampMs);
}
/**
* Create a {@link ConsumerRecord} with the given topic name, key, and value.
- * The timestamp will be generated from the constructor provided and time will auto advance.
+ * The timestamp will be generated based on the constructor-provided start time, and time will auto advance.
*
* @param topicName the topic name
* @param key the record key
@@ -215,12 +255,31 @@ public class ConsumerRecordFactory<K, V> {
final V value) {
final long timestamp = timeMs;
timeMs += advanceMs;
- return create(topicName, key, value, timestamp);
+ return create(topicName, key, value, new RecordHeaders(), timestamp);
+ }
+
+ /**
+ * Create a {@link ConsumerRecord} with the given topic name, key, value, and headers.
+ * The timestamp will be generated based on the constructor provided start time and time will auto advance.
+ *
+ * @param topicName the topic name
+ * @param key the record key
+ * @param value the record value
+ * @param headers the record headers
+ * @return the generated {@link ConsumerRecord}
+ */
+ public ConsumerRecord<byte[], byte[]> create(final String topicName,
+ final K key,
+ final V value,
+ final Headers headers) {
+ final long timestamp = timeMs;
+ timeMs += advanceMs;
+ return create(topicName, key, value, headers, timestamp);
}
/**
* Create a {@link ConsumerRecord} with default topic name and given key and value.
- * The timestamp will be generated from the constructor provided and time will auto advance.
+ * The timestamp will be generated based on the constructor provided start time and time will auto advance.
*
* @param key the record key
* @param value the record value
@@ -228,46 +287,110 @@ public class ConsumerRecordFactory<K, V> {
*/
public ConsumerRecord<byte[], byte[]> create(final K key,
final V value) {
+ return create(key, value, new RecordHeaders());
+ }
+
+ /**
+ * Create a {@link ConsumerRecord} with default topic name and given key, value, and headers.
+ * The timestamp will be generated based on the constructor provided start time and time will auto advance.
+ *
+ * @param key the record key
+ * @param value the record value
+ * @param headers the record headers
+ * @return the generated {@link ConsumerRecord}
+ */
+ public ConsumerRecord<byte[], byte[]> create(final K key,
+ final V value,
+ final Headers headers) {
if (topicName == null) {
throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
"Use #create(String topicName, K key, V value) instead.");
}
- return create(topicName, key, value);
+ return create(topicName, key, value, headers);
}
/**
* Create a {@link ConsumerRecord} with {@code null}-key and the given topic name, value, and timestamp.
+ * Does not auto advance internally tracked time.
+ *
+ * @param topicName the topic name
+ * @param value the record value
+ * @param timestampMs the record timestamp
+ * @return the generated {@link ConsumerRecord}
+ */
+ public ConsumerRecord<byte[], byte[]> create(final String topicName,
+ final V value,
+ final long timestampMs) {
+ return create(topicName, null, value, new RecordHeaders(), timestampMs);
+ }
+
+ /**
+ * Create a {@link ConsumerRecord} with {@code null}-key and the given topic name, value, headers, and timestamp.
+ * Does not auto advance internally tracked time.
*
* @param topicName the topic name
* @param value the record value
+ * @param headers the record headers
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final V value,
+ final Headers headers,
final long timestampMs) {
- return create(topicName, null, value, timestampMs);
+ return create(topicName, null, value, headers, timestampMs);
}
/**
* Create a {@link ConsumerRecord} with default topic name and {@code null}-key as well as given value and timestamp.
+ * Does not auto advance internally tracked time.
+ *
+ * @param value the record value
+ * @param timestampMs the record timestamp
+ * @return the generated {@link ConsumerRecord}
+ */
+ public ConsumerRecord<byte[], byte[]> create(final V value,
+ final long timestampMs) {
+ return create(value, new RecordHeaders(), timestampMs);
+ }
+
+ /**
+ * Create a {@link ConsumerRecord} with default topic name and {@code null}-key as well as given value, headers, and timestamp.
+ * Does not auto advance internally tracked time.
*
* @param value the record value
+ * @param headers the record headers
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
public ConsumerRecord<byte[], byte[]> create(final V value,
+ final Headers headers,
final long timestampMs) {
if (topicName == null) {
throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
"Use #create(String topicName, V value, long timestampMs) instead.");
}
- return create(topicName, value, timestampMs);
+ return create(topicName, value, headers, timestampMs);
+ }
+
+ /**
+ * Create a {@link ConsumerRecord} with {@code null}-key and the given topic name, value, and headers.
+ * The timestamp will be generated based on the constructor provided start time and time will auto advance.
+ *
+ * @param topicName the topic name
+ * @param value the record value
+ * @param headers the record headers
+ * @return the generated {@link ConsumerRecord}
+ */
+ public ConsumerRecord<byte[], byte[]> create(final String topicName,
+ final V value,
+ final Headers headers) {
+ return create(topicName, null, value, headers);
}
/**
* Create a {@link ConsumerRecord} with {@code null}-key and the given topic name and value.
- * The timestamp will be generated from the constructor provided and time will auto advance.
+ * The timestamp will be generated based on the constructor provided start time and time will auto advance.
*
* @param topicName the topic name
* @param value the record value
@@ -275,27 +398,40 @@ public class ConsumerRecordFactory<K, V> {
*/
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final V value) {
- return create(topicName, null, value);
+ return create(topicName, null, value, new RecordHeaders());
}
/**
* Create a {@link ConsumerRecord} with default topic name and {@code null}-key was well as given value.
- * The timestamp will be generated from the constructor provided and time will auto advance.
+ * The timestamp will be generated based on the constructor provided start time and time will auto advance.
*
* @param value the record value
* @return the generated {@link ConsumerRecord}
*/
public ConsumerRecord<byte[], byte[]> create(final V value) {
+ return create(value, new RecordHeaders());
+ }
+
+ /**
+ * Create a {@link ConsumerRecord} with default topic name and {@code null}-key as well as given value and headers.
+ * The timestamp will be generated based on the constructor provided start time and time will auto advance.
+ *
+ * @param value the record value
+ * @param headers the record headers
+ * @return the generated {@link ConsumerRecord}
+ */
+ public ConsumerRecord<byte[], byte[]> create(final V value,
+ final Headers headers) {
if (topicName == null) {
throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
"Use #create(String topicName, V value, long timestampMs) instead.");
}
- return create(topicName, value);
+ return create(topicName, value, headers);
}
/**
* Creates {@link ConsumerRecord consumer records} with the given topic name, keys, and values.
- * The timestamp will be generated from the constructor provided and time will auto advance.
+ * The timestamp will be generated based on the constructor provided start time and time will auto advance.
*
* @param topicName the topic name
* @param keyValues the record keys and values
@@ -314,7 +450,7 @@ public class ConsumerRecordFactory<K, V> {
/**
* Creates {@link ConsumerRecord consumer records} with default topic name as well as given keys and values.
- * The timestamp will be generated from the constructor provided and time will auto advance.
+ * The timestamp will be generated based on the constructor provided start time and time will auto advance.
*
* @param keyValues the record keys and values
* @return the generated {@link ConsumerRecord consumer records}
@@ -350,7 +486,7 @@ public class ConsumerRecordFactory<K, V> {
long timestamp = startTimestamp;
for (final KeyValue<K, V> keyValue : keyValues) {
- records.add(create(topicName, keyValue.key, keyValue.value, timestamp));
+ records.add(create(topicName, keyValue.key, keyValue.value, new RecordHeaders(), timestamp));
timestamp += advanceMs;
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
index 09ed294..aedb910 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.test;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.TopologyTestDriver;
import java.util.Objects;
@@ -238,4 +239,202 @@ public class OutputVerifier {
compareKeyValueTimestamp(record, expectedRecord.key(), expectedRecord.value(), expectedRecord.timestamp());
}
+ /**
+ * Compares a {@link ProducerRecord} with the provided value and headers and throws an {@link AssertionError} if
+ * the {@code ProducerRecord}'s value or headers is not equal to the expected value or headers.
+ *
+ * @param record an output {@code ProducerRecord} for verification
+ * @param expectedValue the expected value of the {@code ProducerRecord}
+ * @param expectedHeaders the expected headers of the {@code ProducerRecord}
+ * @param <K> the key type
+ * @param <V> the value type
+ * @throws AssertionError if {@code ProducerRecord}'s value or headers is not equal to {@code expectedValue} or {@code expectedHeaders}
+ */
+ public static <K, V> void compareValueHeaders(final ProducerRecord<K, V> record,
+ final V expectedValue,
+ final Headers expectedHeaders) throws AssertionError {
+ Objects.requireNonNull(record);
+
+ final V recordValue = record.value();
+ final Headers recordHeaders = record.headers();
+ final AssertionError error = new AssertionError("Expected value=" + expectedValue + " with headers=" + expectedHeaders +
+ " but was value=" + recordValue + " with headers=" + recordHeaders);
+
+ if (recordValue != null) {
+ if (!recordValue.equals(expectedValue)) {
+ throw error;
+ }
+ } else if (expectedValue != null) {
+ throw error;
+ }
+
+ if (recordHeaders != null) {
+ if (!recordHeaders.equals(expectedHeaders)) {
+ throw error;
+ }
+ } else if (expectedHeaders != null) {
+ throw error;
+ }
+ }
+
+ /**
+ * Compares the values and headers of two {@link ProducerRecord}'s and throws an {@link AssertionError} if the
+ * values or headers are not equal to each other.
+ *
+ * @param record an output {@code ProducerRecord} for verification
+ * @param expectedRecord a {@code ProducerRecord} for verification
+ * @param <K> the key type
+ * @param <V> the value type
+ * @throws AssertionError if {@code ProducerRecord}'s value or headers is not equal to {@code expectedRecord}'s value or headers
+ */
+ public static <K, V> void compareValueHeaders(final ProducerRecord<K, V> record,
+ final ProducerRecord<K, V> expectedRecord) throws AssertionError {
+ Objects.requireNonNull(expectedRecord);
+ compareValueHeaders(record, expectedRecord.value(), expectedRecord.headers());
+ }
+
+ /**
+ * Compares a {@link ProducerRecord} with the provided key, value, and headers and throws an
+ * {@link AssertionError} if the {@code ProducerRecord}'s key, value, or headers is not equal to the expected key,
+ * value, or headers.
+ *
+ * @param record an output {@code ProducerRecord} for verification
+ * @param expectedKey the expected key of the {@code ProducerRecord}
+ * @param expectedValue the expected value of the {@code ProducerRecord}
+ * @param expectedHeaders the expected headers of the {@code ProducerRecord}
+ * @param <K> the key type
+ * @param <V> the value type
+ * @throws AssertionError if {@code ProducerRecord}'s key, value, or headers is not equal to {@code expectedKey},
+ * {@code expectedValue}, or {@code expectedHeaders}
+ */
+ public static <K, V> void compareKeyValueHeaders(final ProducerRecord<K, V> record,
+ final K expectedKey,
+ final V expectedValue,
+ final Headers expectedHeaders) throws AssertionError {
+ Objects.requireNonNull(record);
+
+ final K recordKey = record.key();
+ final V recordValue = record.value();
+ final Headers recordHeaders = record.headers();
+ final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with headers=" + expectedHeaders +
+ " but was <" + recordKey + ", " + recordValue + "> with headers=" + recordHeaders);
+
+ if (recordKey != null) {
+ if (!recordKey.equals(expectedKey)) {
+ throw error;
+ }
+ } else if (expectedKey != null) {
+ throw error;
+ }
+
+ if (recordValue != null) {
+ if (!recordValue.equals(expectedValue)) {
+ throw error;
+ }
+ } else if (expectedValue != null) {
+ throw error;
+ }
+
+ if (recordHeaders != null) {
+ if (!recordHeaders.equals(expectedHeaders)) {
+ throw error;
+ }
+ } else if (expectedHeaders != null) {
+ throw error;
+ }
+ }
+
+ /**
+ * Compares the keys, values, and headers of two {@link ProducerRecord}'s and throws an {@link AssertionError} if
+ * the keys, values, or headers are not equal to each other.
+ *
+ * @param record an output {@code ProducerRecord} for verification
+ * @param expectedRecord a {@code ProducerRecord} for verification
+ * @param <K> the key type
+ * @param <V> the value type
+ * @throws AssertionError if {@code ProducerRecord}'s key, value, or headers is not equal to
+ * {@code expectedRecord}'s key, value, or headers
+ */
+ public static <K, V> void compareKeyValueHeaders(final ProducerRecord<K, V> record,
+ final ProducerRecord<K, V> expectedRecord) throws AssertionError {
+ Objects.requireNonNull(expectedRecord);
+ compareKeyValueHeaders(record, expectedRecord.key(), expectedRecord.value(), expectedRecord.headers());
+ }
+
+ /**
+ * Compares a {@link ProducerRecord} with the provided key, value, headers, and timestamp and throws an
+ * {@link AssertionError} if the {@code ProducerRecord}'s key, value, headers, or timestamp is not equal to the expected key,
+ * value, headers, or timestamp.
+ *
+ * @param record an output {@code ProducerRecord} for verification
+ * @param expectedKey the expected key of the {@code ProducerRecord}
+ * @param expectedValue the expected value of the {@code ProducerRecord}
+ * @param expectedHeaders the expected headers of the {@code ProducerRecord}
+ * @param expectedTimestamp the expected timestamp of the {@code ProducerRecord}
+ * @param <K> the key type
+ * @param <V> the value type
+ * @throws AssertionError if {@code ProducerRecord}'s key, value, headers, or timestamp is not equal to {@code expectedKey},
+ * {@code expectedValue}, {@code expectedHeaders}, or {@code expectedTimestamp}
+ */
+ public static <K, V> void compareKeyValueHeadersTimestamp(final ProducerRecord<K, V> record,
+ final K expectedKey,
+ final V expectedValue,
+ final Headers expectedHeaders,
+ final long expectedTimestamp) throws AssertionError {
+ Objects.requireNonNull(record);
+
+ final K recordKey = record.key();
+ final V recordValue = record.value();
+ final Headers recordHeaders = record.headers();
+ final long recordTimestamp = record.timestamp();
+ final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + ">" +
+ " with timestamp=" + expectedTimestamp + " and headers=" + expectedHeaders +
+ " but was <" + recordKey + ", " + recordValue + ">" +
+ " with timestamp=" + recordTimestamp + " and headers=" + recordHeaders);
+
+ if (recordKey != null) {
+ if (!recordKey.equals(expectedKey)) {
+ throw error;
+ }
+ } else if (expectedKey != null) {
+ throw error;
+ }
+
+ if (recordValue != null) {
+ if (!recordValue.equals(expectedValue)) {
+ throw error;
+ }
+ } else if (expectedValue != null) {
+ throw error;
+ }
+
+ if (recordHeaders != null) {
+ if (!recordHeaders.equals(expectedHeaders)) {
+ throw error;
+ }
+ } else if (expectedHeaders != null) {
+ throw error;
+ }
+
+ if (recordTimestamp != expectedTimestamp) {
+ throw error;
+ }
+ }
+
+ /**
+ * Compares the keys, values, headers, and timestamps of two {@link ProducerRecord}'s and throws an {@link AssertionError} if
+ * the keys, values, headers, or timestamps are not equal to each other.
+ *
+ * @param record an output {@code ProducerRecord} for verification
+ * @param expectedRecord a {@code ProducerRecord} for verification
+ * @param <K> the key type
+ * @param <V> the value type
+ * @throws AssertionError if {@code ProducerRecord}'s key, value, headers, or timestamp is not equal to
+ * {@code expectedRecord}'s key, value, headers, or timestamp
+ */
+ public static <K, V> void compareKeyValueHeadersTimestamp(final ProducerRecord<K, V> record,
+ final ProducerRecord<K, V> expectedRecord) throws AssertionError {
+ Objects.requireNonNull(expectedRecord);
+ compareKeyValueHeadersTimestamp(record, expectedRecord.key(), expectedRecord.value(), expectedRecord.headers(), expectedRecord.timestamp());
+ }
}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index dbb26e0..64d5b12 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -286,7 +286,7 @@ public class MockProcessorContextTest {
}
context.resetForwards();
- context.setRecordMetadata("t1", 0, 0L, 0L);
+ context.setRecordMetadata("t1", 0, 0L, null, 0L);
{
processor.process("foo", 5L);
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 6cc96a2..2d446d1 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -18,6 +18,10 @@ package org.apache.kafka.streams;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
@@ -74,10 +78,12 @@ public class TopologyTestDriverTest {
new ByteArraySerializer(),
new ByteArraySerializer());
+ private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
+
private final byte[] key1 = new byte[0];
private final byte[] value1 = new byte[0];
private final long timestamp1 = 42L;
- private final ConsumerRecord<byte[], byte[]> consumerRecord1 = consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, timestamp1);
+ private final ConsumerRecord<byte[], byte[]> consumerRecord1 = consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, headers, timestamp1);
private final byte[] key2 = new byte[0];
private final byte[] value2 = new byte[0];
@@ -107,6 +113,7 @@ public class TopologyTestDriverTest {
private long timestamp;
private long offset;
private String topic;
+ private Headers headers;
Record(final ConsumerRecord consumerRecord) {
key = consumerRecord.key();
@@ -114,15 +121,18 @@ public class TopologyTestDriverTest {
timestamp = consumerRecord.timestamp();
offset = consumerRecord.offset();
topic = consumerRecord.topic();
+ headers = consumerRecord.headers();
}
Record(final Object key,
final Object value,
+ final Headers headers,
final long timestamp,
final long offset,
final String topic) {
this.key = key;
this.value = value;
+ this.headers = headers;
this.timestamp = timestamp;
this.offset = offset;
this.topic = topic;
@@ -146,12 +156,13 @@ public class TopologyTestDriverTest {
offset == record.offset &&
Objects.equals(key, record.key) &&
Objects.equals(value, record.value) &&
- Objects.equals(topic, record.topic);
+ Objects.equals(topic, record.topic) &&
+ Objects.equals(headers, record.headers);
}
@Override
public int hashCode() {
- return Objects.hash(key, value, timestamp, offset, topic);
+ return Objects.hash(key, value, headers, timestamp, offset, topic);
}
}
@@ -201,7 +212,7 @@ public class TopologyTestDriverTest {
@Override
public void process(Object key, Object value) {
- processedRecords.add(new Record(key, value, context.timestamp(), context.offset(), context.topic()));
+ processedRecords.add(new Record(key, value, context.headers(), context.timestamp(), context.offset(), context.topic()));
context.forward(key, value);
}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java
index 469d241..855aa9f 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java
@@ -63,12 +63,17 @@ public class ConsumerRecordFactoryTest {
}
@Test(expected = NullPointerException.class)
+ public void shouldNotAllowToCreateTopicWithNullHeaders() {
+ factory.create(topicName, rawKey, value, null, timestamp);
+ }
+
+ @Test(expected = NullPointerException.class)
public void shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp() {
factory.create(null, rawKey, value);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowToCreateTopicWithNullTopicNameWithNulKey() {
+ public void shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey() {
factory.create((String) null, value, timestamp);
}
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.