Posted to commits@kafka.apache.org by mj...@apache.org on 2018/05/22 22:44:43 UTC

[kafka] branch trunk updated: KAFKA-6850: Add Record Header support to Kafka Streams Processor API (KIP-244) (#4955)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 133108c  KAFKA-6850: Add Record Header support to Kafka Streams Processor API (KIP-244) (#4955)
133108c is described below

commit 133108cdacf7ee1cc4569e797f2cdf9ec60f7fdd
Author: Jorge Quilcate Otoya <qu...@gmail.com>
AuthorDate: Wed May 23 00:44:37 2018 +0200

    KAFKA-6850: Add Record Header support to Kafka Streams Processor API (KIP-244) (#4955)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../kafka/streams/processor/ProcessorContext.java  |   7 +
 .../internals/AbstractProcessorContext.java        |  10 ++
 .../ForwardingDisabledProcessorContext.java        |   6 +
 .../processor/internals/GlobalStateUpdateTask.java |   3 +-
 .../internals/ProcessorRecordContext.java          |  17 +-
 .../processor/internals/RecordCollector.java       |   3 +
 .../processor/internals/RecordCollectorImpl.java   |   7 +-
 .../streams/processor/internals/RecordContext.java |   7 +
 .../processor/internals/RecordDeserializer.java    |   2 +-
 .../streams/processor/internals/SinkNode.java      |   2 +-
 .../streams/processor/internals/StampedRecord.java |   5 +
 .../processor/internals/StandbyContextImpl.java    |   3 +
 .../streams/processor/internals/StreamTask.java    |   8 +-
 .../state/internals/CachingKeyValueStore.java      |  13 +-
 .../state/internals/CachingSessionStore.java       |  11 +-
 .../state/internals/CachingWindowStore.java        |  11 +-
 .../streams/state/internals/LRUCacheEntry.java     |  18 +-
 .../streams/state/internals/StoreChangeLogger.java |   3 +-
 .../integration/utils/IntegrationTestUtils.java    |  92 +++++++++-
 .../kstream/internals/KGroupedStreamImplTest.java  |   2 +-
 .../internals/KStreamGlobalKTableJoinTest.java     |   2 +-
 .../internals/KStreamGlobalKTableLeftJoinTest.java |   2 +-
 .../kstream/internals/KStreamKTableJoinTest.java   |   4 +-
 .../internals/KStreamKTableLeftJoinTest.java       |   2 +-
 ...KStreamSessionWindowAggregateProcessorTest.java |   2 +-
 .../internals/KTableKTableInnerJoinTest.java       |   2 +-
 .../internals/KTableKTableLeftJoinTest.java        |   2 +-
 .../internals/KTableKTableOuterJoinTest.java       |   2 +-
 .../internals/KTableKTableRightJoinTest.java       |   2 +-
 .../internals/KTableTransformValuesTest.java       |   4 +-
 .../internals/AbstractProcessorContextTest.java    |  28 ++-
 .../processor/internals/ProcessorTopologyTest.java |  65 ++++++-
 .../processor/internals/RecordCollectorTest.java   |  68 +++----
 .../processor/internals/RecordContextStub.java     |  21 ++-
 .../internals/RecordDeserializerTest.java          |  10 +-
 .../streams/state/KeyValueStoreTestDriver.java     |   3 +
 .../state/internals/CachingKeyValueStoreTest.java  |   3 +-
 .../state/internals/CachingSessionStoreTest.java   |   2 +-
 .../state/internals/CachingWindowStoreTest.java    |   2 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java       |   2 +
 .../ChangeLoggingSessionBytesStoreTest.java        |   2 +
 .../ChangeLoggingWindowBytesStoreTest.java         |   2 +
 .../streams/state/internals/NamedCacheTest.java    |  32 ++--
 .../state/internals/RocksDBWindowStoreTest.java    |   4 +-
 .../state/internals/StoreChangeLoggerTest.java     |  13 ++
 .../streams/state/internals/ThreadCacheTest.java   |   8 +-
 .../kafka/test/InternalMockProcessorContext.java   |  12 +-
 .../org/apache/kafka/test/KStreamTestDriver.java   |   7 +-
 .../org/apache/kafka/test/NoOpRecordCollector.java |   3 +
 .../apache/kafka/streams/TopologyTestDriver.java   |  20 ++-
 .../streams/processor/MockProcessorContext.java    |  13 +-
 .../kafka/streams/test/ConsumerRecordFactory.java  | 170 ++++++++++++++++--
 .../apache/kafka/streams/test/OutputVerifier.java  | 199 +++++++++++++++++++++
 .../kafka/streams/MockProcessorContextTest.java    |   2 +-
 .../kafka/streams/TopologyTestDriverTest.java      |  19 +-
 .../streams/test/ConsumerRecordFactoryTest.java    |   7 +-
 56 files changed, 839 insertions(+), 132 deletions(-)
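
Note: the headline change is the new ProcessorContext#headers() accessor added
below. As a minimal sketch of what it enables (not part of this commit; the
"trace-id" header name is purely illustrative), a custom Processor built
against this branch can now read per-record headers:

    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.streams.processor.AbstractProcessor;

    public class TraceIdProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(final String key, final String value) {
            // headers() exposes the headers of the record currently being processed
            final Header traceId = context().headers().lastHeader("trace-id");
            if (traceId != null) {
                System.out.println("trace-id = " + new String(traceId.value()));
            }
            context().forward(key, value);
        }
    }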

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 79d191c..f2a9f64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -198,6 +199,12 @@ public interface ProcessorContext {
     long offset();
 
     /**
+     * Returns the headers of the current input record.
+     * @return the headers
+     */
+    Headers headers();
+
+    /**
      * Returns the current timestamp.
      *
      * If it is triggered while processing a record streamed from the source processor, timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 9687477..3338669 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -142,6 +143,15 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
         return recordContext.offset();
     }
 
+    @Override
+    public Headers headers() {
+        if (recordContext == null) {
+            throw new IllegalStateException("This should not happen as headers() should only be called while a record is being processed");
+        }
+
+        return recordContext.headers();
+    }
+
     /**
      * @throws IllegalStateException if timestamp is null
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
index 35a0a7e..7e2610c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -133,6 +134,11 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex
     }
 
     @Override
+    public Headers headers() {
+        return delegate.headers();
+    }
+
+    @Override
     public long timestamp() {
         return delegate.timestamp();
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 26bf493..d387713 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -89,7 +89,8 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
                 new ProcessorRecordContext(deserialized.timestamp(),
                     deserialized.offset(),
                     deserialized.partition(),
-                    deserialized.topic());
+                    deserialized.topic(),
+                    deserialized.headers());
             processorContext.setRecordContext(recordContext);
             processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
             sourceNodeAndDeserializer.sourceNode().process(deserialized.key(), deserialized.value());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 92acfc9..c071525 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Headers;
+
 import java.util.Objects;
 
 public class ProcessorRecordContext implements RecordContext {
@@ -24,16 +26,19 @@ public class ProcessorRecordContext implements RecordContext {
     private final long offset;
     private final String topic;
     private final int partition;
+    private final Headers headers;
 
     public ProcessorRecordContext(final long timestamp,
                                   final long offset,
                                   final int partition,
-                                  final String topic) {
+                                  final String topic,
+                                  final Headers headers) {
 
         this.timestamp = timestamp;
         this.offset = offset;
         this.topic = topic;
         this.partition = partition;
+        this.headers = headers;
     }
 
     public long offset() {
@@ -59,6 +64,11 @@ public class ProcessorRecordContext implements RecordContext {
     }
 
     @Override
+    public Headers headers() {
+        return headers;
+    }
+
+    @Override
     public boolean equals(final Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
@@ -66,11 +76,12 @@ public class ProcessorRecordContext implements RecordContext {
         return timestamp == that.timestamp &&
                 offset == that.offset &&
                 partition == that.partition &&
-                Objects.equals(topic, that.topic);
+                Objects.equals(topic, that.topic) &&
+                Objects.equals(headers, that.headers);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(timestamp, offset, topic, partition);
+        return Objects.hash(timestamp, offset, topic, partition, headers);
     }
 }
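
Note: since equals() and hashCode() now include the headers field, two record
contexts that differ only in headers no longer compare equal. A sketch (values
arbitrary; the argument order is timestamp, offset, partition, topic, headers):

    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;

    final ProcessorRecordContext withHeaders = new ProcessorRecordContext(
        0L, 0L, 0, "topic", new RecordHeaders().add("k", "v".getBytes()));
    final ProcessorRecordContext noHeaders = new ProcessorRecordContext(
        0L, 0L, 0, "topic", null);
    // withHeaders.equals(noHeaders) evaluates to false after this change
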
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index b083869..bf10da2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
@@ -28,6 +29,7 @@ public interface RecordCollector {
     <K, V> void send(final String topic,
                      final K key,
                      final V value,
+                     final Headers headers,
                      final Integer partition,
                      final Long timestamp,
                      final Serializer<K> keySerializer,
@@ -36,6 +38,7 @@ public interface RecordCollector {
     <K, V> void send(final String topic,
                      final K key,
                      final V value,
+                     final Headers headers,
                      final Long timestamp,
                      final Serializer<K> keySerializer,
                      final Serializer<V> valueSerializer,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 8167539..1c8b0a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler;
@@ -77,6 +78,7 @@ public class RecordCollectorImpl implements RecordCollector {
     public <K, V> void send(final String topic,
                             final K key,
                             final V value,
+                            final Headers headers,
                             final Long timestamp,
                             final Serializer<K> keySerializer,
                             final Serializer<V> valueSerializer,
@@ -93,7 +95,7 @@ public class RecordCollectorImpl implements RecordCollector {
             }
         }
 
-        send(topic, key, value, partition, timestamp, keySerializer, valueSerializer);
+        send(topic, key, value, headers, partition, timestamp, keySerializer, valueSerializer);
     }
 
     private boolean productionExceptionIsFatal(final Exception exception) {
@@ -142,6 +144,7 @@ public class RecordCollectorImpl implements RecordCollector {
     public <K, V> void send(final String topic,
                             final K key,
                             final V value,
+                            final Headers headers,
                             final Integer partition,
                             final Long timestamp,
                             final Serializer<K> keySerializer,
@@ -150,7 +153,7 @@ public class RecordCollectorImpl implements RecordCollector {
         final byte[] keyBytes = keySerializer.serialize(topic, key);
         final byte[] valBytes = valueSerializer.serialize(topic, value);
 
-        final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
+        final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
 
         try {
             producer.send(serializedRecord, new Callback() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
index dd58f4c..15add71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.processor.Processor;
 
 /**
@@ -47,4 +48,10 @@ public interface RecordContext {
      * @return The partition the record was received on
      */
     int partition();
+
+    /**
+     * @return The headers from the record received from Kafka
+     */
+    Headers headers();
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 36e2c9a..ade9664 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -63,7 +63,7 @@ class RecordDeserializer {
                 rawRecord.serializedKeySize(),
                 rawRecord.serializedValueSize(),
                 sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key()),
-                sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()));
+                sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()), rawRecord.headers());
         } catch (final Exception deserializationException) {
             final DeserializationExceptionHandler.DeserializationHandlerResponse response;
             try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 0fbd6dc..7711905 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -84,7 +84,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
         }
 
         try {
-            collector.send(topic, key, value, timestamp, keySerializer, valSerializer, partitioner);
+            collector.send(topic, key, value, context.headers(), timestamp, keySerializer, valSerializer, partitioner);
         } catch (final ClassCastException e) {
             final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
             final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
index 243c41a..aa9b79d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
 
 public class StampedRecord extends Stamped<ConsumerRecord<Object, Object>> {
 
@@ -44,6 +45,10 @@ public class StampedRecord extends Stamped<ConsumerRecord<Object, Object>> {
         return value.offset();
     }
 
+    public Headers headers() {
+        return value.headers();
+    }
+
     @Override
     public String toString() {
         return value.toString() + ", timestamp = " + timestamp;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 6aeca44..14f986c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsConfig;
@@ -40,6 +41,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
         public <K, V> void send(final String topic,
                                 final K key,
                                 final V value,
+                                final Headers headers,
                                 final Integer partition,
                                 final Long timestamp,
                                 final Serializer<K> keySerializer,
@@ -50,6 +52,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
         public <K, V> void send(final String topic,
                                 final K key,
                                 final V value,
+                                final Headers headers,
                                 final Long timestamp,
                                 final Serializer<K> keySerializer,
                                 final Serializer<V> valueSerializer,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 633e7ad..e2be3e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -343,7 +343,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     }
 
     private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
-        processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
+        processorContext.setRecordContext(
+            new ProcessorRecordContext(
+                record.timestamp,
+                record.offset(),
+                record.partition(),
+                record.topic(),
+                record.headers()));
         processorContext.setCurrentNode(currNode);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 525e92d..285bde5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -224,8 +224,17 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     }
 
     private void putInternal(final Bytes key, final byte[] value) {
-        cache.put(cacheName, key, new LRUCacheEntry(value, true, context.offset(),
-              context.timestamp(), context.partition(), context.topic()));
+        cache.put(
+            cacheName,
+            key,
+            new LRUCacheEntry(
+                value,
+                context.headers(),
+                true,
+                context.offset(),
+                context.timestamp(),
+                context.partition(),
+                context.topic()));
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 1bb2ea7..c099faf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -141,8 +141,15 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
     public void put(final Windowed<Bytes> key, byte[] value) {
         validateStoreOpen();
         final Bytes binaryKey = Bytes.wrap(SessionKeySchema.toBinary(key));
-        final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(),
-                                                      key.window().end(), context.partition(), context.topic());
+        final LRUCacheEntry entry =
+            new LRUCacheEntry(
+                value,
+                context.headers(),
+                true,
+                context.offset(),
+                key.window().end(),
+                context.partition(),
+                context.topic());
         cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 7e58b68..ca24ffd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -150,8 +150,15 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         validateStoreOpen();
         
         final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0);
-        final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(),
-                                                      timestamp, context.partition(), context.topic());
+        final LRUCacheEntry entry =
+            new LRUCacheEntry(
+                value,
+                context.headers(),
+                true,
+                context.offset(),
+                timestamp,
+                context.partition(),
+                context.topic());
         cache.put(name, cacheFunction.cacheKey(keyBytes), entry);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index af7059b..78c0331 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.processor.internals.RecordContext;
 
 /**
@@ -24,6 +25,7 @@ import org.apache.kafka.streams.processor.internals.RecordContext;
 class LRUCacheEntry implements RecordContext {
 
     public final byte[] value;
+    private final Headers headers;
     private final long offset;
     private final String topic;
     private final int partition;
@@ -33,13 +35,18 @@ class LRUCacheEntry implements RecordContext {
     private boolean isDirty;
 
     LRUCacheEntry(final byte[] value) {
-        this(value, false, -1, -1, -1, "");
+        this(value, null, false, -1, -1, -1, "");
     }
 
-    LRUCacheEntry(final byte[] value, final boolean isDirty,
-                  final long offset, final long timestamp, final int partition,
+    LRUCacheEntry(final byte[] value,
+                  final Headers headers,
+                  final boolean isDirty,
+                  final long offset,
+                  final long timestamp,
+                  final int partition,
                   final String topic) {
         this.value = value;
+        this.headers = headers;
         this.partition = partition;
         this.topic = topic;
         this.offset = offset;
@@ -78,6 +85,11 @@ class LRUCacheEntry implements RecordContext {
         return partition;
     }
 
+    @Override
+    public Headers headers() {
+        return headers;
+    }
+
     void markClean() {
         isDirty = false;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 1055df5..a8a04c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -55,7 +55,8 @@ class StoreChangeLogger<K, V> {
         if (collector != null) {
             final Serializer<K> keySerializer = serialization.keySerializer();
             final Serializer<V> valueSerializer = serialization.valueSerializer();
-            collector.send(this.topic, key, value, this.partition, context.timestamp(), keySerializer, valueSerializer);
+            // Sending null headers to changelog topics (KIP-244)
+            collector.send(this.topic, key, value, null, this.partition, context.timestamp(), keySerializer, valueSerializer);
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index d306ee4..fe897c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.requests.UpdateMetadataRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -91,6 +92,12 @@ public class IntegrationTestUtils {
         IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, time, false);
     }
 
+    public static <K, V> void produceKeyValuesSynchronously(
+        final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Headers headers, final Time time)
+        throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, headers, time, false);
+    }
+
     /**
      * @param topic               Kafka topic to write the data records to
      * @param records             Data records to write to Kafka
@@ -102,10 +109,21 @@ public class IntegrationTestUtils {
     public static <K, V> void produceKeyValuesSynchronously(
         final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Time time, final boolean enableTransactions)
         throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, null, time, enableTransactions);
+    }
+
+    public static <K, V> void produceKeyValuesSynchronously(final String topic,
+                                                            final Collection<KeyValue<K, V>> records,
+                                                            final Properties producerConfig,
+                                                            final Headers headers,
+                                                            final Time time,
+                                                            final boolean enableTransactions)
+        throws ExecutionException, InterruptedException {
         for (final KeyValue<K, V> record : records) {
             produceKeyValuesSynchronouslyWithTimestamp(topic,
                 Collections.singleton(record),
                 producerConfig,
+                headers,
                 time.milliseconds(),
                 enableTransactions);
             time.sleep(1L);
@@ -123,20 +141,39 @@ public class IntegrationTestUtils {
     public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                          final Collection<KeyValue<K, V>> records,
                                                                          final Properties producerConfig,
+                                                                         final Headers headers,
+                                                                         final Long timestamp)
+        throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, headers, timestamp, false);
+    }
+
+    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
+                                                                         final Collection<KeyValue<K, V>> records,
+                                                                         final Properties producerConfig,
                                                                          final Long timestamp,
                                                                          final boolean enableTransactions)
         throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, null, timestamp, enableTransactions);
+    }
+
+    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
+                                                                         final Collection<KeyValue<K, V>> records,
+                                                                         final Properties producerConfig,
+                                                                         final Headers headers,
+                                                                         final Long timestamp,
+                                                                         final boolean enabledTransactions)
+        throws ExecutionException, InterruptedException {
         try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
-            if (enableTransactions) {
+            if (enabledTransactions) {
                 producer.initTransactions();
                 producer.beginTransaction();
             }
             for (final KeyValue<K, V> record : records) {
                 final Future<RecordMetadata> f = producer.send(
-                    new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
+                    new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
                 f.get();
             }
-            if (enableTransactions) {
+            if (enabledTransactions) {
                 producer.commitTransaction();
             }
             producer.flush();
@@ -194,6 +231,12 @@ public class IntegrationTestUtils {
         }
     }
 
+    public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig,
+                                                                                final String topic,
+                                                                                final int expectedNumRecords) throws InterruptedException {
+        return waitUntilMinRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
+    }
+
     public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                                   final String topic,
                                                                                   final int expectedNumRecords) throws InterruptedException {
@@ -232,6 +275,27 @@ public class IntegrationTestUtils {
         return accumData;
     }
 
+    public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig,
+                                                                                final String topic,
+                                                                                final int expectedNumRecords,
+                                                                                final long waitTime) throws InterruptedException {
+        final List<ConsumerRecord<K, V>> accumData = new ArrayList<>();
+        try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+            final TestCondition valuesRead = new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    final List<ConsumerRecord<K, V>> readData =
+                        readRecords(topic, consumer, waitTime, expectedNumRecords);
+                    accumData.addAll(readData);
+                    return accumData.size() >= expectedNumRecords;
+                }
+            };
+            final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
+            TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+        }
+        return accumData;
+    }
+
     public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
                                                                 final String topic,
                                                                 final int expectedNumRecords) throws InterruptedException {
@@ -380,21 +444,33 @@ public class IntegrationTestUtils {
                                                              final Consumer<K, V> consumer,
                                                              final long waitTime,
                                                              final int maxMessages) {
-        final List<KeyValue<K, V>> consumedValues;
+        final List<KeyValue<K, V>> consumedValues = new ArrayList<>();
+        final List<ConsumerRecord<K, V>> records = readRecords(topic, consumer, waitTime, maxMessages);
+        for (final ConsumerRecord<K, V> record : records) {
+            consumedValues.add(new KeyValue<>(record.key(), record.value()));
+        }
+        return consumedValues;
+    }
+
+    private static <K, V> List<ConsumerRecord<K, V>> readRecords(final String topic,
+                                                                 final Consumer<K, V> consumer,
+                                                                 final long waitTime,
+                                                                 final int maxMessages) {
+        final List<ConsumerRecord<K, V>> consumerRecords;
         consumer.subscribe(Collections.singletonList(topic));
         final int pollIntervalMs = 100;
-        consumedValues = new ArrayList<>();
+        consumerRecords = new ArrayList<>();
         int totalPollTimeMs = 0;
         while (totalPollTimeMs < waitTime &&
-            continueConsuming(consumedValues.size(), maxMessages)) {
+            continueConsuming(consumerRecords.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
             final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
 
             for (final ConsumerRecord<K, V> record : records) {
-                consumedValues.add(new KeyValue<>(record.key(), record.value()));
+                consumerRecords.add(record);
             }
         }
-        return consumedValues;
+        return consumerRecords;
     }
 
     private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) {
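
Note: tests that want to exercise headers end to end can use the new overloads
above, which accept a Headers instance. A usage sketch (imports elided;
producerConfig and time are assumed to be provided by the surrounding test,
which also declares the checked exceptions):

    IntegrationTestUtils.produceKeyValuesSynchronously(
        "input-topic",
        Collections.singletonList(new KeyValue<>("k", "v")),
        producerConfig,
        new RecordHeaders().add("source", "test".getBytes()),
        time);
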
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index e5acd01..1517f0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -531,7 +531,7 @@ public class KGroupedStreamImplTest {
         driver.pipeInput(recordFactory.create(TOPIC, "1", "D"));
         driver.pipeInput(recordFactory.create(TOPIC, "3", "E"));
         driver.pipeInput(recordFactory.create(TOPIC, "3", "F"));
-        driver.pipeInput(recordFactory.create(TOPIC, "3", null));
+        driver.pipeInput(recordFactory.create(TOPIC, "3", (String) null));
     }
 
     private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index ae76362..d5c5a54 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -106,7 +106,7 @@ public class KStreamGlobalKTableJoinTest {
     private void pushNullValueToGlobalTable(final int messageCount) {
         final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
         for (int i = 0; i < messageCount; i++) {
-            driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], null));
+            driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], (String) null));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index 95fe8b9..248c3ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -108,7 +108,7 @@ public class KStreamGlobalKTableLeftJoinTest {
     private void pushNullValueToGlobalTable(final int messageCount) {
         final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
         for (int i = 0; i < messageCount; i++) {
-            driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], null));
+            driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], (String) null));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 43eaf3b..6ffce04 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -97,7 +97,7 @@ public class KStreamKTableJoinTest {
 
     private void pushNullValueToTable() {
         for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], null));
+            driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], (String) null));
         }
     }
 
@@ -205,7 +205,7 @@ public class KStreamKTableJoinTest {
     @Test
     public void shouldLogAndMeterWhenSkippingNullLeftValue() {
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        driver.pipeInput(recordFactory.create(streamTopic, 1, null));
+        driver.pipeInput(recordFactory.create(streamTopic, 1, (String) null));
         LogCaptureAppender.unregister(appender);
 
         assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 251e58e..1c3e027 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -93,7 +93,7 @@ public class KStreamKTableLeftJoinTest {
 
     private void pushNullValueToTable(final int messageCount) {
         for (int i = 0; i < messageCount; i++) {
-            driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], null));
+            driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], (String) null));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index afc9be1..6b5e577 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -308,7 +308,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     public void shouldLogAndMeterWhenSkippingNullKey() {
         initStore(false);
         processor.init(context);
-        context.setRecordContext(new ProcessorRecordContext(-1, -2, -3, "topic"));
+        context.setRecordContext(new ProcessorRecordContext(-1, -2, -3, "topic", null));
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
         processor.process(null, "1");
         LogCaptureAppender.unregister(appender);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 0c56531..cd29b50 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -361,7 +361,7 @@ public class KTableKTableInnerJoinTest {
         ).get();
 
         final MockProcessorContext context = new MockProcessorContext();
-        context.setRecordMetadata("left", -1, -2, -3);
+        context.setRecordMetadata("left", -1, -2, null, -3);
         join.init(context);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
         join.process(null, new Change<>("new", "old"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index ef64f75..9be6189 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -424,7 +424,7 @@ public class KTableKTableLeftJoinTest {
         ).get();
 
         final MockProcessorContext context = new MockProcessorContext();
-        context.setRecordMetadata("left", -1, -2, -3);
+        context.setRecordMetadata("left", -1, -2, null, -3);
         join.init(context);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
         join.process(null, new Change<>("new", "old"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index e897ec3..3995fcf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -366,7 +366,7 @@ public class KTableKTableOuterJoinTest {
         ).get();
 
         final MockProcessorContext context = new MockProcessorContext();
-        context.setRecordMetadata("left", -1, -2, -3);
+        context.setRecordMetadata("left", -1, -2, null, -3);
         join.init(context);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
         join.process(null, new Change<>("new", "old"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
index d7411cb..d4805a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
@@ -41,7 +41,7 @@ public class KTableKTableRightJoinTest {
         ).get();
 
         final MockProcessorContext context = new MockProcessorContext();
-        context.setRecordMetadata("left", -1, -2, -3);
+        context.setRecordMetadata("left", -1, -2, null, -3);
         join.init(context);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
         join.process(null, new Change<>("new", "old"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 7c12dad..be20c86 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -325,7 +325,7 @@ public class KTableTransformValuesTest {
 
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 0L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b", 0L));
-        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "D", null, 0L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "D", (String) null, 0L));
 
         assertThat(output(), hasItems("A:A->a!", "B:B->b!", "D:D->null!"));
         assertThat("Store should not be materialized", driver.getKeyValueStore(QUERYABLE_NAME), is(nullValue()));
@@ -349,7 +349,7 @@ public class KTableTransformValuesTest {
 
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 0L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b", 0L));
-        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "C", null, 0L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "C", (String) null, 0L));
 
         assertThat(output(), hasItems("A:A->a!", "B:B->b!", "C:C->null!"));
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 86806b2..9aaa8a6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsConfig;
@@ -44,7 +48,8 @@ public class AbstractProcessorContextTest {
     private final MockStreamsMetrics metrics = new MockStreamsMetrics(new Metrics());
     private final AbstractProcessorContext context = new TestProcessorContext(metrics);
     private final MockStateStore stateStore = new MockStateStore("store", false);
-    private final RecordContext recordContext = new RecordContextStub(10, System.currentTimeMillis(), 1, "foo");
+    private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
+    private final RecordContext recordContext = new RecordContextStub(10, System.currentTimeMillis(), 1, "foo", headers);
 
     @Before
     public void before() {
@@ -141,6 +146,28 @@
         assertThat(context.timestamp(), equalTo(recordContext.timestamp()));
     }
 
+    @Test
+    public void shouldReturnHeadersFromRecordContext() {
+        assertThat(context.headers(), equalTo(recordContext.headers()));
+    }
+
+    @Test
+    public void shouldReturnNullIfHeadersAreNotSet() {
+        context.setRecordContext(new RecordContextStub(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
+        assertThat(context.headers(), nullValue());
+    }
+
+    @Test
+    public void shouldThrowIllegalStateExceptionOnHeadersIfNoRecordContext() {
+        context.setRecordContext(null);
+        try {
+            context.headers();
+            org.junit.Assert.fail("should have thrown IllegalStateException");
+        } catch (final IllegalStateException e) {
+            // pass
+        }
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void appConfigsShouldReturnParsedValues() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index e247647..033c0e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -18,6 +18,10 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
@@ -64,6 +68,9 @@ public class ProcessorTopologyTest {
     private static final String OUTPUT_TOPIC_2 = "output-topic-2";
     private static final String THROUGH_TOPIC_1 = "through-topic-1";
 
+    private static final Header HEADER = new RecordHeader("key", "value".getBytes());
+    private static final Headers HEADERS = new RecordHeaders(new Header[]{HEADER});
+
     private final TopologyWrapper topology = new TopologyWrapper();
     private final MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER, 0L);
@@ -71,6 +78,7 @@ public class ProcessorTopologyTest {
     private TopologyTestDriver driver;
     private final Properties props = new Properties();
 
+
     @Before
     public void setup() {
         // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
@@ -338,10 +346,33 @@ public class ProcessorTopologyTest {
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 40L);
     }
 
+    @Test
+    public void shouldConsiderHeaders() {
+        final int partition = 10;
+        driver = new TopologyTestDriver(createSimpleTopology(partition), props);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", HEADERS, 10L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", HEADERS, 20L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", HEADERS, 30L));
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", HEADERS, partition, 10L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", HEADERS, partition, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", HEADERS, partition, 30L);
+    }
+
+    @Test
+    public void shouldAddHeaders() {
+        driver = new TopologyTestDriver(createAddHeaderTopology(), props);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L));
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", HEADERS, 10L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", HEADERS, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", HEADERS, 30L);
+    }
+
     private void assertNextOutputRecord(final String topic,
                                         final String key,
                                         final String value) {
-        assertNextOutputRecord(topic, key, value, null, 0L);
+        assertNextOutputRecord(topic, key, value, (Integer) null, 0L);
     }
 
     private void assertNextOutputRecord(final String topic,
@@ -354,6 +385,23 @@ public class ProcessorTopologyTest {
     private void assertNextOutputRecord(final String topic,
                                         final String key,
                                         final String value,
+                                        final Headers headers,
+                                        final Long timestamp) {
+        assertNextOutputRecord(topic, key, value, headers, null, timestamp);
+    }
+
+    private void assertNextOutputRecord(final String topic,
+                                        final String key,
+                                        final String value,
+                                        final Integer partition,
+                                        final Long timestamp) {
+        assertNextOutputRecord(topic, key, value, new RecordHeaders(), partition, timestamp);
+    }
+
+    private void assertNextOutputRecord(final String topic,
+                                        final String key,
+                                        final String value,
+                                        final Headers headers,
                                         final Integer partition,
                                         final Long timestamp) {
         final ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
@@ -362,6 +410,7 @@ public class ProcessorTopologyTest {
         assertEquals(value, record.value());
         assertEquals(partition, record.partition());
         assertEquals(timestamp, record.timestamp());
+        assertEquals(headers, record.headers());
     }
 
     private void assertNoOutputRecord(final String topic) {
@@ -458,6 +507,12 @@ public class ProcessorTopologyTest {
                 .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2");
     }
 
+    private Topology createAddHeaderTopology() {
+        return topology.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+                .addProcessor("processor-1", define(new AddHeaderProcessor()), "source-1")
+                .addSink("sink-1", OUTPUT_TOPIC_1, "processor-1");
+    }
+
     /**
      * A processor that simply forwards all messages to all children.
      */
@@ -478,6 +533,14 @@ public class ProcessorTopologyTest {
         }
     }
 
+    protected static class AddHeaderProcessor extends AbstractProcessor<String, String> {
+        @Override
+        public void process(final String key, final String value) {
+            context().headers().add(HEADER);
+            context().forward(key, value);
+        }
+    }
+
     /**
      * A processor that removes custom timestamp information from messages and forwards modified messages to each child.
      * A message contains custom timestamp information if the value is in ".*@[0-9]+" format.
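
Illustration (not part of the commit): a minimal sketch of how the header-aware
test-utils overloads added in this commit fit together, reusing the INPUT_TOPIC_1,
OUTPUT_TOPIC_1, recordFactory, and driver fields of this test class.

    // Pipe a record carrying headers through the topology under test, then
    // assert that the headers survive into the output record.
    final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
    driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", headers, 10L));
    final ProducerRecord<String, String> output =
        driver.readOutput(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER);
    assertEquals(headers, output.headers());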
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 8a2f171..e439372 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -27,6 +27,10 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Sum;
@@ -85,14 +89,16 @@ public class RecordCollectorTest {
             new Metrics().sensor("skipped-records")
         );
 
-        collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
-        collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
-        collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
+        final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
 
-        collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);
-        collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);
+        collector.send("topic1", "999", "0", null, 0, null, stringSerializer, stringSerializer);
+        collector.send("topic1", "999", "0", null, 0, null, stringSerializer, stringSerializer);
+        collector.send("topic1", "999", "0", null, 0, null, stringSerializer, stringSerializer);
 
-        collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer);
+        collector.send("topic1", "999", "0", headers, 1, null, stringSerializer, stringSerializer);
+        collector.send("topic1", "999", "0", headers, 1, null, stringSerializer, stringSerializer);
+
+        collector.send("topic1", "999", "0", headers, 2, null, stringSerializer, stringSerializer);
 
         final Map<TopicPartition, Long> offsets = collector.offsets();
 
@@ -101,9 +107,9 @@ public class RecordCollectorTest {
         assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));
 
         // ignore StreamPartitioner
-        collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
-        collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);
-        collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer);
+        collector.send("topic1", "999", "0", null, 0, null, stringSerializer, stringSerializer);
+        collector.send("topic1", "999", "0", null, 1, null, stringSerializer, stringSerializer);
+        collector.send("topic1", "999", "0", headers, 2, null, stringSerializer, stringSerializer);
 
         assertEquals((Long) 3L, offsets.get(new TopicPartition("topic1", 0)));
         assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1)));
@@ -121,17 +127,19 @@ public class RecordCollectorTest {
             new Metrics().sensor("skipped-records")
         );
 
-        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "9", "0", null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "27", "0", null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "81", "0", null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "243", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
+
+        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "9", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "27", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "81", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "243", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
 
-        collector.send("topic1", "28", "0", null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "82", "0", null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "244", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "28", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "82", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "244", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner);
 
-        collector.send("topic1", "245", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "245", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
 
         final Map<TopicPartition, Long> offsets = collector.offsets();
 
@@ -155,7 +163,7 @@ public class RecordCollectorTest {
             new DefaultProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
 
-        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
     }
 
     @SuppressWarnings("unchecked")
@@ -174,10 +182,10 @@ public class RecordCollectorTest {
             new DefaultProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
 
-        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
 
         try {
-            collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+            collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
             fail("Should have thrown StreamsException");
         } catch (final StreamsException expected) { /* ok */ }
     }
@@ -198,9 +206,9 @@ public class RecordCollectorTest {
             new AlwaysContinueProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
 
-        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
 
-        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
     }
 
     @SuppressWarnings("unchecked")
@@ -223,7 +231,7 @@ public class RecordCollectorTest {
             logContext,
             new AlwaysContinueProductionExceptionHandler(),
             sensor);
-        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
         assertEquals(1.0, metrics.metrics().get(metricName).metricValue());
         assertTrue(logCaptureAppender.getMessages().contains("test Error sending records (key=[3] value=[0] timestamp=[null]) to topic=[topic1] and partition=[0]; The exception handler chose to CONTINUE processing in spite of this error."));
         LogCaptureAppender.unregister(logCaptureAppender);
@@ -245,7 +253,7 @@ public class RecordCollectorTest {
             new DefaultProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
 
-        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
 
         try {
             collector.flush();
@@ -269,7 +277,7 @@ public class RecordCollectorTest {
             new AlwaysContinueProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
 
-        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
 
         collector.flush();
     }
@@ -290,7 +298,7 @@ public class RecordCollectorTest {
             new DefaultProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
 
-        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
 
         try {
             collector.close();
@@ -314,7 +322,7 @@ public class RecordCollectorTest {
             new AlwaysContinueProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
 
-        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
 
         collector.close();
     }
@@ -334,7 +342,7 @@ public class RecordCollectorTest {
             logContext,
             new DefaultProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
-        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
     }
 
     @SuppressWarnings("unchecked")
@@ -352,6 +360,6 @@ public class RecordCollectorTest {
             logContext,
             new AlwaysContinueProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
-        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
     }
 }
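
Context (a hedged sketch, not the verbatim implementation): with the extra Headers
parameter on send(), RecordCollectorImpl (changed earlier in this commit) can thread
the headers straight into the ProducerRecord handed to the producer. The variable
names below are illustrative.

    // Headers ride along on the ProducerRecord; a null Headers argument
    // simply produces a record without headers.
    final ProducerRecord<byte[], byte[]> serializedRecord =
        new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
    producer.send(serializedRecord, callback);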
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
index 0af5e17..7afd51e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
@@ -16,25 +16,37 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Headers;
+
 public class RecordContextStub implements RecordContext {
 
     private final long offset;
     private long timestamp;
     private final int partition;
     private final String topic;
+    private final Headers headers;
 
     public RecordContextStub() {
-        this(-1, -1, -1, "");
+        this(-1, -1, -1, "", null);
     }
 
     public RecordContextStub(final long offset,
                              final long timestamp,
                              final int partition,
-                             final String topic) {
+                             final String topic,
+                             final Headers headers) {
         this.offset = offset;
         this.timestamp = timestamp;
         this.partition = partition;
         this.topic = topic;
+        this.headers = headers;
+    }
+
+    public RecordContextStub(final long offset,
+                             final long timestamp,
+                             final int partition,
+                             final String topic) {
+        this(offset, timestamp, partition, topic, null);
     }
 
     @Override
@@ -61,4 +73,9 @@ public class RecordContextStub implements RecordContext {
     public int partition() {
         return partition;
     }
+
+    @Override
+    public Headers headers() {
+        return headers;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index de8e17b..36988c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -17,7 +17,10 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.LogContext;
@@ -29,16 +32,18 @@ import static org.junit.Assert.assertEquals;
 
 public class RecordDeserializerTest {
 
+    private final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())});
     private final ConsumerRecord<byte[], byte[]> rawRecord = new ConsumerRecord<>("topic",
         1,
         1,
         10,
         TimestampType.LOG_APPEND_TIME,
-        5,
+        5L,
         3,
         5,
         new byte[0],
-        new byte[0]);
+        new byte[0],
+        headers);
 
 
     @SuppressWarnings("deprecation")
@@ -63,6 +68,7 @@ public class RecordDeserializerTest {
         assertEquals("value", record.value());
         assertEquals(rawRecord.timestamp(), record.timestamp());
         assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+        assertEquals(rawRecord.headers(), record.headers());
     }
 
     static class TheSourceNode extends SourceNode<Object, Object> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 4e80fa7..4b9e6a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -198,6 +199,7 @@ public class KeyValueStoreTestDriver<K, V> {
             public <K1, V1> void send(final String topic,
                                       final K1 key,
                                       final V1 value,
+                                      final Headers headers,
                                       final Integer partition,
                                       final Long timestamp,
                                       final Serializer<K1> keySerializer,
@@ -214,6 +216,7 @@ public class KeyValueStoreTestDriver<K, V> {
             public <K1, V1> void send(final String topic,
                                       final K1 key,
                                       final V1 value,
+                                      final Headers headers,
                                       final Long timestamp,
                                       final Serializer<K1> keySerializer,
                                       final Serializer<V1> valueSerializer,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 3e0241e..2f6aac7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -75,7 +75,8 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext(null, null, null, (RecordCollector) null, cache);
         topic = "topic";
-        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic));
+        context.setRecordContext(
+            new ProcessorRecordContext(10, 0, 0, topic, null));
         store.init(context, null);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index b77f4e9..baa9ee4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -76,7 +76,7 @@ public class CachingSessionStoreTest {
                                                  );
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic"));
+        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic", null));
         cachingStore.init(context, cachingStore);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index a87b2e4..b8808ca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -81,7 +81,7 @@ public class CachingWindowStoreTest {
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         topic = "topic";
         context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
-        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic));
+        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic, null));
         cachingStore.init(context, cachingStore);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 5bb0de7..7f5a08e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
@@ -57,6 +58,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
             public <K, V> void send(final String topic,
                                     K key,
                                     V value,
+                                    Headers headers,
                                     Integer partition,
                                     Long timestamp,
                                     Serializer<K> keySerializer,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index a658186..edcaa05 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -49,6 +50,7 @@ public class ChangeLoggingSessionBytesStoreTest {
         public <K, V> void send(final String topic,
                                 K key,
                                 V value,
+                                Headers headers,
                                 Integer partition,
                                 Long timestamp,
                                 Serializer<K> keySerializer,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index 956172e..e56887e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -47,6 +48,7 @@ public class ChangeLoggingWindowBytesStoreTest {
         public <K, V> void send(final String topic,
                                 K key,
                                 V value,
+                                Headers headers,
                                 Integer partition,
                                 Long timestamp,
                                 Serializer<K> keySerializer,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 9ae0feb..92653ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
@@ -42,6 +46,7 @@ import static org.junit.Assert.assertSame;
 
 public class NamedCacheTest {
 
+    private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
     private NamedCache cache;
     private StreamsMetricsImpl metrics;
     private final String taskIDString = "0.0";
@@ -64,7 +69,7 @@ public class NamedCacheTest {
         for (int i = 0; i < toInsert.size(); i++) {
             byte[] key = toInsert.get(i).key.getBytes();
             byte[] value = toInsert.get(i).value.getBytes();
-            cache.put(Bytes.wrap(key), new LRUCacheEntry(value, true, 1, 1, 1, ""));
+            cache.put(Bytes.wrap(key), new LRUCacheEntry(value, null, true, 1, 1, 1, ""));
             LRUCacheEntry head = cache.first();
             LRUCacheEntry tail = cache.last();
             assertEquals(new String(head.value), toInsert.get(i).value);
@@ -170,9 +175,9 @@ public class NamedCacheTest {
     @Test
     public void shouldFlushDirtEntriesOnEviction() {
         final List<ThreadCache.DirtyEntry> flushed = new ArrayList<>();
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, ""));
         cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
-        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, headers, true, 0, 0, 0, ""));
 
         cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
             @Override
@@ -185,6 +190,7 @@ public class NamedCacheTest {
 
         assertEquals(2, flushed.size());
         assertEquals(Bytes.wrap(new byte[] {0}), flushed.get(0).key());
+        assertEquals(headers, flushed.get(0).recordContext().headers());
         assertArrayEquals(new byte[] {10}, flushed.get(0).newValue());
         assertEquals(Bytes.wrap(new byte[] {2}), flushed.get(1).key());
         assertArrayEquals(new byte[] {30}, flushed.get(1).newValue());
@@ -193,9 +199,9 @@ public class NamedCacheTest {
 
     @Test
     public void shouldGetRangeIteratorOverKeys() {
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, ""));
         cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
-        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, null, true, 0, 0, 0, ""));
 
         final Iterator<Bytes> iterator = cache.keyRange(Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{2}));
         assertEquals(Bytes.wrap(new byte[]{1}), iterator.next());
@@ -205,9 +211,9 @@ public class NamedCacheTest {
 
     @Test
     public void shouldGetIteratorOverAllKeys() {
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, ""));
         cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
-        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, null, true, 0, 0, 0, ""));
 
         final Iterator<Bytes> iterator = cache.allKeys();
         assertEquals(Bytes.wrap(new byte[]{0}), iterator.next());
@@ -223,8 +229,8 @@ public class NamedCacheTest {
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowIllegalStateExceptionWhenTryingToOverwriteDirtyEntryWithCleanEntry() {
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, ""));
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, false, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, null, false, 0, 0, 0, ""));
     }
 
     @Test
@@ -235,8 +241,8 @@ public class NamedCacheTest {
                 // no-op
             }
         });
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(null, true, 0, 0, 0, ""));
-        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(null, headers, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, null, true, 0, 0, 0, ""));
         cache.flush();
         assertEquals(1, cache.size());
         assertNotNull(cache.get(Bytes.wrap(new byte[]{1})));
@@ -244,7 +250,7 @@ public class NamedCacheTest {
 
     @Test
     public void shouldBeReentrantAndNotBreakLRU() {
-        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, true, 0, 0, 0, "");
+        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, null, true, 0, 0, 0, "");
         final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
         cache.put(Bytes.wrap(new byte[]{0}), dirty);
         cache.put(Bytes.wrap(new byte[]{1}), clean);
@@ -290,7 +296,7 @@ public class NamedCacheTest {
 
     @Test
     public void shouldNotThrowIllegalArgumentAfterEvictingDirtyRecordAndThenPuttingNewRecordWithSameKey() {
-        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, true, 0, 0, 0, "");
+        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, null, true, 0, 0, 0, "");
         final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
         final Bytes key = Bytes.wrap(new byte[] {3});
         cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 92edbd8..be4ede8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
@@ -90,6 +91,7 @@ public class RocksDBWindowStoreTest {
         public <K1, V1> void send(final String topic,
                                   final K1 key,
                                   final V1 value,
+                                  final Headers headers,
                                   final Integer partition,
                                   final Long timestamp,
                                   final Serializer<K1> keySerializer,
@@ -160,7 +162,7 @@ public class RocksDBWindowStoreTest {
     }
 
     private ProcessorRecordContext createRecordContext(final long time) {
-        return new ProcessorRecordContext(time, 0, 0, "topic");
+        return new ProcessorRecordContext(time, 0, 0, "topic", null);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 6bacd91..5afe14f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.state.internals;
 
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
@@ -38,6 +40,7 @@ public class StoreChangeLoggerTest {
     private final String topic = "topic";
 
     private final Map<Integer, String> logged = new HashMap<>();
+    private final Map<Integer, Headers> loggedHeaders = new HashMap<>();
 
     private final InternalMockProcessorContext context = new InternalMockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
         new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")) {
@@ -45,17 +48,20 @@ public class StoreChangeLoggerTest {
             public <K1, V1> void send(final String topic,
                                       final K1 key,
                                       final V1 value,
+                                      final Headers headers,
                                       final Integer partition,
                                       final Long timestamp,
                                       final Serializer<K1> keySerializer,
                                       final Serializer<V1> valueSerializer) {
                 logged.put((Integer) key, (String) value);
+                loggedHeaders.put((Integer) key, headers);
             }
 
             @Override
             public <K1, V1> void send(final String topic,
                                       final K1 key,
                                       final V1 value,
+                                      final Headers headers,
                                       final Long timestamp,
                                       final Serializer<K1> keySerializer,
                                       final Serializer<V1> valueSerializer,
@@ -80,6 +86,13 @@ public class StoreChangeLoggerTest {
 
         changeLogger.logChange(0, null);
         assertNull(logged.get(0));
+    }
+
+    @Test
+    public void shouldNotSendRecordHeadersToChangelogTopic() {
+        context.headers().add(new RecordHeader("key", "value".getBytes()));
+        changeLogger.logChange(0, "zero");
 
+        assertNull(loggedHeaders.get(0));
     }
 }
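
The new test above pins down a deliberate KIP-244 design choice: source-record headers
are not forwarded to changelog topics. A sketch of what the logger side amounts to
after this commit (the exact StoreChangeLogger body is not reproduced in this hunk):

    // Changelog writes pass null for the new Headers argument, so headers set on
    // the processor context never reach the changelog topic.
    collector.send(topic, key, value, null, partition, context.timestamp(), keySerializer, valueSerializer);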
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 164e71e..d100ae5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -59,7 +59,7 @@ public class ThreadCacheTest {
         for (KeyValue<String, String> kvToInsert : toInsert) {
             Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
             byte[] value = kvToInsert.value.getBytes();
-            cache.put(namespace, key, new LRUCacheEntry(value, true, 1L, 1L, 1, ""));
+            cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1L, 1L, 1, ""));
         }
 
         for (KeyValue<String, String> kvToInsert : toInsert) {
@@ -89,7 +89,7 @@ public class ThreadCacheTest {
             String keyStr = "K" + i;
             Bytes key = Bytes.wrap(keyStr.getBytes());
             byte[] value = new byte[valueSizeBytes];
-            cache.put(namespace, key, new LRUCacheEntry(value, true, 1L, 1L, 1, ""));
+            cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1L, 1L, 1, ""));
         }
 
 
@@ -171,7 +171,7 @@ public class ThreadCacheTest {
         for (KeyValue<String, String> kvToInsert : toInsert) {
             final Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
             final byte[] value = kvToInsert.value.getBytes();
-            cache.put(namespace, key, new LRUCacheEntry(value, true, 1, 1, 1, ""));
+            cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1, 1, 1, ""));
         }
 
         for (int i = 0; i < expected.size(); i++) {
@@ -520,7 +520,7 @@ public class ThreadCacheTest {
     }
 
     private LRUCacheEntry dirtyEntry(final byte[] key) {
-        return new LRUCacheEntry(key, true, -1, -1, -1, "");
+        return new LRUCacheEntry(key, null, true, -1, -1, -1, "");
     }
 
     private LRUCacheEntry cleanEntry(final byte[] key) {
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index eb72e13..e5571eb 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
@@ -242,7 +244,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
     // and also not throwing exceptions if record context is not available.
     public void setTime(final long timestamp) {
         if (recordContext != null) {
-            recordContext = new ProcessorRecordContext(timestamp, recordContext.offset(), recordContext.partition(), recordContext.topic());
+            recordContext = new ProcessorRecordContext(timestamp, recordContext.offset(), recordContext.partition(), recordContext.topic(), recordContext.headers());
         }
         this.timestamp = timestamp;
     }
@@ -279,6 +281,14 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
         return recordContext.offset();
     }
 
+    @Override
+    public Headers headers() {
+        if (recordContext == null) {
+            return new RecordHeaders();
+        }
+        return recordContext.headers();
+    }
+
     Map<String, StateStore> allStateStores() {
         return Collections.unmodifiableMap(storeMap);
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 2c3461a..698cdc7 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -90,7 +91,7 @@ public class KStreamTestDriver extends ExternalResource {
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic", null));
 
         // init global topology first as it will add stores to the
         // store map that are required for joins etc.
@@ -229,7 +230,7 @@ public class KStreamTestDriver extends ExternalResource {
     }
 
     private ProcessorRecordContext createRecordContext(final String topicName, final long timestamp) {
-        return new ProcessorRecordContext(timestamp, -1, -1, topicName);
+        return new ProcessorRecordContext(timestamp, -1, -1, topicName, null);
     }
 
     private class MockRecordCollector extends RecordCollectorImpl {
@@ -241,6 +242,7 @@ public class KStreamTestDriver extends ExternalResource {
         public <K, V> void send(final String topic,
                                 final K key,
                                 final V value,
+                                final Headers headers,
                                 final Long timestamp,
                                 final Serializer<K> keySerializer,
                                 final Serializer<V> valueSerializer,
@@ -255,6 +257,7 @@ public class KStreamTestDriver extends ExternalResource {
         public <K, V> void send(final String topic,
                                 final K key,
                                 final V value,
+                                final Headers headers,
                                 final Integer partition,
                                 final Long timestamp,
                                 final Serializer<K> keySerializer,
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
index 66271a0..893d356 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -30,6 +31,7 @@ public class NoOpRecordCollector implements RecordCollector {
     public <K, V> void send(final String topic,
                             final K key,
                             final V value,
+                            final Headers headers,
                             final Integer partition,
                             final Long timestamp,
                             final Serializer<K> keySerializer,
@@ -39,6 +41,7 @@ public class NoOpRecordCollector implements RecordCollector {
     public <K, V> void send(final String topic,
                             final K key,
                             final V value,
+                            final Headers headers,
                             final Long timestamp,
                             final Serializer<K> keySerializer,
                             final Serializer<V> valueSerializer,
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index c237ca7..e46ec6a 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -227,7 +227,7 @@ public class TopologyTestDriver implements Closeable {
      * @param config the configuration for the topology
      */
     TopologyTestDriver(final InternalTopologyBuilder builder,
-                              final Properties config) {
+                       final Properties config) {
         this(builder, config,  System.currentTimeMillis());
 
     }
@@ -382,14 +382,16 @@ public class TopologyTestDriver implements Closeable {
                 offset,
                 consumerRecord.timestamp(),
                 consumerRecord.timestampType(),
-                ConsumerRecord.NULL_CHECKSUM,
+                (long) ConsumerRecord.NULL_CHECKSUM,
                 consumerRecord.serializedKeySize(),
                 consumerRecord.serializedValueSize(),
                 consumerRecord.key(),
-                consumerRecord.value())));
+                consumerRecord.value(),
+                consumerRecord.headers())));
 
             // Process the record ...
-            ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(consumerRecord.timestamp(), offset, topicPartition.partition(), topicName));
+            ((InternalProcessorContext) task.context()).setRecordContext(
+                    new ProcessorRecordContext(consumerRecord.timestamp(), offset, topicPartition.partition(), topicName, consumerRecord.headers()));
             task.process();
             task.maybePunctuateStreamTime();
             task.commit();
@@ -407,11 +409,12 @@ public class TopologyTestDriver implements Closeable {
                 offset,
                 consumerRecord.timestamp(),
                 consumerRecord.timestampType(),
-                ConsumerRecord.NULL_CHECKSUM,
+                (long) ConsumerRecord.NULL_CHECKSUM,
                 consumerRecord.serializedKeySize(),
                 consumerRecord.serializedValueSize(),
                 consumerRecord.key(),
-                consumerRecord.value()));
+                consumerRecord.value(),
+                consumerRecord.headers()));
             globalStateTask.flushState();
         }
     }
@@ -467,7 +470,8 @@ public class TopologyTestDriver implements Closeable {
                     serializedKey == null ? 0 : serializedKey.length,
                     serializedValue == null ? 0 : serializedValue.length,
                     serializedKey,
-                    serializedValue));
+                    serializedValue,
+                    record.headers()));
             }
         }
     }
@@ -536,7 +540,7 @@ public class TopologyTestDriver implements Closeable {
         }
         final K key = keyDeserializer.deserialize(record.topic(), record.key());
         final V value = valueDeserializer.deserialize(record.topic(), record.value());
-        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value);
+        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value, record.headers());
     }
 
     /**
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 3e29cde..b14a791 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
@@ -61,6 +62,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     private String topic;
     private Integer partition;
     private Long offset;
+    private Headers headers;
     private Long timestamp;
 
     // mocks ================================================
@@ -250,10 +252,12 @@
      * @param offset    A record offset
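+     * @param headers   The record headers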
      * @param timestamp A record timestamp
      */
-    public void setRecordMetadata(final String topic, final int partition, final long offset, final long timestamp) {
+    public void setRecordMetadata(final String topic, final int partition, final long offset, final Headers headers, final long timestamp) {
         this.topic = topic;
         this.partition = partition;
         this.offset = offset;
+        this.headers = headers;
         this.timestamp = timestamp;
     }
 
@@ -289,6 +292,9 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         this.offset = offset;
     }
 
+    public void setHeaders(final Headers headers) {
+        this.headers = headers;
+    }
 
     /**
      * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
@@ -325,6 +331,11 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     }
 
     @Override
+    public Headers headers() {
+        return headers;
+    }
+
+    @Override
     public long timestamp() {
         if (timestamp == null) {
             throw new IllegalStateException("Timestamp must be set before use via setRecordMetadata() or setTimestamp().");
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
index b0ccd61..507249d 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
@@ -18,6 +18,8 @@ package org.apache.kafka.streams.test;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KeyValue;
@@ -153,20 +155,23 @@ public class ConsumerRecordFactory<K, V> {
     }
 
     /**
-     * Create a {@link ConsumerRecord} with the given topic name, key, value, and timestamp.
+     * Create a {@link ConsumerRecord} with the given topic name, key, value, headers, and timestamp.
      * Does not auto advance internally tracked time.
      *
      * @param topicName the topic name
      * @param key the record key
      * @param value the record value
+     * @param headers the record headers
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final K key,
                                                  final V value,
+                                                 final Headers headers,
                                                  final long timestampMs) {
         Objects.requireNonNull(topicName, "topicName cannot be null.");
+        Objects.requireNonNull(headers, "headers cannot be null.");
         final byte[] serializedKey = keySerializer.serialize(topicName, key);
         final byte[] serializedValue = valueSerializer.serialize(topicName, value);
         return new ConsumerRecord<>(
@@ -175,11 +180,29 @@ public class ConsumerRecordFactory<K, V> {
             -1L,
             timestampMs,
             TimestampType.CREATE_TIME,
-            ConsumerRecord.NULL_CHECKSUM,
+            (long) ConsumerRecord.NULL_CHECKSUM,
             serializedKey == null ? 0 : serializedKey.length,
             serializedValue == null ? 0 : serializedValue.length,
             serializedKey,
-            serializedValue);
+            serializedValue,
+            headers);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with the given topic name, key, value, and timestamp.
+     * Does not auto advance internally tracked time.
+     *
+     * @param topicName the topic name
+     * @param key the record key
+     * @param value the record value
+     * @param timestampMs the record timestamp
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final String topicName,
+                                                 final K key,
+                                                 final V value,
+                                                 final long timestampMs) {
+        return create(topicName, key, value, new RecordHeaders(), timestampMs);
     }
 
     /**
@@ -194,16 +217,33 @@ public class ConsumerRecordFactory<K, V> {
     public ConsumerRecord<byte[], byte[]> create(final K key,
                                                  final V value,
                                                  final long timestampMs) {
+        return create(key, value, new RecordHeaders(), timestampMs);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with default topic name and given key, value, headers, and timestamp.
+     * Does not auto advance internally tracked time.
+     *
+     * @param key the record key
+     * @param value the record value
+     * @param headers the record headers
+     * @param timestampMs the record timestamp
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final K key,
+                                                 final V value,
+                                                 final Headers headers,
+                                                 final long timestampMs) {
         if (topicName == null) {
             throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
                 "Use #create(String topicName, K key, V value, long timestampMs) instead.");
         }
-        return create(topicName, key, value, timestampMs);
+        return create(topicName, key, value, headers, timestampMs);
     }
 
     /**
      * Create a {@link ConsumerRecord} with the given topic name, key, and value.
-     * The timestamp will be generated from the constructor provided and time will auto advance.
+     * The timestamp will be generated based on the constructor-provided start time, and time will auto advance.
      *
      * @param topicName the topic name
      * @param key the record key
@@ -215,12 +255,31 @@ public class ConsumerRecordFactory<K, V> {
                                                  final V value) {
         final long timestamp = timeMs;
         timeMs += advanceMs;
-        return create(topicName, key, value, timestamp);
+        return create(topicName, key, value, new RecordHeaders(), timestamp);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with the given topic name, key, value, and headers.
+     * The timestamp will be generated based on the constructor-provided start time, and time will auto advance.
+     *
+     * @param topicName the topic name
+     * @param key the record key
+     * @param value the record value
+     * @param headers the record headers
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final String topicName,
+                                                 final K key,
+                                                 final V value,
+                                                 final Headers headers) {
+        final long timestamp = timeMs;
+        timeMs += advanceMs;
+        return create(topicName, key, value, headers, timestamp);
     }
 
     /**
      * Create a {@link ConsumerRecord} with default topic name and given key and value.
-     * The timestamp will be generated from the constructor provided and time will auto advance.
+     * The timestamp will be generated based on the start time provided to the constructor, and time will auto advance.
      *
      * @param key the record key
      * @param value the record value
@@ -228,46 +287,110 @@ public class ConsumerRecordFactory<K, V> {
      */
     public ConsumerRecord<byte[], byte[]> create(final K key,
                                                  final V value) {
+        return create(key, value, new RecordHeaders());
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with default topic name and given key, value, and headers.
+     * The timestamp will be generated based on the start time provided to the constructor, and time will auto advance.
+     *
+     * @param key the record key
+     * @param value the record value
+     * @param headers the record headers
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final K key,
+                                                 final V value,
+                                                 final Headers headers) {
         if (topicName == null) {
             throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
                 "Use #create(String topicName, K key, V value) instead.");
         }
-        return create(topicName, key, value);
+        return create(topicName, key, value, headers);
     }
 
     /**
      * Create a {@link ConsumerRecord} with {@code null}-key and the given topic name, value, and timestamp.
+     * Does not auto advance internally tracked time.
+     *
+     * @param topicName the topic name
+     * @param value the record value
+     * @param timestampMs the record timestamp
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final String topicName,
+                                                 final V value,
+                                                 final long timestampMs) {
+        return create(topicName, null, value, new RecordHeaders(), timestampMs);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with {@code null}-key and the given topic name, value, headers, and timestamp.
+     * Does not auto advance internally tracked time.
      *
      * @param topicName the topic name
      * @param value the record value
+     * @param headers the record headers
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final V value,
+                                                 final Headers headers,
                                                  final long timestampMs) {
-        return create(topicName, null, value, timestampMs);
+        return create(topicName, null, value, headers, timestampMs);
     }
 
     /**
      * Create a {@link ConsumerRecord} with default topic name and {@code null}-key as well as given value and timestamp.
+     * Does not auto advance internally tracked time.
+     *
+     * @param value the record value
+     * @param timestampMs the record timestamp
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final V value,
+                                                 final long timestampMs) {
+        return create(value, new RecordHeaders(), timestampMs);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with default topic name and {@code null}-key as well as given value, headers, and timestamp.
+     * Does not auto advance internally tracked time.
      *
      * @param value the record value
+     * @param headers the record headers
      * @param timestampMs the record timestamp
      * @return the generated {@link ConsumerRecord}
      */
     public ConsumerRecord<byte[], byte[]> create(final V value,
+                                                 final Headers headers,
                                                  final long timestampMs) {
         if (topicName == null) {
             throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
                 "Use #create(String topicName, V value, long timestampMs) instead.");
         }
-        return create(topicName, value, timestampMs);
+        return create(topicName, value, headers, timestampMs);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with {@code null}-key and the given topic name, value, and headers.
+     * The timestamp will be generated based on the start time provided to the constructor, and time will auto advance.
+     *
+     * @param topicName the topic name
+     * @param value the record value
+     * @param headers the record headers
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final String topicName,
+                                                 final V value,
+                                                 final Headers headers) {
+        return create(topicName, null, value, headers);
     }
 
     /**
      * Create a {@link ConsumerRecord} with {@code null}-key and the given topic name and value.
-     * The timestamp will be generated from the constructor provided and time will auto advance.
+     * The timestamp will be generated based on the start time provided to the constructor, and time will auto advance.
      *
      * @param topicName the topic name
      * @param value the record value
@@ -275,27 +398,40 @@ public class ConsumerRecordFactory<K, V> {
      */
     public ConsumerRecord<byte[], byte[]> create(final String topicName,
                                                  final V value) {
-        return create(topicName, null, value);
+        return create(topicName, null, value, new RecordHeaders());
     }
 
     /**
      * Create a {@link ConsumerRecord} with default topic name and {@code null}-key as well as given value.
-     * The timestamp will be generated from the constructor provided and time will auto advance.
+     * The timestamp will be generated based on the start time provided to the constructor, and time will auto advance.
      *
      * @param value the record value
      * @return the generated {@link ConsumerRecord}
      */
     public ConsumerRecord<byte[], byte[]> create(final V value) {
+        return create(value, new RecordHeaders());
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with default topic name and {@code null}-key as well as given value and headers.
+     * The timestamp will be generated based on the start time provided to the constructor, and time will auto advance.
+     *
+     * @param value the record value
+     * @param headers the record headers
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final V value,
+                                                 final Headers headers) {
         if (topicName == null) {
             throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
                 "Use #create(String topicName, V value, long timestampMs) instead.");
         }
-        return create(topicName, value);
+        return create(topicName, value, headers);
     }
 
     /**
      * Creates {@link ConsumerRecord consumer records} with the given topic name, keys, and values.
-     * The timestamp will be generated from the constructor provided and time will auto advance.
+     * The timestamp will be generated based on the start time provided to the constructor, and time will auto advance.
      *
      * @param topicName the topic name
      * @param keyValues the record keys and values
@@ -314,7 +450,7 @@ public class ConsumerRecordFactory<K, V> {
 
     /**
      * Creates {@link ConsumerRecord consumer records} with default topic name as well as given keys and values.
-     * The timestamp will be generated from the constructor provided and time will auto advance.
+     * The timestamp will be generated based on the start time provided to the constructor, and time will auto advance.
      *
      * @param keyValues the record keys and values
      * @return the generated {@link ConsumerRecord consumer records}
@@ -350,7 +486,7 @@ public class ConsumerRecordFactory<K, V> {
 
         long timestamp = startTimestamp;
         for (final KeyValue<K, V> keyValue : keyValues) {
-            records.add(create(topicName, keyValue.key, keyValue.value, timestamp));
+            records.add(create(topicName, keyValue.key, keyValue.value, new RecordHeaders(), timestamp));
             timestamp += advanceMs;
         }
 
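
For context, a minimal sketch of the new headers-aware factory methods in use; the topic name,
serializers, header key, and timestamps below are illustrative, not part of this patch:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.test.ConsumerRecordFactory;

    final ConsumerRecordFactory<String, String> factory =
        new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new StringSerializer());

    // Headers#add returns the Headers instance, so calls can be chained.
    final Headers headers = new RecordHeaders()
        .add(new RecordHeader("trace-id", "abc-123".getBytes()));

    // Explicit timestamp: internally tracked time is not advanced.
    final ConsumerRecord<byte[], byte[]> withHeaders =
        factory.create("key", "value", headers, 42L);

    // Headers omitted: the overloads above substitute an empty RecordHeaders.
    final ConsumerRecord<byte[], byte[]> withoutHeaders =
        factory.create("key", "value", 43L);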
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
index 09ed294..aedb910 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.test;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.TopologyTestDriver;
 
 import java.util.Objects;
@@ -238,4 +239,202 @@ public class OutputVerifier {
         compareKeyValueTimestamp(record, expectedRecord.key(), expectedRecord.value(), expectedRecord.timestamp());
     }
 
+    /**
+     * Compares a {@link ProducerRecord} with the provided value and headers and throws an {@link AssertionError} if
+     * the {@code ProducerRecord}'s value or headers are not equal to the expected value or headers.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedValue the expected value of the {@code ProducerRecord}
+     * @param expectedHeaders the expected headers of the {@code ProducerRecord}
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if the {@code ProducerRecord}'s value or headers are not equal to {@code expectedValue} or {@code expectedHeaders}
+     */
+    public static <K, V> void compareValueHeaders(final ProducerRecord<K, V> record,
+                                                  final V expectedValue,
+                                                  final Headers expectedHeaders) throws AssertionError {
+        Objects.requireNonNull(record);
+
+        final V recordValue = record.value();
+        final Headers recordHeaders = record.headers();
+        final AssertionError error = new AssertionError("Expected value=" + expectedValue + " with headers=" + expectedHeaders +
+                " but was value=" + recordValue + " with headers=" + recordHeaders);
+
+        if (recordValue != null) {
+            if (!recordValue.equals(expectedValue)) {
+                throw error;
+            }
+        } else if (expectedValue != null) {
+            throw error;
+        }
+
+        if (recordHeaders != null) {
+            if (!recordHeaders.equals(expectedHeaders)) {
+                throw error;
+            }
+        } else if (expectedHeaders != null) {
+            throw error;
+        }
+    }
+
+    /**
+     * Compares the values and headers of two {@link ProducerRecord}s and throws an {@link AssertionError} if the
+     * values or headers are not equal to each other.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedRecord a {@code ProducerRecord} for verification
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if the {@code ProducerRecord}'s value or headers are not equal to {@code expectedRecord}'s value or headers
+     */
+    public static <K, V> void compareValueHeaders(final ProducerRecord<K, V> record,
+                                                  final ProducerRecord<K, V> expectedRecord) throws AssertionError {
+        Objects.requireNonNull(expectedRecord);
+        compareValueHeaders(record, expectedRecord.value(), expectedRecord.headers());
+    }
+
+    /**
+     * Compares a {@link ProducerRecord} with the provided key, value, and headers and throws an
+     * {@link AssertionError} if the {@code ProducerRecord}'s key, value, or headers are not equal to the expected key,
+     * value, or headers.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedKey the expected key of the {@code ProducerRecord}
+     * @param expectedValue the expected value of the {@code ProducerRecord}
+     * @param expectedHeaders the expected headers of the {@code ProducerRecord}
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if the {@code ProducerRecord}'s key, value, or headers are not equal to {@code expectedKey},
+     *                        {@code expectedValue}, or {@code expectedHeaders}
+     */
+    public static <K, V> void compareKeyValueHeaders(final ProducerRecord<K, V> record,
+                                                     final K expectedKey,
+                                                     final V expectedValue,
+                                                     final Headers expectedHeaders) throws AssertionError {
+        Objects.requireNonNull(record);
+
+        final K recordKey = record.key();
+        final V recordValue = record.value();
+        final Headers recordHeaders = record.headers();
+        final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with headers=" + expectedHeaders +
+                " but was <" + recordKey + ", " + recordValue + "> with headers=" + recordHeaders);
+
+        if (recordKey != null) {
+            if (!recordKey.equals(expectedKey)) {
+                throw error;
+            }
+        } else if (expectedKey != null) {
+            throw error;
+        }
+
+        if (recordValue != null) {
+            if (!recordValue.equals(expectedValue)) {
+                throw error;
+            }
+        } else if (expectedValue != null) {
+            throw error;
+        }
+
+        if (recordHeaders != null) {
+            if (!recordHeaders.equals(expectedHeaders)) {
+                throw error;
+            }
+        } else if (expectedHeaders != null) {
+            throw error;
+        }
+    }
+
+    /**
+     * Compares the keys, values, and headers of two {@link ProducerRecord}s and throws an {@link AssertionError} if
+     * the keys, values, or headers are not equal to each other.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedRecord a {@code ProducerRecord} for verification
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if the {@code ProducerRecord}'s key, value, or headers are not equal to
+     *                        {@code expectedRecord}'s key, value, or headers
+     */
+    public static <K, V> void compareKeyValueHeaders(final ProducerRecord<K, V> record,
+                                                     final ProducerRecord<K, V> expectedRecord) throws AssertionError {
+        Objects.requireNonNull(expectedRecord);
+        compareKeyValueHeaders(record, expectedRecord.key(), expectedRecord.value(), expectedRecord.headers());
+    }
+
+    /**
+     * Compares a {@link ProducerRecord} with the provided key, value, headers, and timestamp and throws an
+     * {@link AssertionError} if the {@code ProducerRecord}'s key, value, headers, or timestamp is not equal to the expected key,
+     * value, headers, or timestamp.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedKey the expected key of the {@code ProducerRecord}
+     * @param expectedValue the expected value of the {@code ProducerRecord}
+     * @param expectedHeaders the expected headers of the {@code ProducerRecord}
+     * @param expectedTimestamp the expected timestamp of the {@code ProducerRecord}
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if the {@code ProducerRecord}'s key, value, headers, or timestamp is not equal to {@code expectedKey},
+     *                        {@code expectedValue}, {@code expectedHeaders}, or {@code expectedTimestamp}
+     */
+    public static <K, V> void compareKeyValueHeadersTimestamp(final ProducerRecord<K, V> record,
+                                                              final K expectedKey,
+                                                              final V expectedValue,
+                                                              final Headers expectedHeaders,
+                                                              final long expectedTimestamp) throws AssertionError {
+        Objects.requireNonNull(record);
+
+        final K recordKey = record.key();
+        final V recordValue = record.value();
+        final Headers recordHeaders = record.headers();
+        final long recordTimestamp = record.timestamp();
+        final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + ">" +
+                " with timestamp=" + expectedTimestamp + " and headers=" + expectedHeaders +
+                " but was <" + recordKey + ", " + recordValue + ">" +
+                " with timestamp=" + recordTimestamp + " and headers=" + recordHeaders);
+
+        if (recordKey != null) {
+            if (!recordKey.equals(expectedKey)) {
+                throw error;
+            }
+        } else if (expectedKey != null) {
+            throw error;
+        }
+
+        if (recordValue != null) {
+            if (!recordValue.equals(expectedValue)) {
+                throw error;
+            }
+        } else if (expectedValue != null) {
+            throw error;
+        }
+
+        if (recordHeaders != null) {
+            if (!recordHeaders.equals(expectedHeaders)) {
+                throw error;
+            }
+        } else if (expectedHeaders != null) {
+            throw error;
+        }
+
+        if (recordTimestamp != expectedTimestamp) {
+            throw error;
+        }
+    }
+
+    /**
+     * Compares the keys, values, headers, and timestamps of two {@link ProducerRecord}s and throws an {@link AssertionError} if
+     * the keys, values, headers, or timestamps are not equal to each other.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedRecord a {@code ProducerRecord} for verification
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if {@code ProducerRecord}'s key, value, headers, or timestamp is not equal to
+     *                        {@code expectedRecord}'s key, value, headers, or timestamp
+     */
+    public static <K, V> void compareKeyValueHeadersTimestamp(final ProducerRecord<K, V> record,
+                                                              final ProducerRecord<K, V> expectedRecord) throws AssertionError {
+        Objects.requireNonNull(expectedRecord);
+        compareKeyValueHeadersTimestamp(record, expectedRecord.key(), expectedRecord.value(), expectedRecord.headers(), expectedRecord.timestamp());
+    }
 }
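
A minimal sketch of the new header-aware assertions, assuming a TopologyTestDriver named
"driver" and an output topic "output-topic" (both illustrative):

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.streams.test.OutputVerifier;

    final ProducerRecord<String, String> output =
        driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());

    final Headers expectedHeaders = new RecordHeaders()
        .add(new RecordHeader("trace-id", "abc-123".getBytes()));

    // Throws AssertionError on mismatch; the record key is not checked by this variant.
    OutputVerifier.compareValueHeaders(output, "value", expectedHeaders);

    // Full comparison of key, value, headers, and timestamp.
    OutputVerifier.compareKeyValueHeadersTimestamp(output, "key", "value", expectedHeaders, 42L);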
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index dbb26e0..64d5b12 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -286,7 +286,7 @@ public class MockProcessorContextTest {
         }
 
         context.resetForwards();
-        context.setRecordMetadata("t1", 0, 0L, 0L);
+        context.setRecordMetadata("t1", 0, 0L, null, 0L);
 
         {
             processor.process("foo", 5L);
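
A minimal sketch of the extended setRecordMetadata call, seeding headers alongside the other
record metadata; the topic, header, and offset values are illustrative:

    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.streams.processor.MockProcessorContext;

    final MockProcessorContext context = new MockProcessorContext();
    context.setRecordMetadata(
        "input-topic",                                                          // topic
        0,                                                                      // partition
        0L,                                                                     // offset
        new RecordHeaders().add(new RecordHeader("trace-id", "x".getBytes())),  // headers
        0L);                                                                    // timestamp

    // A processor wired to this context can read the metadata back via context.topic(),
    // context.partition(), context.offset(), context.headers(), and context.timestamp().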
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 6cc96a2..2d446d1 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -18,6 +18,10 @@ package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
@@ -74,10 +78,12 @@ public class TopologyTestDriverTest {
         new ByteArraySerializer(),
         new ByteArraySerializer());
 
+    private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
+
     private final byte[] key1 = new byte[0];
     private final byte[] value1 = new byte[0];
     private final long timestamp1 = 42L;
-    private final ConsumerRecord<byte[], byte[]> consumerRecord1 = consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, timestamp1);
+    private final ConsumerRecord<byte[], byte[]> consumerRecord1 = consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, headers, timestamp1);
 
     private final byte[] key2 = new byte[0];
     private final byte[] value2 = new byte[0];
@@ -107,6 +113,7 @@ public class TopologyTestDriverTest {
         private long timestamp;
         private long offset;
         private String topic;
+        private Headers headers;
 
         Record(final ConsumerRecord consumerRecord) {
             key = consumerRecord.key();
@@ -114,15 +121,18 @@ public class TopologyTestDriverTest {
             timestamp = consumerRecord.timestamp();
             offset = consumerRecord.offset();
             topic = consumerRecord.topic();
+            headers = consumerRecord.headers();
         }
 
         Record(final Object key,
                final Object value,
+               final Headers headers,
                final long timestamp,
                final long offset,
                final String topic) {
             this.key = key;
             this.value = value;
+            this.headers = headers;
             this.timestamp = timestamp;
             this.offset = offset;
             this.topic = topic;
@@ -146,12 +156,13 @@ public class TopologyTestDriverTest {
                 offset == record.offset &&
                 Objects.equals(key, record.key) &&
                 Objects.equals(value, record.value) &&
-                Objects.equals(topic, record.topic);
+                Objects.equals(topic, record.topic) &&
+                Objects.equals(headers, record.headers);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(key, value, timestamp, offset, topic);
+            return Objects.hash(key, value, headers, timestamp, offset, topic);
         }
     }
 
@@ -201,7 +212,7 @@ public class TopologyTestDriverTest {
 
         @Override
         public void process(Object key, Object value) {
-            processedRecords.add(new Record(key, value, context.timestamp(), context.offset(), context.topic()));
+            processedRecords.add(new Record(key, value, context.headers(), context.timestamp(), context.offset(), context.topic()));
             context.forward(key, value);
         }
 
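A minimal sketch of a processor that reads the new ProcessorContext#headers() accessor,
mirroring the record-capturing processor in this test; the class name and println output are
illustrative:

    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.streams.processor.AbstractProcessor;

    public class HeaderEchoProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(final String key, final String value) {
            // Headers of the record currently being processed.
            for (final Header header : context().headers()) {
                System.out.println(header.key() + " -> " + new String(header.value()));
            }
            context().forward(key, value);
        }
    }
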
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java
index 469d241..855aa9f 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java
@@ -63,12 +63,17 @@ public class ConsumerRecordFactoryTest {
     }
 
     @Test(expected = NullPointerException.class)
+    public void shouldNotAllowToCreateTopicWithNullHeaders() {
+        factory.create(topicName, rawKey, value, null, timestamp);
+    }
+
+    @Test(expected = NullPointerException.class)
     public void shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp() {
         factory.create(null, rawKey, value);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowToCreateTopicWithNullTopicNameWithNulKey() {
+    public void shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey() {
         factory.create((String) null, value, timestamp);
     }
 

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.