Posted to commits@kafka.apache.org by gu...@apache.org on 2019/04/26 22:15:32 UTC

[kafka] branch 2.1 updated: KAFKA-8254: Pass Changelog as Topic in Suppress Serdes (#6602) (#6641)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new bc51d80  KAFKA-8254: Pass Changelog as Topic in Suppress Serdes (#6602) (#6641)
bc51d80 is described below

commit bc51d803478e90af62d242ad8cc6cdaf348d3c17
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Apr 26 17:15:07 2019 -0500

    KAFKA-8254: Pass Changelog as Topic in Suppress Serdes (#6602) (#6641)
    
    Cherry-picked from #6602
    
    Reviewers:  Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../streams/kstream/internals/FullChangeSerde.java |   8 +-
 .../streams/kstream/internals/KStreamImpl.java     |  36 +++-
 .../streams/kstream/internals/KTableImpl.java      |   9 +-
 .../suppress/KTableSuppressProcessor.java          |  43 ++---
 .../processor/internals/ProcessorContextImpl.java  |   4 +-
 .../internals/ProcessorRecordContext.java          |  23 +--
 .../InMemoryTimeOrderedKeyValueBuffer.java         |  62 ++++--
 .../state/internals/TimeOrderedKeyValueBuffer.java |  57 +++++-
 .../integration/SuppressionIntegrationTest.java    | 165 ++++++++++++++--
 .../kstream/internals/FullChangeSerdeTest.java     |  20 +-
 .../suppress/KTableSuppressProcessorTest.java      |  49 +++--
 .../internals/AbstractProcessorContextTest.java    |   4 +-
 .../InMemoryTimeOrderedKeyValueBufferTest.java     |   4 +-
 .../internals/TimeOrderedKeyValueBufferTest.java   | 208 +++++++++------------
 14 files changed, 448 insertions(+), 244 deletions(-)
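
The substance of the fix: before this commit, KTableSuppressProcessor serialized keys and values itself and passed null as the topic to every serde call. Serdes that key their behavior off the topic name (schema-registry-backed serdes are the common example, since they use the topic to resolve the schema subject) break on a null topic. The commit moves (de)serialization down into the suppression buffer, which passes its changelog topic instead. A minimal sketch of the failure mode, assuming a hypothetical topic-sensitive serializer (TopicKeyedSerializer is invented for illustration, not part of this commit):

    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import java.util.Objects;

    import org.apache.kafka.common.serialization.Serializer;

    // Hypothetical topic-sensitive serializer, invented for illustration.
    public class TopicKeyedSerializer implements Serializer<String> {
        @Override
        public void configure(final Map<String, ?> configs, final boolean isKey) {}

        @Override
        public byte[] serialize(final String topic, final String data) {
            // Before this commit the suppress path called serialize(null, ...),
            // which breaks any serde that needs the topic name. Afterwards the
            // buffer passes its changelog topic (see the buffer diff below).
            Objects.requireNonNull(topic, "this serde requires a topic name");
            return (topic + ":" + data).getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public void close() {}
    }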

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
index 9bb8373..f06a428 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -25,21 +25,21 @@ import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
 
-public class FullChangeSerde<T> implements Serde<Change<T>> {
+public final class FullChangeSerde<T> implements Serde<Change<T>> {
     private final Serde<T> inner;
 
     @SuppressWarnings("unchecked")
-    public static <T> FullChangeSerde<T> castOrWrap(final Serde<?> serde) {
+    public static <T> FullChangeSerde<T> castOrWrap(final Serde<T> serde) {
         if (serde == null) {
             return null;
         } else if (serde instanceof FullChangeSerde) {
             return (FullChangeSerde<T>) serde;
         } else {
-            return new FullChangeSerde<T>((Serde<T>) serde);
+            return new FullChangeSerde<>(serde);
         }
     }
 
-    public FullChangeSerde(final Serde<T> inner) {
+    private FullChangeSerde(final Serde<T> inner) {
         this.inner = requireNonNull(inner);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 26ea63c..88529fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -392,18 +392,26 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
     @Override
     public KStream<K, V> through(final String topic) {
-        return through(topic, Produced.with(null, null, null));
+        return through(topic, Produced.with(keySerde, valSerde, null));
     }
 
     @Override
     public KStream<K, V> through(final String topic, final Produced<K, V> produced) {
+        Objects.requireNonNull(topic, "topic can't be null");
+        Objects.requireNonNull(produced, "Produced can't be null");
         final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+        if (producedInternal.keySerde() == null) {
+            producedInternal.withKeySerde(keySerde);
+        }
+        if (producedInternal.valueSerde() == null) {
+            producedInternal.withValueSerde(valSerde);
+        }
         to(topic, producedInternal);
         return builder.stream(
             Collections.singleton(topic),
             new ConsumedInternal<>(
-                producedInternal.keySerde() != null ? producedInternal.keySerde() : keySerde,
-                producedInternal.valueSerde() != null ? producedInternal.valueSerde() : valSerde,
+                producedInternal.keySerde(),
+                producedInternal.valueSerde(),
                 new FailOnInvalidTimestamp(),
                 null
             )
@@ -412,26 +420,40 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
     @Override
     public void to(final String topic) {
-        to(topic, Produced.with(null, null, null));
+        to(topic, Produced.with(keySerde, valSerde, null));
     }
 
     @Override
     public void to(final String topic, final Produced<K, V> produced) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(produced, "Produced can't be null");
-        to(new StaticTopicNameExtractor<>(topic), new ProducedInternal<>(produced));
+        final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+        if (producedInternal.keySerde() == null) {
+            producedInternal.withKeySerde(keySerde);
+        }
+        if (producedInternal.valueSerde() == null) {
+            producedInternal.withValueSerde(valSerde);
+        }
+        to(new StaticTopicNameExtractor<>(topic), producedInternal);
     }
 
     @Override
     public void to(final TopicNameExtractor<K, V> topicExtractor) {
-        to(topicExtractor, Produced.with(null, null, null));
+        to(topicExtractor, Produced.with(keySerde, valSerde, null));
     }
 
     @Override
     public void to(final TopicNameExtractor<K, V> topicExtractor, final Produced<K, V> produced) {
         Objects.requireNonNull(topicExtractor, "topic extractor can't be null");
         Objects.requireNonNull(produced, "Produced can't be null");
-        to(topicExtractor, new ProducedInternal<>(produced));
+        final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+        if (producedInternal.keySerde() == null) {
+            producedInternal.withKeySerde(keySerde);
+        }
+        if (producedInternal.valueSerde() == null) {
+            producedInternal.withValueSerde(valSerde);
+        }
+        to(topicExtractor, producedInternal);
     }
 
     private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInternal<K, V> produced) {
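
The through()/to() overloads now seed Produced with the stream's own serdes instead of nulls, and back-fill any serde the caller leaves unset. In practice, explicit serdes survive a repartition hop without being restated. A usage sketch (topic names are illustrative):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;

    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, Long> stream =
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.Long()));

    // Previously through() built Produced.with(null, null, null), so the
    // intermediate topic fell back to the configured default serdes even
    // though the stream knew its own. Now keySerde/valSerde are inherited
    // unless the caller overrides them.
    stream.through("repartition-topic")
          .to("output-topic");
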
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index f49d109..25e76f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -366,18 +366,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME);
 
         final ProcessorSupplier<K, Change<V>> suppressionSupplier =
-            () -> new KTableSuppressProcessor<>(
-                suppressedInternal,
-                storeName,
-                keySerde,
-                valSerde == null ? null : new FullChangeSerde<>(valSerde)
-            );
+            () -> new KTableSuppressProcessor<>(suppressedInternal, storeName);
 
 
         final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
             name,
             new ProcessorParameters<>(suppressionSupplier, name),
-            new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName),
+            new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valSerde)),
             false
         );
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index 622223c..fcbc60b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -18,8 +18,6 @@ package org.apache.kafka.streams.kstream.internals.suppress;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
@@ -28,7 +26,6 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.state.internals.ContextualRecord;
 import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
 
 import static java.util.Objects.requireNonNull;
@@ -42,22 +39,14 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
     private final boolean safeToDropTombstones;
     private final String storeName;
 
-    private TimeOrderedKeyValueBuffer buffer;
+    private TimeOrderedKeyValueBuffer<K, Change<V>> buffer;
     private InternalProcessorContext internalProcessorContext;
 
-    private Serde<K> keySerde;
-    private FullChangeSerde<V> valueSerde;
-
     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
-    public KTableSuppressProcessor(final SuppressedInternal<K> suppress,
-                                   final String storeName,
-                                   final Serde<K> keySerde,
-                                   final FullChangeSerde<V> valueSerde) {
+    public KTableSuppressProcessor(final SuppressedInternal<K> suppress, final String storeName) {
         this.storeName = storeName;
         requireNonNull(suppress);
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
         maxRecords = suppress.bufferConfig().maxRecords();
         maxBytes = suppress.bufferConfig().maxBytes();
         suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis();
@@ -70,9 +59,9 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
     @Override
     public void init(final ProcessorContext context) {
         internalProcessorContext = (InternalProcessorContext) context;
-        keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
-        valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde;
-        buffer = requireNonNull((TimeOrderedKeyValueBuffer) context.getStateStore(storeName));
+
+        buffer = requireNonNull((TimeOrderedKeyValueBuffer<K, Change<V>>) context.getStateStore(storeName));
+        buffer.setSerdesIfNull((Serde<K>) context.keySerde(), FullChangeSerde.castOrWrap((Serde<V>) context.valueSerde()));
     }
 
     @Override
@@ -84,12 +73,7 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
 
     private void buffer(final K key, final Change<V> value) {
         final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key);
-        final ProcessorRecordContext recordContext = internalProcessorContext.recordContext();
-
-        final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(null, key));
-        final byte[] serializedValue = valueSerde.serializer().serialize(null, value);
-
-        buffer.put(bufferTime, serializedKey, new ContextualRecord(serializedValue, recordContext));
+        buffer.put(bufferTime, key, value, internalProcessorContext.recordContext());
     }
 
     private void enforceConstraints() {
@@ -110,6 +94,11 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
                         buffer.numRecords(), maxRecords,
                         buffer.bufferSize(), maxBytes
                     ));
+                default:
+                    throw new UnsupportedOperationException(
+                        "The bufferFullStrategy [" + bufferFullStrategy +
+                            "] is not implemented. This is a bug in Kafka Streams."
+                    );
             }
         }
     }
@@ -118,14 +107,12 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
         return buffer.numRecords() > maxRecords || buffer.bufferSize() > maxBytes;
     }
 
-    private void emit(final KeyValue<Bytes, ContextualRecord> toEmit) {
-        final Change<V> value = valueSerde.deserializer().deserialize(null, toEmit.value.value());
-        if (shouldForward(value)) {
+    private void emit(final TimeOrderedKeyValueBuffer.Eviction<K, Change<V>> toEmit) {
+        if (shouldForward(toEmit.value())) {
             final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
-            internalProcessorContext.setRecordContext(toEmit.value.recordContext());
+            internalProcessorContext.setRecordContext(toEmit.recordContext());
             try {
-                final K key = keySerde.deserializer().deserialize(null, toEmit.key.get());
-                internalProcessorContext.forward(key, value);
+                internalProcessorContext.forward(toEmit.key(), toEmit.value());
             } finally {
                 internalProcessorContext.setRecordContext(prevRecordContext);
             }
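
With serialization pushed into the buffer, the processor reduces to control flow: buffer typed records, and on eviction forward them under the record context they were buffered with, restoring the previous context afterwards. The swap matters because downstream processors read topic, partition, offset, and timestamp from the context. The idiom in isolation (a free-standing fragment; the helper name is invented, mirroring the body of emit() above):

    import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
    import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
    import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;

    // Invented helper name; same shape as emit() in the diff above.
    static <K, V> void forwardWithOriginalContext(
        final InternalProcessorContext ctx,
        final TimeOrderedKeyValueBuffer.Eviction<K, V> eviction) {
        final ProcessorRecordContext prior = ctx.recordContext();
        ctx.setRecordContext(eviction.recordContext());
        try {
            ctx.forward(eviction.key(), eviction.value());
        } finally {
            ctx.setRecordContext(prior); // restored even if forward() throws
        }
    }
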
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index fa78d01..c9f2431 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -110,7 +110,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
                                final V value,
                                final To to) {
         final ProcessorNode previousNode = currentNode();
-        final long currentTimestamp = recordContext.timestamp;
+        final long currentTimestamp = recordContext.timestamp();
 
         try {
             toInternal.update(to);
@@ -138,7 +138,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
                 }
             }
         } finally {
-            recordContext.timestamp = currentTimestamp;
+            recordContext.setTimestamp(currentTimestamp);
             setCurrentNode(previousNode);
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 4f991a2..d4e1594 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -29,11 +29,11 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 public class ProcessorRecordContext implements RecordContext {
 
-    long timestamp;
-    final long offset;
-    final String topic;
-    final int partition;
-    final Headers headers;
+    private long timestamp;
+    private final long offset;
+    private final String topic;
+    private final int partition;
+    private final Headers headers;
 
     public ProcessorRecordContext(final long timestamp,
                                   final long offset,
@@ -48,13 +48,6 @@ public class ProcessorRecordContext implements RecordContext {
         this.headers = headers;
     }
 
-    public ProcessorRecordContext(final long timestamp,
-                                  final long offset,
-                                  final int partition,
-                                  final String topic) {
-        this(timestamp, offset, partition, topic, null);
-    }
-
     public void setTimestamp(final long timestamp) {
         this.timestamp = timestamp;
     }
@@ -221,9 +214,13 @@ public class ProcessorRecordContext implements RecordContext {
             Objects.equals(headers, that.headers);
     }
 
+    /**
+     * Equality is implemented in support of tests, *not* for use in Hash collections, since this class is mutable.
+     */
+    @Deprecated
     @Override
     public int hashCode() {
-        return Objects.hash(timestamp, offset, topic, partition, headers);
+        throw new UnsupportedOperationException("ProcessorRecordContext is unsafe for use in Hash collections");
     }
 
     @Override
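
Making the fields private and having hashCode() throw is deliberate: equals() is kept for tests, but the class is mutable (setTimestamp), and a mutable key silently corrupts hash-based collections because the entry stays in the bucket computed from its old hash. A self-contained illustration of that hazard (MutableKey is invented for the example, not Kafka code):

    import java.util.HashSet;
    import java.util.Objects;
    import java.util.Set;

    public final class MutableKeyDemo {
        // Invented for illustration: equals/hashCode over a mutable field.
        static final class MutableKey {
            long timestamp;
            MutableKey(final long timestamp) { this.timestamp = timestamp; }

            @Override
            public boolean equals(final Object o) {
                return o instanceof MutableKey && ((MutableKey) o).timestamp == timestamp;
            }

            @Override
            public int hashCode() { return Objects.hash(timestamp); }
        }

        public static void main(final String[] args) {
            final Set<MutableKey> set = new HashSet<>();
            final MutableKey key = new MutableKey(1L);
            set.add(key);
            key.timestamp = 2L;                      // hash changes in place
            System.out.println(set.contains(key));   // false: entry stranded
        }
    }
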
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index d323d97..82d07a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -22,8 +22,8 @@ import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -47,7 +47,7 @@ import java.util.function.Supplier;
 
 import static java.util.Objects.requireNonNull;
 
-public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer {
+public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> {
     private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
     private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
     private static final RecordHeaders V_1_CHANGELOG_HEADERS =
@@ -60,6 +60,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
     private final String storeName;
     private final boolean loggingEnabled;
 
+    private Serde<K> keySerde;
+    private Serde<V> valueSerde;
+
     private long memBufferSize = 0L;
     private long minTimestamp = Long.MAX_VALUE;
     private RecordCollector collector;
@@ -69,13 +72,17 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
 
     private int partition;
 
-    public static class Builder implements StoreBuilder<StateStore> {
+    public static class Builder<K, V> implements StoreBuilder<StateStore> {
 
         private final String storeName;
+        private final Serde<K> keySerde;
+        private final Serde<V> valSerde;
         private boolean loggingEnabled = true;
 
-        public Builder(final String storeName) {
+        public Builder(final String storeName, final Serde<K> keySerde, final Serde<V> valSerde) {
             this.storeName = storeName;
+            this.keySerde = keySerde;
+            this.valSerde = valSerde;
         }
 
         /**
@@ -114,8 +121,8 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
         }
 
         @Override
-        public StateStore build() {
-            return new InMemoryTimeOrderedKeyValueBuffer(storeName, loggingEnabled);
+        public InMemoryTimeOrderedKeyValueBuffer<K, V> build() {
+            return new InMemoryTimeOrderedKeyValueBuffer<>(storeName, loggingEnabled, keySerde, valSerde);
         }
 
         @Override
@@ -145,8 +152,12 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
 
         @Override
         public boolean equals(final Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
             final BufferKey bufferKey = (BufferKey) o;
             return time == bufferKey.time &&
                 Objects.equals(key, bufferKey.key);
@@ -173,9 +184,14 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
         }
     }
 
-    private InMemoryTimeOrderedKeyValueBuffer(final String storeName, final boolean loggingEnabled) {
+    private InMemoryTimeOrderedKeyValueBuffer(final String storeName,
+                                              final boolean loggingEnabled,
+                                              final Serde<K> keySerde,
+                                              final Serde<V> valueSerde) {
         this.storeName = storeName;
         this.loggingEnabled = loggingEnabled;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
     }
 
     @Override
@@ -190,6 +206,12 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
     }
 
     @Override
+    public void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde) {
+        this.keySerde = this.keySerde == null ? keySerde : this.keySerde;
+        this.valueSerde = this.valueSerde == null ? valueSerde : this.valueSerde;
+    }
+
+    @Override
     public void init(final ProcessorContext context, final StateStore root) {
         context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
         if (loggingEnabled) {
@@ -343,7 +365,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
 
     @Override
     public void evictWhile(final Supplier<Boolean> predicate,
-                           final Consumer<KeyValue<Bytes, ContextualRecord>> callback) {
+                           final Consumer<Eviction<K, V>> callback) {
         final Iterator<Map.Entry<BufferKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator();
 
         if (predicate.get()) {
@@ -360,7 +382,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
                             next.getKey().time + "]"
                     );
                 }
-                callback.accept(new KeyValue<>(next.getKey().key, next.getValue()));
+                final K key = keySerde.deserializer().deserialize(changelogTopic, next.getKey().key.get());
+                final V value = valueSerde.deserializer().deserialize(changelogTopic, next.getValue().value());
+                callback.accept(new Eviction<>(key, value, next.getValue().recordContext()));
 
                 delegate.remove();
                 index.remove(next.getKey().key);
@@ -383,13 +407,17 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
 
     @Override
     public void put(final long time,
-                    final Bytes key,
-                    final ContextualRecord contextualRecord) {
-        requireNonNull(contextualRecord.value(), "value cannot be null");
-        requireNonNull(contextualRecord.recordContext(), "recordContext cannot be null");
+                    final K key,
+                    final V value,
+                    final ProcessorRecordContext recordContext) {
+        requireNonNull(value, "value cannot be null");
+        requireNonNull(recordContext, "recordContext cannot be null");
+
+        final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
+        final byte[] serializedValue = valueSerde.serializer().serialize(changelogTopic, value);
 
-        cleanPut(time, key, contextualRecord);
-        dirtyKeys.add(key);
+        cleanPut(time, serializedKey, new ContextualRecord(serializedValue, recordContext));
+        dirtyKeys.add(serializedKey);
     }
 
     private void cleanPut(final long time, final Bytes key, final ContextualRecord value) {
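
Two threads run through this file. First, the buffer now owns its serdes: they arrive at build time via the Builder, and anything still null is back-filled from the processor context through setSerdesIfNull, with build-time serdes taking precedence. Second, every serialize/deserialize call now receives changelogTopic rather than null, which is the fix the commit title names. A sketch of the precedence rule (internal API, shown only for illustration; the store name is arbitrary):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;

    // Build time: the key serde is known, the value serde is not yet.
    final InMemoryTimeOrderedKeyValueBuffer<String, Long> buffer =
        new InMemoryTimeOrderedKeyValueBuffer.Builder<String, Long>(
            "suppress-store", Serdes.String(), null /* unknown at build time */)
            .build();

    // Init time: only the missing value serde is filled in; the explicit
    // String key serde from build time is kept. (The real caller passes
    // the context defaults here, per KTableSuppressProcessor.init above.)
    buffer.setSerdesIfNull(null, Serdes.Long());
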
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
index 86a8c1e..ffa1f49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
@@ -16,17 +16,64 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 
+import java.util.Objects;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-public interface TimeOrderedKeyValueBuffer extends StateStore {
-    void evictWhile(final Supplier<Boolean> predicate, final Consumer<KeyValue<Bytes, ContextualRecord>> callback);
+public interface TimeOrderedKeyValueBuffer<K, V> extends StateStore {
+    final class Eviction<K, V> {
+        private final K key;
+        private final V value;
+        private final ProcessorRecordContext recordContext;
 
-    void put(final long time, final Bytes key, final ContextualRecord value);
+        Eviction(final K key, final V value, final ProcessorRecordContext recordContext) {
+            this.key = key;
+            this.value = value;
+            this.recordContext = recordContext;
+        }
+
+        public K key() {
+            return key;
+        }
+
+        public V value() {
+            return value;
+        }
+
+        public ProcessorRecordContext recordContext() {
+            return recordContext;
+        }
+
+        @Override
+        public String toString() {
+            return "Eviction{key=" + key + ", value=" + value + ", recordContext=" + recordContext + '}';
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            final Eviction<?, ?> eviction = (Eviction<?, ?>) o;
+            return Objects.equals(key, eviction.key) &&
+                Objects.equals(value, eviction.value) &&
+                Objects.equals(recordContext, eviction.recordContext);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key, value, recordContext);
+        }
+    }
+
+    void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde);
+
+    void evictWhile(final Supplier<Boolean> predicate, final Consumer<Eviction<K, V>> callback);
+
+    void put(long time, K key, V value, ProcessorRecordContext recordContext);
 
     int numRecords();
 
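The interface moves from a bytes-level contract (Bytes keys, ContextualRecord values) to a typed one, with Eviction as an immutable carrier replacing the raw KeyValue pair. A minimal usage sketch against the in-memory implementation (internal API, exercised the way the updated tests below do; the record-context values are placeholders):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
    import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;

    final InMemoryTimeOrderedKeyValueBuffer<String, String> buffer =
        new InMemoryTimeOrderedKeyValueBuffer.Builder<>(
            "store", Serdes.String(), Serdes.String())
            .build();

    // Typed put: serialization now happens inside the buffer, against the
    // changelog topic rather than a null topic.
    buffer.put(5L, "key", "value",
        new ProcessorRecordContext(5L, 0L, 0, "topic", null));

    // Typed eviction: the callback receives deserialized keys and values
    // plus the original record context.
    buffer.evictWhile(() -> buffer.numRecords() > 0, eviction ->
        System.out.println(eviction.key() + " -> " + eviction.value()));
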
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index da91b91..79957d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -17,9 +17,13 @@
 package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
@@ -35,6 +39,7 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
@@ -44,6 +49,8 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.Matchers;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -53,6 +60,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 import static java.lang.Long.MAX_VALUE;
 import static java.time.Duration.ofMillis;
@@ -62,6 +70,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
@@ -73,7 +82,7 @@ import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-@Category({IntegrationTest.class})
+@Category(IntegrationTest.class)
 public class SuppressionIntegrationTest {
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
@@ -147,7 +156,7 @@ public class SuppressionIntegrationTest {
         }
     }
 
-    private KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) {
+    private static KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) {
         return builder
             .table(
                 input,
@@ -161,6 +170,139 @@ public class SuppressionIntegrationTest {
     }
 
     @Test
+    public void shouldUseDefaultSerdes() {
+        final String testId = "-shouldUseDefaultSerdes";
+        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+        final String input = "input" + testId;
+        final String outputSuppressed = "output-suppressed" + testId;
+        final String outputRaw = "output-raw" + testId;
+
+        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> inputStream = builder.stream(input);
+
+        final KTable<String, String> valueCounts = inputStream
+            .groupByKey()
+            .aggregate(() -> "()", (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")");
+
+        valueCounts
+            .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).emitEarlyWhenFull()))
+            .toStream()
+            .to(outputSuppressed);
+
+        valueCounts
+            .toStream()
+            .to(outputRaw);
+
+        final Properties streamsConfig = getStreamsConfig(appId);
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
+        try {
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
+                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", "x", scaledTime(3L))
+                )
+            );
+            final boolean rawRecords = waitForAnyRecord(outputRaw);
+            final boolean suppressedRecords = waitForAnyRecord(outputSuppressed);
+            assertThat(rawRecords, Matchers.is(true));
+            assertThat(suppressedRecords, is(true));
+        } finally {
+            driver.close();
+            cleanStateAfterTest(CLUSTER, driver);
+        }
+    }
+
+    @Test
+    public void shouldInheritSerdes() {
+        final String testId = "-shouldInheritSerdes";
+        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+        final String input = "input" + testId;
+        final String outputSuppressed = "output-suppressed" + testId;
+        final String outputRaw = "output-raw" + testId;
+
+        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> inputStream = builder.stream(input);
+
+        // count sets the serde to Long
+        final KTable<String, Long> valueCounts = inputStream
+            .groupByKey()
+            .count();
+
+        valueCounts
+            .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).emitEarlyWhenFull()))
+            .toStream()
+            .to(outputSuppressed);
+
+        valueCounts
+            .toStream()
+            .to(outputRaw);
+
+        final Properties streamsConfig = getStreamsConfig(appId);
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
+        try {
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
+                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", "x", scaledTime(3L))
+                )
+            );
+            final boolean rawRecords = waitForAnyRecord(outputRaw);
+            final boolean suppressedRecords = waitForAnyRecord(outputSuppressed);
+            assertThat(rawRecords, Matchers.is(true));
+            assertThat(suppressedRecords, is(true));
+        } finally {
+            driver.close();
+            cleanStateAfterTest(CLUSTER, driver);
+        }
+    }
+
+    private static boolean waitForAnyRecord(final String topic) {
+        final Properties properties = new Properties();
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+        try (final Consumer<Object, Object> consumer = new KafkaConsumer<>(properties)) {
+            final List<TopicPartition> partitions =
+                consumer.partitionsFor(topic)
+                        .stream()
+                        .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
+                        .collect(Collectors.toList());
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
+            final long start = System.currentTimeMillis();
+            while ((System.currentTimeMillis() - start) < DEFAULT_TIMEOUT) {
+                final ConsumerRecords<Object, Object> records = consumer.poll(ofMillis(500));
+
+                if (!records.isEmpty()) {
+                    return true;
+                }
+            }
+
+            return false;
+        }
+    }
+
+    @Test
     public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() {
         final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter";
         final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
@@ -336,7 +478,7 @@ public class SuppressionIntegrationTest {
 
         valueCounts
             // this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size.
-            .suppress(untilTimeLimit(Duration.ofMillis(MAX_VALUE), maxBytes(200L).emitEarlyWhenFull()))
+            .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxBytes(200L).emitEarlyWhenFull()))
             .toStream()
             .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
 
@@ -396,7 +538,7 @@ public class SuppressionIntegrationTest {
 
         valueCounts
             // this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size.
-            .suppress(untilTimeLimit(Duration.ofMillis(MAX_VALUE), maxBytes(200L).shutDownWhenFull()))
+            .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxBytes(200L).shutDownWhenFull()))
             .toStream()
             .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
 
@@ -490,17 +632,18 @@ public class SuppressionIntegrationTest {
         }
     }
 
-    private Properties getStreamsConfig(final String appId) {
+    private static Properties getStreamsConfig(final String appId) {
         return mkProperties(mkMap(
             mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
             mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
             mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)),
             mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)),
-            mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE)
+            mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE),
+            mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
         ));
     }
 
-    private String scaledWindowKey(final String key, final long unscaledStart, final long unscaledEnd) {
+    private static String scaledWindowKey(final String key, final long unscaledStart, final long unscaledEnd) {
         return new Windowed<>(key, new TimeWindow(scaledTime(unscaledStart), scaledTime(unscaledEnd))).toString();
     }
 
@@ -508,11 +651,11 @@ public class SuppressionIntegrationTest {
      * scaling to ensure that there are commits in between the various test events,
      * just to exercise that everything works properly in the presence of commits.
      */
-    private long scaledTime(final long unscaledTime) {
+    private static long scaledTime(final long unscaledTime) {
         return COMMIT_INTERVAL * 2 * unscaledTime;
     }
 
-    private void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
+    private static void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
         final Properties producerConfig = mkProperties(mkMap(
             mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"),
             mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
@@ -522,12 +665,12 @@ public class SuppressionIntegrationTest {
         IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, Optional.empty(), toProduce);
     }
 
-    private void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException {
+    private static void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException {
         waitForCondition(() -> !driver.state().isRunning(), TIMEOUT_MS, "Streams didn't shut down.");
         assertThat(driver.state(), is(KafkaStreams.State.ERROR));
     }
 
-    private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
+    private static void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
         final Properties properties = mkProperties(
             mkMap(
                 mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
index a6a8888..ddba05e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
@@ -29,7 +29,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
 public class FullChangeSerdeTest {
-    private final FullChangeSerde<String> serde = new FullChangeSerde<>(Serdes.String());
+    private final FullChangeSerde<String> serde = FullChangeSerde.castOrWrap(Serdes.String());
 
     @Test
     public void shouldRoundTripNull() {
@@ -77,31 +77,28 @@ public class FullChangeSerdeTest {
         );
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldConfigureSerde() {
         final Serde<Void> mock = EasyMock.mock(Serde.class);
         mock.configure(emptyMap(), false);
         EasyMock.expectLastCall();
         EasyMock.replay(mock);
-        final FullChangeSerde<Void> serde = new FullChangeSerde<>(mock);
+        final FullChangeSerde<Void> serde = FullChangeSerde.castOrWrap(mock);
         serde.configure(emptyMap(), false);
         EasyMock.verify(mock);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldCloseSerde() {
         final Serde<Void> mock = EasyMock.mock(Serde.class);
         mock.close();
         EasyMock.expectLastCall();
         EasyMock.replay(mock);
-        final FullChangeSerde<Void> serde = new FullChangeSerde<>(mock);
+        final FullChangeSerde<Void> serde = FullChangeSerde.castOrWrap(mock);
         serde.close();
         EasyMock.verify(mock);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldConfigureSerializer() {
         final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
@@ -111,13 +108,12 @@ public class FullChangeSerdeTest {
         mockSerializer.configure(emptyMap(), false);
         EasyMock.expectLastCall();
         EasyMock.replay(mockSerializer);
-        final Serializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).serializer();
+        final Serializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer();
         serializer.configure(emptyMap(), false);
         EasyMock.verify(mockSerde);
         EasyMock.verify(mockSerializer);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldCloseSerializer() {
         final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
@@ -127,13 +123,12 @@ public class FullChangeSerdeTest {
         mockSerializer.close();
         EasyMock.expectLastCall();
         EasyMock.replay(mockSerializer);
-        final Serializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).serializer();
+        final Serializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer();
         serializer.close();
         EasyMock.verify(mockSerde);
         EasyMock.verify(mockSerializer);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldConfigureDeserializer() {
         final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
@@ -143,13 +138,12 @@ public class FullChangeSerdeTest {
         mockDeserializer.configure(emptyMap(), false);
         EasyMock.expectLastCall();
         EasyMock.replay(mockDeserializer);
-        final Deserializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).deserializer();
+        final Deserializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer();
         serializer.configure(emptyMap(), false);
         EasyMock.verify(mockSerde);
         EasyMock.verify(mockDeserializer);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldCloseDeserializer() {
         final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
@@ -159,7 +153,7 @@ public class FullChangeSerdeTest {
         mockDeserializer.close();
         EasyMock.expectLastCall();
         EasyMock.replay(mockDeserializer);
-        final Deserializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).deserializer();
+        final Deserializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer();
         serializer.close();
         EasyMock.verify(mockSerde);
         EasyMock.verify(mockDeserializer);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index bb1bc0f..6246459 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -72,11 +72,12 @@ public class KTableSuppressProcessorTest {
 
             final String storeName = "test-store";
 
-            final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName)
+            final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valueSerde))
                 .withLoggingDisabled()
                 .build();
+
             final KTableSuppressProcessor<K, V> processor =
-                new KTableSuppressProcessor<>(getImpl(suppressed), storeName, keySerde, new FullChangeSerde<>(valueSerde));
+                new KTableSuppressProcessor<>((SuppressedInternal<K>) suppressed, storeName);
 
             final MockInternalProcessorContext context = new MockInternalProcessorContext();
             buffer.init(context, buffer);
@@ -201,7 +202,6 @@ public class KTableSuppressProcessorTest {
         // note the record is in the past, but the window end is in the future, so we still have to buffer,
         // even though the grace period is 0.
         final long timestamp = 5L;
-        final long streamTime = 99L;
         final long windowEnd = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, windowEnd));
@@ -237,8 +237,12 @@ public class KTableSuppressProcessorTest {
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
+    /**
+     * It's desirable to drop tombstones for final-results windowed streams, since (as described in the
+     * {@link SuppressedInternal} javadoc), they are unnecessary to emit.
+     */
     @Test
-    public void finalResultsShouldSuppressTombstonesForTimeWindows() {
+    public void finalResultsShouldDropTombstonesForTimeWindows() {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long());
         final MockInternalProcessorContext context = harness.context;
@@ -253,8 +257,13 @@ public class KTableSuppressProcessorTest {
         assertThat(context.forwarded(), hasSize(0));
     }
 
+
+    /**
+     * It's desirable to drop tombstones for final-results windowed streams, since (as described in the
+     * {@link SuppressedInternal} javadoc), they are unnecessary to emit.
+     */
     @Test
-    public void finalResultsShouldSuppressTombstonesForSessionWindows() {
+    public void finalResultsShouldDropTombstonesForSessionWindows() {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(0L)), sessionWindowedSerdeFrom(String.class), Long());
         final MockInternalProcessorContext context = harness.context;
@@ -269,8 +278,12 @@ public class KTableSuppressProcessorTest {
         assertThat(context.forwarded(), hasSize(0));
     }
 
+    /**
+     * It's NOT OK to drop tombstones for non-final-results windowed streams, since we may have emitted some results for
+     * the window before getting the tombstone (see the {@link SuppressedInternal} javadoc).
+     */
     @Test
-    public void suppressShouldNotSuppressTombstonesForTimeWindows() {
+    public void suppressShouldNotDropTombstonesForTimeWindows() {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), timeWindowedSerdeFrom(String.class, 100L), Long());
         final MockInternalProcessorContext context = harness.context;
@@ -288,8 +301,13 @@ public class KTableSuppressProcessorTest {
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
+
+    /**
+     * It's NOT OK to drop tombstones for non-final-results windowed streams, since we may have emitted some results for
+     * the window before getting the tombstone (see the {@link SuppressedInternal} javadoc).
+     */
     @Test
-    public void suppressShouldNotSuppressTombstonesForSessionWindows() {
+    public void suppressShouldNotDropTombstonesForSessionWindows() {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), sessionWindowedSerdeFrom(String.class), Long());
         final MockInternalProcessorContext context = harness.context;
@@ -307,8 +325,13 @@ public class KTableSuppressProcessorTest {
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
+
+    /**
+     * It's SUPER NOT OK to drop tombstones for non-windowed streams, since we may have emitted some results for
+     * the key before getting the tombstone (see the {@link SuppressedInternal} javadoc).
+     */
     @Test
-    public void suppressShouldNotSuppressTombstonesForKTable() {
+    public void suppressShouldNotDropTombstonesForKTable() {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), String(), Long());
         final MockInternalProcessorContext context = harness.context;
@@ -416,8 +439,8 @@ public class KTableSuppressProcessorTest {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private <K extends Windowed> SuppressedInternal<K> finalResults(final Duration grace) {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private static <K extends Windowed> SuppressedInternal<K> finalResults(final Duration grace) {
         return ((FinalResultsSuppressionBuilder) untilWindowCloses(unbounded())).buildFinalResultsSuppression(grace);
     }
 
@@ -441,11 +464,7 @@ public class KTableSuppressProcessorTest {
         };
     }
 
-    private static <K> SuppressedInternal<K> getImpl(final Suppressed<K> suppressed) {
-        return (SuppressedInternal<K>) suppressed;
-    }
-
-    private <K> Serde<Windowed<K>> timeWindowedSerdeFrom(final Class<K> rawType, final long windowSize) {
+    private static <K> Serde<Windowed<K>> timeWindowedSerdeFrom(final Class<K> rawType, final long windowSize) {
         final Serde<K> kSerde = Serdes.serdeFrom(rawType);
         return new Serdes.WrapperSerde<>(
             new TimeWindowedSerializer<>(kSerde.serializer()),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 43df1d2..1864547 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -96,7 +96,7 @@ public class AbstractProcessorContextTest {
 
     @Test
     public void shouldReturnNullIfTopicEqualsNonExistTopic() {
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC, null));
         assertThat(context.topic(), nullValue());
     }
 
@@ -154,7 +154,7 @@ public class AbstractProcessorContextTest {
 
     @Test
     public void shouldReturnNullIfHeadersAreNotSet() {
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC, null));
         assertThat(context.headers(), nullValue());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java
index ddc4046..18f689f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java
@@ -22,11 +22,11 @@ public class InMemoryTimeOrderedKeyValueBufferTest {
 
     @Test
     public void bufferShouldAllowCacheEnablement() {
-        new InMemoryTimeOrderedKeyValueBuffer.Builder(null).withCachingEnabled();
+        new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingEnabled();
     }
 
     @Test
     public void bufferShouldAllowCacheDisablement() {
-        new InMemoryTimeOrderedKeyValueBuffer.Builder(null).withCachingDisabled();
+        new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingDisabled();
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 2953953..6ae36d4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -22,13 +22,14 @@ import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer.Eviction;
 import org.apache.kafka.test.MockInternalProcessorContext;
 import org.apache.kafka.test.MockInternalProcessorContext.MockRecordCollector;
 import org.apache.kafka.test.TestUtils;
@@ -55,7 +56,7 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.fail;
 
 @RunWith(Parameterized.class)
-public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> {
+public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<String, String>> {
     private static final RecordHeaders V_1_CHANGELOG_HEADERS =
         new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
 
@@ -69,9 +70,9 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
         return singletonList(
             new Object[] {
                 "in-memory buffer",
-                (Function<String, InMemoryTimeOrderedKeyValueBuffer>) name ->
-                    (InMemoryTimeOrderedKeyValueBuffer) new InMemoryTimeOrderedKeyValueBuffer
-                        .Builder(name)
+                (Function<String, InMemoryTimeOrderedKeyValueBuffer<String, String>>) name ->
+                    new InMemoryTimeOrderedKeyValueBuffer
+                        .Builder<>(name, Serdes.String(), Serdes.String())
                         .build()
             }
         );
@@ -96,7 +97,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
     }
 
 
-    private static void cleanup(final MockInternalProcessorContext context, final TimeOrderedKeyValueBuffer buffer) {
+    private static void cleanup(final MockInternalProcessorContext context, final TimeOrderedKeyValueBuffer<String, String> buffer) {
         try {
             buffer.close();
             Utils.delete(context.stateDir());
@@ -107,7 +108,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
 
     @Test
     public void shouldInit() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
         cleanup(context, buffer);
@@ -115,23 +116,23 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
 
     @Test
     public void shouldAcceptData() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
-        putRecord(buffer, context, "2p93nf", 0, "asdf");
+        putRecord(buffer, context, 0L, 0L, "asdf", "2p93nf");
         cleanup(context, buffer);
     }
 
     @Test
     public void shouldRejectNullValues() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
         try {
-            buffer.put(0, getBytes("asdf"), new ContextualRecord(
-                null,
-                new ProcessorRecordContext(0, 0, 0, "topic")
-            ));
+            buffer.put(0, "asdf",
+                       null,
+                       getContext(0)
+            );
             fail("expected an exception");
         } catch (final NullPointerException expected) {
             // expected
@@ -139,27 +140,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
         cleanup(context, buffer);
     }
 
-    private static ContextualRecord getRecord(final String value) {
-        return getRecord(value, 0L);
-    }
-
-    private static ContextualRecord getRecord(final String value, final long timestamp) {
-        return new ContextualRecord(
-            value.getBytes(UTF_8),
-            new ProcessorRecordContext(timestamp, 0, 0, "topic")
-        );
-    }
-
-    private static Bytes getBytes(final String key) {
-        return Bytes.wrap(key.getBytes(UTF_8));
-    }
-
     @Test
     public void shouldRemoveData() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
-        putRecord(buffer, context, "qwer", 0, "asdf");
+        putRecord(buffer, context, 0L, 0L, "asdf", "qwer");
         assertThat(buffer.numRecords(), is(1));
         buffer.evictWhile(() -> true, kv -> { });
         assertThat(buffer.numRecords(), is(0));
@@ -168,90 +154,71 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
 
     @Test
     public void shouldRespectEvictionPredicate() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
-        final Bytes firstKey = getBytes("asdf");
-        final ContextualRecord firstRecord = getRecord("eyt");
-        putRecord(0, buffer, context, firstRecord, firstKey);
-        putRecord(buffer, context, "rtg", 1, "zxcv");
+        putRecord(buffer, context, 0L, 0L, "asdf", "eyt");
+        putRecord(buffer, context, 1L, 0L, "zxcv", "rtg");
         assertThat(buffer.numRecords(), is(2));
-        final List<KeyValue<Bytes, ContextualRecord>> evicted = new LinkedList<>();
+        final List<Eviction<String, String>> evicted = new LinkedList<>();
         buffer.evictWhile(() -> buffer.numRecords() > 1, evicted::add);
         assertThat(buffer.numRecords(), is(1));
-        assertThat(evicted, is(singletonList(new KeyValue<>(firstKey, firstRecord))));
+        assertThat(evicted, is(singletonList(new Eviction<>("asdf", "eyt", getContext(0L)))));
         cleanup(context, buffer);
     }
 
     @Test
     public void shouldTrackCount() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
-        putRecord(buffer, context, "oin", 0, "asdf");
+        putRecord(buffer, context, 0L, 0L, "asdf", "oin");
         assertThat(buffer.numRecords(), is(1));
-        putRecord(buffer, context, "wekjn", 1, "asdf");
+        putRecord(buffer, context, 1L, 0L, "asdf", "wekjn");
         assertThat(buffer.numRecords(), is(1));
-        putRecord(buffer, context, "24inf", 0, "zxcv");
+        putRecord(buffer, context, 0L, 0L, "zxcv", "24inf");
         assertThat(buffer.numRecords(), is(2));
         cleanup(context, buffer);
     }
 
     @Test
     public void shouldTrackSize() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
-        putRecord(buffer, context, "23roni", 0, "asdf");
+        putRecord(buffer, context, 0L, 0L, "asdf", "23roni");
         assertThat(buffer.bufferSize(), is(43L));
-        putRecord(buffer, context, "3l", 1, "asdf");
+        putRecord(buffer, context, 1L, 0L, "asdf", "3l");
         assertThat(buffer.bufferSize(), is(39L));
-        putRecord(buffer, context, "qfowin", 0, "zxcv");
+        putRecord(buffer, context, 0L, 0L, "zxcv", "qfowin");
         assertThat(buffer.bufferSize(), is(82L));
         cleanup(context, buffer);
     }
 
     @Test
     public void shouldTrackMinTimestamp() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
-        putRecord(buffer, context, "2093j", 1, "asdf");
+        putRecord(buffer, context, 1L, 0L, "asdf", "2093j");
         assertThat(buffer.minTimestamp(), is(1L));
-        putRecord(buffer, context, "3gon4i", 0, "zxcv");
+        putRecord(buffer, context, 0L, 0L, "zxcv", "3gon4i");
         assertThat(buffer.minTimestamp(), is(0L));
         cleanup(context, buffer);
     }
 
-    private static void putRecord(final TimeOrderedKeyValueBuffer buffer,
-                                  final MockInternalProcessorContext context,
-                                  final String value,
-                                  final int time,
-                                  final String key) {
-        putRecord(time, buffer, context, getRecord(value), getBytes(key));
-    }
-
-    private static void putRecord(final int time,
-                                  final TimeOrderedKeyValueBuffer buffer,
-                                  final MockInternalProcessorContext context,
-                                  final ContextualRecord firstRecord,
-                                  final Bytes firstKey) {
-        context.setRecordContext(firstRecord.recordContext());
-        buffer.put(time, firstKey, firstRecord);
-    }
-
     @Test
     public void shouldEvictOldestAndUpdateSizeAndCountAndMinTimestamp() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
 
-        putRecord(buffer, context, "o23i4", 1, "zxcv");
+        putRecord(buffer, context, 1L, 0L, "zxcv", "o23i4");
         assertThat(buffer.numRecords(), is(1));
         assertThat(buffer.bufferSize(), is(42L));
         assertThat(buffer.minTimestamp(), is(1L));
 
-        putRecord(buffer, context, "3ng", 0, "asdf");
+        putRecord(buffer, context, 0L, 0L, "asdf", "3ng");
         assertThat(buffer.numRecords(), is(2));
         assertThat(buffer.bufferSize(), is(82L));
         assertThat(buffer.minTimestamp(), is(0L));
@@ -260,14 +227,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
         buffer.evictWhile(() -> true, kv -> {
             switch (callbackCount.incrementAndGet()) {
                 case 1: {
-                    assertThat(new String(kv.key.get(), UTF_8), is("asdf"));
+                    assertThat(kv.key(), is("asdf"));
                     assertThat(buffer.numRecords(), is(2));
                     assertThat(buffer.bufferSize(), is(82L));
                     assertThat(buffer.minTimestamp(), is(0L));
                     break;
                 }
                 case 2: {
-                    assertThat(new String(kv.key.get(), UTF_8), is("zxcv"));
+                    assertThat(kv.key(), is("zxcv"));
                     assertThat(buffer.numRecords(), is(1));
                     assertThat(buffer.bufferSize(), is(42L));
                     assertThat(buffer.minTimestamp(), is(1L));
@@ -288,12 +255,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
 
     @Test
     public void shouldFlush() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
-        putRecord(2, buffer, context, getRecord("2093j", 0L), getBytes("asdf"));
-        putRecord(1, buffer, context, getRecord("3gon4i", 1L), getBytes("zxcv"));
-        putRecord(0, buffer, context, getRecord("deadbeef", 2L), getBytes("deleteme"));
+        putRecord(buffer, context, 2L, 0L, "asdf", "2093j");
+        putRecord(buffer, context, 1L, 1L, "zxcv", "3gon4i");
+        putRecord(buffer, context, 0L, 2L, "deleteme", "deadbeef");
 
         // replace "deleteme" with a tombstone
         buffer.evictWhile(() -> buffer.minTimestamp() < 1, kv -> { });
@@ -357,17 +324,16 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
         cleanup(context, buffer);
     }
 
-
     @Test
     public void shouldRestoreOldFormat() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
 
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, ""));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
 
         stateRestoreCallback.restoreBatch(asList(
             new ConsumerRecord<>("changelog-topic",
@@ -425,7 +391,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
 
         // flush the buffer into a list in buffer order so we can make assertions about the contents.
 
-        final List<KeyValue<Bytes, ContextualRecord>> evicted = new LinkedList<>();
+        final List<Eviction<String, String>> evicted = new LinkedList<>();
         buffer.evictWhile(() -> true, evicted::add);
 
         // Several things to note:
@@ -437,22 +403,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
         //   original format.
 
         assertThat(evicted, is(asList(
-            new KeyValue<>(
-                getBytes("zxcv"),
-                new ContextualRecord("3o4im".getBytes(UTF_8),
-                                     new ProcessorRecordContext(2,
-                                                                2,
-                                                                0,
-                                                                "changelog-topic",
-                                                                new RecordHeaders()))),
-            new KeyValue<>(
-                getBytes("asdf"),
-                new ContextualRecord("qwer".getBytes(UTF_8),
-                                     new ProcessorRecordContext(1,
-                                                                1,
-                                                                0,
-                                                                "changelog-topic",
-                                                                new RecordHeaders())))
+            new Eviction<>(
+                "zxcv",
+                "3o4im",
+                new ProcessorRecordContext(2L, 2, 0, "changelog-topic", new RecordHeaders())),
+            new Eviction<>(
+                "asdf",
+                "qwer",
+                new ProcessorRecordContext(1L, 1, 0, "changelog-topic", new RecordHeaders()))
         )));
 
         cleanup(context, buffer);
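
The restore tests on either side of this point pivot on the changelog format flag: records without a "v" header restore through the original layout, a "v" = 1 header selects the new layout, and any other value is rejected (see shouldNotRestoreUnrecognizedVersionRecord below). A rough sketch of that dispatch, as a hypothetical helper rather than the actual restore code:

    // Hypothetical helper: choose the changelog format from the "v" header.
    private static byte changelogFormatOf(final ConsumerRecord<byte[], byte[]> record) {
        final Header versionHeader = record.headers().lastHeader("v");
        if (versionHeader == null) {
            return (byte) 0; // pre-header records use the original format
        }
        final byte version = versionHeader.value()[0];
        if (version != (byte) 1) {
            // unrecognized versions abort restoration
            throw new IllegalArgumentException("restoring unrecognized changelog version: " + version);
        }
        return version;
    }
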
@@ -460,14 +418,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
 
     @Test
     public void shouldRestoreNewFormat() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
 
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, ""));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
 
         final RecordHeaders v1FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
 
@@ -533,7 +491,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
 
         // flush the buffer into a list in buffer order so we can make assertions about the contents.
 
-        final List<KeyValue<Bytes, ContextualRecord>> evicted = new LinkedList<>();
+        final List<Eviction<String, String>> evicted = new LinkedList<>();
         buffer.evictWhile(() -> true, evicted::add);
 
         // Several things to note:
@@ -541,41 +499,33 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
         // * The record timestamps are properly restored, and not conflated with the record's buffer time.
         // * The keys and values are properly restored
         // * The record topic is set to the original input topic, *not* the changelog topic
-        // * The record offset preserves the origininal input record's offset, *not* the offset of the changelog record
+        // * The record offset preserves the original input record's offset, *not* the offset of the changelog record
 
 
         assertThat(evicted, is(asList(
-            new KeyValue<>(
-                getBytes("zxcv"),
-                new ContextualRecord("3o4im".getBytes(UTF_8),
-                                     new ProcessorRecordContext(2,
-                                                                0,
-                                                                0,
-                                                                "topic",
-                                                                null))),
-            new KeyValue<>(
-                getBytes("asdf"),
-                new ContextualRecord("qwer".getBytes(UTF_8),
-                                     new ProcessorRecordContext(1,
-                                                                0,
-                                                                0,
-                                                                "topic",
-                                                                null)))
-        )));
+            new Eviction<>(
+                "zxcv",
+                "3o4im",
+                getContext(2L)),
+            new Eviction<>(
+                "asdf",
+                "qwer",
+                getContext(1L)
+            ))));
 
         cleanup(context, buffer);
     }
 
     @Test
     public void shouldNotRestoreUnrecognizedVersionRecord() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
 
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, ""));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
 
         final RecordHeaders unknownFlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) -1})});
 
@@ -601,4 +551,26 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
             cleanup(context, buffer);
         }
     }
+
+    private static void putRecord(final TimeOrderedKeyValueBuffer<String, String> buffer,
+                                  final MockInternalProcessorContext context,
+                                  final long streamTime,
+                                  final long recordTimestamp,
+                                  final String key,
+                                  final String value) {
+        final ProcessorRecordContext recordContext = getContext(recordTimestamp);
+        context.setRecordContext(recordContext);
+        buffer.put(streamTime, key, value, recordContext);
+    }
+
+    private static ContextualRecord getRecord(final String value, final long timestamp) {
+        return new ContextualRecord(
+            value.getBytes(UTF_8),
+            getContext(timestamp)
+        );
+    }
+
+    private static ProcessorRecordContext getContext(final long recordTimestamp) {
+        return new ProcessorRecordContext(recordTimestamp, 0, 0, "topic", null);
+    }
 }
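
Taken together, the reworked helpers give the class a compact, fully typed round trip. A minimal sketch of one more test in the same shape, reusing the class's own bufferSupplier, makeContext, getContext, and cleanup helpers (the key and value literals are illustrative):

    @Test
    public void shouldRoundTripTypedRecords() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();
        buffer.init(context, buffer);

        // put() now takes the deserialized key and value plus the record context...
        final ProcessorRecordContext recordContext = getContext(5L);
        context.setRecordContext(recordContext);
        buffer.put(5L, "key", "value", recordContext);

        // ...and evictWhile() hands back typed Evictions, not Bytes/ContextualRecord pairs.
        final List<Eviction<String, String>> evicted = new LinkedList<>();
        buffer.evictWhile(() -> true, evicted::add);
        assertThat(evicted, is(singletonList(new Eviction<>("key", "value", getContext(5L)))));

        cleanup(context, buffer);
    }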