You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/06/13 13:48:41 UTC

[kafka] branch trunk updated: KAFKA-8452: Compressed BufferValue (#6848)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e54ab29  KAFKA-8452: Compressed BufferValue (#6848)
e54ab29 is described below

commit e54ab292e7e2fd1d18e82387a586969ba57eb7ea
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Jun 13 08:48:15 2019 -0500

    KAFKA-8452: Compressed BufferValue (#6848)
    
    De-duplicate the common case in which the prior value is the same as the old value.
    
    Reviewers: Sophie Blee-Goldman <so...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
 .../kafka/streams/kstream/internals/Change.java    |   2 +-
 .../streams/kstream/internals/FullChangeSerde.java | 153 +++++++---------
 .../internals/ProcessorRecordContext.java          |   2 +-
 .../kafka/streams/state/internals/BufferValue.java | 137 +++++++++++---
 .../streams/state/internals/ContextualRecord.java  |   4 +-
 .../InMemoryTimeOrderedKeyValueBuffer.java         |  71 ++++---
 .../streams/state/internals/LRUCacheEntry.java     |   2 +-
 .../kstream/internals/FullChangeSerdeTest.java     | 132 +++-----------
 .../KTableSuppressProcessorMetricsTest.java        |  15 +-
 .../suppress/KTableSuppressProcessorTest.java      |   3 +-
 .../kstream/internals/suppress/SuppressSuite.java  |  14 +-
 .../internals/ProcessorRecordContextTest.java      |  10 +-
 .../streams/state/internals/BufferValueTest.java   | 203 +++++++++++++++++++++
 .../internals/TimeOrderedKeyValueBufferTest.java   |  78 ++++----
 14 files changed, 518 insertions(+), 308 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
index c9a18de..f28a16d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
@@ -30,7 +30,7 @@ public class Change<T> {
 
     @Override
     public String toString() {
-        return "(" + newValue + "<-" + oldValue + ")";
+        return "(" + String.valueOf(newValue) + "<-" + String.valueOf(oldValue) + ")";
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
index 30d55be..f28f9e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -21,19 +21,15 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 
 import java.nio.ByteBuffer;
-import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
 
-public final class FullChangeSerde<T> implements Serde<Change<T>> {
+public final class FullChangeSerde<T> {
     private final Serde<T> inner;
 
-    @SuppressWarnings("unchecked")
-    public static <T> FullChangeSerde<T> castOrWrap(final Serde<T> serde) {
+    public static <T> FullChangeSerde<T> wrap(final Serde<T> serde) {
         if (serde == null) {
             return null;
-        } else if (serde instanceof FullChangeSerde) {
-            return (FullChangeSerde<T>) serde;
         } else {
             return new FullChangeSerde<>(serde);
         }
@@ -47,98 +43,81 @@ public final class FullChangeSerde<T> implements Serde<Change<T>> {
         return inner;
     }
 
-    @Override
-    public void configure(final Map<String, ?> configs, final boolean isKey) {
-        inner.configure(configs, isKey);
+    public Change<byte[]> serializeParts(final String topic, final Change<T> data) {
+        if (data == null) {
+            return null;
+        }
+        final Serializer<T> innerSerializer = innerSerde().serializer();
+        final byte[] oldBytes = data.oldValue == null ? null : innerSerializer.serialize(topic, data.oldValue);
+        final byte[] newBytes = data.newValue == null ? null : innerSerializer.serialize(topic, data.newValue);
+        return new Change<>(newBytes, oldBytes);
     }
 
-    @Override
-    public void close() {
-        inner.close();
-    }
 
-    @Override
-    public Serializer<Change<T>> serializer() {
-        final Serializer<T> innerSerializer = inner.serializer();
-
-        return new Serializer<Change<T>>() {
-            @Override
-            public void configure(final Map<String, ?> configs, final boolean isKey) {
-                innerSerializer.configure(configs, isKey);
-            }
-
-            @Override
-            public byte[] serialize(final String topic, final Change<T> data) {
-                if (data == null) {
-                    return null;
-                }
-                final byte[] oldBytes = data.oldValue == null ? null : innerSerializer.serialize(topic, data.oldValue);
-                final int oldSize = oldBytes == null ? -1 : oldBytes.length;
-                final byte[] newBytes = data.newValue == null ? null : innerSerializer.serialize(topic, data.newValue);
-                final int newSize = newBytes == null ? -1 : newBytes.length;
-
-                final ByteBuffer buffer = ByteBuffer.wrap(
-                    new byte[4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 0 : newSize)]
-                );
-                buffer.putInt(oldSize);
-                if (oldBytes != null) {
-                    buffer.put(oldBytes);
-                }
-                buffer.putInt(newSize);
-                if (newBytes != null) {
-                    buffer.put(newBytes);
-                }
-                return buffer.array();
-            }
-
-            @Override
-            public void close() {
-                innerSerializer.close();
-            }
-        };
+    public Change<T> deserializeParts(final String topic, final Change<byte[]> serialChange) {
+        if (serialChange == null) {
+            return null;
+        }
+        final Deserializer<T> innerDeserializer = innerSerde().deserializer();
+
+        final T oldValue =
+            serialChange.oldValue == null ? null : innerDeserializer.deserialize(topic, serialChange.oldValue);
+        final T newValue =
+            serialChange.newValue == null ? null : innerDeserializer.deserialize(topic, serialChange.newValue);
+
+        return new Change<>(newValue, oldValue);
     }
 
-    @Override
-    public Deserializer<Change<T>> deserializer() {
-        final Deserializer<T> innerDeserializer = inner.deserializer();
-        return new Deserializer<Change<T>>() {
-            @Override
-            public void configure(final Map<String, ?> configs, final boolean isKey) {
-                innerDeserializer.configure(configs, isKey);
-            }
-
-            @Override
-            public Change<T> deserialize(final String topic, final byte[] data) {
-                if (data == null) {
-                    return null;
-                }
-                final ByteBuffer buffer = ByteBuffer.wrap(data);
-
-                final byte[] oldBytes = extractOldValuePart(buffer);
-                final T oldValue = oldBytes == null ? null : innerDeserializer.deserialize(topic, oldBytes);
-
-                final int newSize = buffer.getInt();
-                final byte[] newBytes = newSize == -1 ? null : new byte[newSize];
-                if (newBytes != null) {
-                    buffer.get(newBytes);
-                }
-                final T newValue = newBytes == null ? null : innerDeserializer.deserialize(topic, newBytes);
-                return new Change<>(newValue, oldValue);
-            }
-
-            @Override
-            public void close() {
-                innerDeserializer.close();
-            }
-        };
+    /**
+     * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
+     * so that we can produce the legacy format to test that we can still deserialize it.
+     */
+    public static byte[] composeLegacyFormat(final Change<byte[]> serialChange) {
+        if (serialChange == null) {
+            return null;
+        }
+
+        final int oldSize = serialChange.oldValue == null ? -1 : serialChange.oldValue.length;
+        final int newSize = serialChange.newValue == null ? -1 : serialChange.newValue.length;
+
+        final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + Math.max(0, oldSize) + Math.max(0, newSize));
+
+
+        buffer.putInt(oldSize);
+        if (serialChange.oldValue != null) {
+            buffer.put(serialChange.oldValue);
+        }
+
+        buffer.putInt(newSize);
+        if (serialChange.newValue != null) {
+            buffer.put(serialChange.newValue);
+        }
+        return buffer.array();
     }
 
-    public static byte[] extractOldValuePart(final ByteBuffer buffer) {
+    /**
+     * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still
+     * need to be able to read it (so that we can load the state store from previously-written changelog records).
+     */
+    public static Change<byte[]> decomposeLegacyFormat(final byte[] data) {
+        if (data == null) {
+            return null;
+        }
+        final ByteBuffer buffer = ByteBuffer.wrap(data);
+
         final int oldSize = buffer.getInt();
         final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
         if (oldBytes != null) {
             buffer.get(oldBytes);
         }
-        return oldBytes;
+
+        final int newSize = buffer.getInt();
+        final byte[] newBytes = newSize == -1 ? null : new byte[newSize];
+        if (newBytes != null) {
+            buffer.get(newBytes);
+        }
+
+        return new Change<>(newBytes, oldBytes);
     }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 1b22482..5662417 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -73,7 +73,7 @@ public class ProcessorRecordContext implements RecordContext {
         return headers;
     }
 
-    public long sizeBytes() {
+    public long residentMemorySizeEstimate() {
         long size = 0;
         size += Long.BYTES; // value.context.timestamp
         size += Long.BYTES; // value.context.offset
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
index 816894e..f1990c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
@@ -16,60 +16,133 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Objects;
 
 public final class BufferValue {
-    private final ContextualRecord record;
+    private static final int NULL_VALUE_SENTINEL = -1;
+    private static final int OLD_PREV_DUPLICATE_VALUE_SENTINEL = -2;
     private final byte[] priorValue;
+    private final byte[] oldValue;
+    private final byte[] newValue;
+    private final ProcessorRecordContext recordContext;
+
+    BufferValue(final byte[] priorValue, final byte[] oldValue, final byte[] newValue, final ProcessorRecordContext recordContext) {
+        this.oldValue = oldValue;
+        this.newValue = newValue;
+        this.recordContext = recordContext;
+
+        // This de-duplicates the prior and old references.
+        // If they were already the same reference, the comparison is trivially fast, so we don't specifically check
+        // for that case.
+        if (Arrays.equals(priorValue, oldValue)) {
+            this.priorValue = oldValue;
+        } else {
+            this.priorValue = priorValue;
+        }
+    }
 
-    BufferValue(final ContextualRecord record,
-                final byte[] priorValue) {
-        this.record = record;
-        this.priorValue = priorValue;
+    byte[] priorValue() {
+        return priorValue;
     }
 
-    ContextualRecord record() {
-        return record;
+    byte[] oldValue() {
+        return oldValue;
     }
 
-    byte[] priorValue() {
-        return priorValue;
+    byte[] newValue() {
+        return newValue;
+    }
+
+    ProcessorRecordContext context() {
+        return recordContext;
     }
 
     static BufferValue deserialize(final ByteBuffer buffer) {
-        final ContextualRecord record = ContextualRecord.deserialize(buffer);
+        final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer);
+
+        final byte[] priorValue = extractValue(buffer);
+
+        final byte[] oldValue;
+        final int oldValueLength = buffer.getInt();
+        if (oldValueLength == NULL_VALUE_SENTINEL) {
+            oldValue = null;
+        } else if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) {
+            oldValue = priorValue;
+        } else {
+            oldValue = new byte[oldValueLength];
+            buffer.get(oldValue);
+        }
+
+        final byte[] newValue = extractValue(buffer);
+
+        return new BufferValue(priorValue, oldValue, newValue, context);
+    }
 
-        final int priorValueLength = buffer.getInt();
-        if (priorValueLength == -1) {
-            return new BufferValue(record, null);
+    private static byte[] extractValue(final ByteBuffer buffer) {
+        final int valueLength = buffer.getInt();
+        if (valueLength == NULL_VALUE_SENTINEL) {
+            return null;
         } else {
-            final byte[] priorValue = new byte[priorValueLength];
-            buffer.get(priorValue);
-            return new BufferValue(record, priorValue);
+            final byte[] value = new byte[valueLength];
+            buffer.get(value);
+            return value;
         }
     }
 
     ByteBuffer serialize(final int endPadding) {
 
-        final int sizeOfPriorValueLength = Integer.BYTES;
+        final int sizeOfValueLength = Integer.BYTES;
+
         final int sizeOfPriorValue = priorValue == null ? 0 : priorValue.length;
+        final int sizeOfOldValue = oldValue == null || priorValue == oldValue ? 0 : oldValue.length;
+        final int sizeOfNewValue = newValue == null ? 0 : newValue.length;
+
+        final byte[] serializedContext = recordContext.serialize();
+
+        final ByteBuffer buffer = ByteBuffer.allocate(
+            serializedContext.length
+                + sizeOfValueLength + sizeOfPriorValue
+                + sizeOfValueLength + sizeOfOldValue
+                + sizeOfValueLength + sizeOfNewValue
+                + endPadding
+        );
 
-        final ByteBuffer buffer = record.serialize(sizeOfPriorValueLength + sizeOfPriorValue + endPadding);
+        buffer.put(serializedContext);
 
-        if (priorValue == null) {
-            buffer.putInt(-1);
+        addValue(buffer, priorValue);
+
+        if (oldValue == null) {
+            buffer.putInt(NULL_VALUE_SENTINEL);
+        } else if (priorValue == oldValue) {
+            buffer.putInt(OLD_PREV_DUPLICATE_VALUE_SENTINEL);
         } else {
-            buffer.putInt(priorValue.length);
-            buffer.put(priorValue);
+            buffer.putInt(sizeOfOldValue);
+            buffer.put(oldValue);
         }
 
+        addValue(buffer, newValue);
+
         return buffer;
     }
 
-    long sizeBytes() {
-        return (priorValue == null ? 0 : priorValue.length) + record.sizeBytes();
+    private static void addValue(final ByteBuffer buffer, final byte[] value) {
+        if (value == null) {
+            buffer.putInt(NULL_VALUE_SENTINEL);
+        } else {
+            buffer.putInt(value.length);
+            buffer.put(value);
+        }
+    }
+
+    long residentMemorySizeEstimate() {
+        return (priorValue == null ? 0 : priorValue.length)
+            + (oldValue == null || priorValue == oldValue ? 0 : oldValue.length)
+            + (newValue == null ? 0 : newValue.length)
+            + recordContext.residentMemorySizeEstimate();
     }
 
     @Override
@@ -77,22 +150,28 @@ public final class BufferValue {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         final BufferValue that = (BufferValue) o;
-        return Objects.equals(record, that.record) &&
-            Arrays.equals(priorValue, that.priorValue);
+        return Arrays.equals(priorValue, that.priorValue) &&
+            Arrays.equals(oldValue, that.oldValue) &&
+            Arrays.equals(newValue, that.newValue) &&
+            Objects.equals(recordContext, that.recordContext);
     }
 
     @Override
     public int hashCode() {
-        int result = Objects.hash(record);
+        int result = Objects.hash(recordContext);
         result = 31 * result + Arrays.hashCode(priorValue);
+        result = 31 * result + Arrays.hashCode(oldValue);
+        result = 31 * result + Arrays.hashCode(newValue);
         return result;
     }
 
     @Override
     public String toString() {
         return "BufferValue{" +
-            "record=" + record +
-            ", priorValue=" + Arrays.toString(priorValue) +
+            "priorValue=" + Arrays.toString(priorValue) +
+            ", oldValue=" + Arrays.toString(oldValue) +
+            ", newValue=" + Arrays.toString(newValue) +
+            ", recordContext=" + recordContext +
             '}';
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
index 3cd2c37..3c24f52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
@@ -39,8 +39,8 @@ public class ContextualRecord {
         return value;
     }
 
-    long sizeBytes() {
-        return (value == null ? 0 : value.length) + recordContext.sizeBytes();
+    long residentMemorySizeEstimate() {
+        return (value == null ? 0 : value.length) + recordContext.residentMemorySizeEstimate();
     }
 
     ByteBuffer serialize(final int endPadding) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 6c5022f..6c6ef36 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -159,7 +159,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         this.storeName = storeName;
         this.loggingEnabled = loggingEnabled;
         this.keySerde = keySerde;
-        this.valueSerde = FullChangeSerde.castOrWrap(valueSerde);
+        this.valueSerde = FullChangeSerde.wrap(valueSerde);
     }
 
     @Override
@@ -176,7 +176,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
     @Override
     public void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde) {
         this.keySerde = this.keySerde == null ? keySerde : this.keySerde;
-        this.valueSerde = this.valueSerde == null ? FullChangeSerde.castOrWrap(valueSerde) : this.valueSerde;
+        this.valueSerde = this.valueSerde == null ? FullChangeSerde.wrap(valueSerde) : this.valueSerde;
     }
 
     @Override
@@ -296,21 +296,26 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
                     final byte[] changelogValue = new byte[record.value().length - 8];
                     timeAndValue.get(changelogValue);
 
+                    final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormat(changelogValue));
+
+                    final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+                        record.timestamp(),
+                        record.offset(),
+                        record.partition(),
+                        record.topic(),
+                        record.headers()
+                    );
+
                     cleanPut(
                         time,
                         key,
                         new BufferValue(
-                            new ContextualRecord(
-                                changelogValue,
-                                new ProcessorRecordContext(
-                                    record.timestamp(),
-                                    record.offset(),
-                                    record.partition(),
-                                    record.topic(),
-                                    record.headers()
-                                )
-                            ),
-                            inferPriorValue(key, changelogValue)
+                            index.containsKey(key)
+                                ? internalPriorValueForBuffered(key)
+                                : change.oldValue,
+                            change.oldValue,
+                            change.newValue,
+                            recordContext
                         )
                     );
                 } else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
@@ -321,7 +326,20 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
                     timeAndValue.get(changelogValue);
 
                     final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue));
-                    cleanPut(time, key, new BufferValue(contextualRecord, inferPriorValue(key, contextualRecord.value())));
+                    final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormat(contextualRecord.value()));
+
+                    cleanPut(
+                        time,
+                        key,
+                        new BufferValue(
+                            index.containsKey(key)
+                                ? internalPriorValueForBuffered(key)
+                                : change.oldValue,
+                            change.oldValue,
+                            change.newValue,
+                            contextualRecord.recordContext()
+                        )
+                    );
                 } else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
                     // in this case, the changelog value is a serialized BufferValue
 
@@ -346,13 +364,6 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         updateBufferMetrics();
     }
 
-    private byte[] inferPriorValue(final Bytes key, final byte[] serializedChange) {
-        return index.containsKey(key)
-            ? internalPriorValueForBuffered(key)
-            : FullChangeSerde.extractOldValuePart(ByteBuffer.wrap(serializedChange));
-    }
-
-
     @Override
     public void evictWhile(final Supplier<Boolean> predicate,
                            final Consumer<Eviction<K, V>> callback) {
@@ -375,9 +386,11 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
                 }
                 final K key = keySerde.deserializer().deserialize(changelogTopic, next.getKey().key().get());
                 final BufferValue bufferValue = next.getValue();
-                final ContextualRecord record = bufferValue.record();
-                final Change<V> value = valueSerde.deserializer().deserialize(changelogTopic, record.value());
-                callback.accept(new Eviction<>(key, value, record.recordContext()));
+                final Change<V> value = valueSerde.deserializeParts(
+                    changelogTopic,
+                    new Change<>(bufferValue.newValue(), bufferValue.oldValue())
+                );
+                callback.accept(new Eviction<>(key, value, bufferValue.context()));
 
                 delegate.remove();
                 index.remove(next.getKey().key());
@@ -442,7 +455,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         requireNonNull(recordContext, "recordContext cannot be null");
 
         final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
-        final byte[] serializedValue = valueSerde.serializer().serialize(changelogTopic, value);
+        final Change<byte[]> serialChange = valueSerde.serializeParts(changelogTopic, value);
 
         final BufferValue buffered = getBuffered(serializedKey);
         final byte[] serializedPriorValue;
@@ -453,7 +466,11 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
             serializedPriorValue = buffered.priorValue();
         }
 
-        cleanPut(time, serializedKey, new BufferValue(new ContextualRecord(serializedValue, recordContext), serializedPriorValue));
+        cleanPut(
+            time,
+            serializedKey,
+            new BufferValue(serializedPriorValue, serialChange.oldValue, serialChange.newValue, recordContext)
+        );
         dirtyKeys.add(serializedKey);
         updateBufferMetrics();
     }
@@ -504,7 +521,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         size += 8; // buffer time
         size += key.get().length;
         if (value != null) {
-            size += value.sizeBytes();
+            size += value.residentMemorySizeEstimate();
         }
         return size;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index f454862..0f1a1ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -50,7 +50,7 @@ class LRUCacheEntry {
 
         this.isDirty = isDirty;
         this.sizeBytes = 1 + // isDirty
-            record.sizeBytes();
+            record.residentMemorySizeEstimate();
     }
 
     void markClean() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
index ddba05e..97e6c06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
@@ -16,146 +16,74 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.easymock.EasyMock;
 import org.junit.Test;
 
-import static java.util.Collections.emptyMap;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
 public class FullChangeSerdeTest {
-    private final FullChangeSerde<String> serde = FullChangeSerde.castOrWrap(Serdes.String());
+    private final FullChangeSerde<String> serde = FullChangeSerde.wrap(Serdes.String());
 
     @Test
     public void shouldRoundTripNull() {
-        final byte[] serialized = serde.serializer().serialize(null, null);
-        assertThat(
-            serde.deserializer().deserialize(null, serialized),
-            nullValue()
-        );
+        assertThat(serde.serializeParts(null, null), nullValue());
+        assertThat(FullChangeSerde.composeLegacyFormat(null), nullValue());
+        assertThat(FullChangeSerde.decomposeLegacyFormat(null), nullValue());
+        assertThat(serde.deserializeParts(null, null), nullValue());
     }
 
 
     @Test
     public void shouldRoundTripNullChange() {
-        final byte[] serialized = serde.serializer().serialize(null, new Change<>(null, null));
         assertThat(
-            serde.deserializer().deserialize(null, serialized),
-            is(new Change<>(null, null))
+            serde.serializeParts(null, new Change<>(null, null)),
+            is(new Change<byte[]>(null, null))
+        );
+
+        assertThat(
+            serde.deserializeParts(null, new Change<>(null, null)),
+            is(new Change<String>(null, null))
+        );
+
+        final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(new Change<>(null, null));
+        assertThat(
+            FullChangeSerde.decomposeLegacyFormat(legacyFormat),
+            is(new Change<byte[]>(null, null))
         );
     }
 
     @Test
     public void shouldRoundTripOldNull() {
-        final byte[] serialized = serde.serializer().serialize(null, new Change<>("new", null));
+        final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", null));
+        final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized);
+        final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat);
         assertThat(
-            serde.deserializer().deserialize(null, serialized),
+            serde.deserializeParts(null, decomposedLegacyFormat),
             is(new Change<>("new", null))
         );
     }
 
     @Test
     public void shouldRoundTripNewNull() {
-        final byte[] serialized = serde.serializer().serialize(null, new Change<>(null, "old"));
+        final Change<byte[]> serialized = serde.serializeParts(null, new Change<>(null, "old"));
+        final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized);
+        final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat);
         assertThat(
-            serde.deserializer().deserialize(null, serialized),
+            serde.deserializeParts(null, decomposedLegacyFormat),
             is(new Change<>(null, "old"))
         );
     }
 
     @Test
     public void shouldRoundTripChange() {
-        final byte[] serialized = serde.serializer().serialize(null, new Change<>("new", "old"));
+        final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", "old"));
+        final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized);
+        final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat);
         assertThat(
-            serde.deserializer().deserialize(null, serialized),
+            serde.deserializeParts(null, decomposedLegacyFormat),
             is(new Change<>("new", "old"))
         );
     }
-
-    @Test
-    public void shouldConfigureSerde() {
-        final Serde<Void> mock = EasyMock.mock(Serde.class);
-        mock.configure(emptyMap(), false);
-        EasyMock.expectLastCall();
-        EasyMock.replay(mock);
-        final FullChangeSerde<Void> serde = FullChangeSerde.castOrWrap(mock);
-        serde.configure(emptyMap(), false);
-        EasyMock.verify(mock);
-    }
-
-    @Test
-    public void shouldCloseSerde() {
-        final Serde<Void> mock = EasyMock.mock(Serde.class);
-        mock.close();
-        EasyMock.expectLastCall();
-        EasyMock.replay(mock);
-        final FullChangeSerde<Void> serde = FullChangeSerde.castOrWrap(mock);
-        serde.close();
-        EasyMock.verify(mock);
-    }
-
-    @Test
-    public void shouldConfigureSerializer() {
-        final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
-        final Serializer<Void> mockSerializer = EasyMock.mock(Serializer.class);
-        EasyMock.expect(mockSerde.serializer()).andReturn(mockSerializer);
-        EasyMock.replay(mockSerde);
-        mockSerializer.configure(emptyMap(), false);
-        EasyMock.expectLastCall();
-        EasyMock.replay(mockSerializer);
-        final Serializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer();
-        serializer.configure(emptyMap(), false);
-        EasyMock.verify(mockSerde);
-        EasyMock.verify(mockSerializer);
-    }
-
-    @Test
-    public void shouldCloseSerializer() {
-        final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
-        final Serializer<Void> mockSerializer = EasyMock.mock(Serializer.class);
-        EasyMock.expect(mockSerde.serializer()).andReturn(mockSerializer);
-        EasyMock.replay(mockSerde);
-        mockSerializer.close();
-        EasyMock.expectLastCall();
-        EasyMock.replay(mockSerializer);
-        final Serializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer();
-        serializer.close();
-        EasyMock.verify(mockSerde);
-        EasyMock.verify(mockSerializer);
-    }
-
-    @Test
-    public void shouldConfigureDeserializer() {
-        final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
-        final Deserializer<Void> mockDeserializer = EasyMock.mock(Deserializer.class);
-        EasyMock.expect(mockSerde.deserializer()).andReturn(mockDeserializer);
-        EasyMock.replay(mockSerde);
-        mockDeserializer.configure(emptyMap(), false);
-        EasyMock.expectLastCall();
-        EasyMock.replay(mockDeserializer);
-        final Deserializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer();
-        serializer.configure(emptyMap(), false);
-        EasyMock.verify(mockSerde);
-        EasyMock.verify(mockDeserializer);
-    }
-
-    @Test
-    public void shouldCloseDeserializer() {
-        final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
-        final Deserializer<Void> mockDeserializer = EasyMock.mock(Deserializer.class);
-        EasyMock.expect(mockSerde.deserializer()).andReturn(mockDeserializer);
-        EasyMock.replay(mockSerde);
-        mockDeserializer.close();
-        EasyMock.expectLastCall();
-        EasyMock.replay(mockDeserializer);
-        final Deserializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer();
-        serializer.close();
-        EasyMock.verify(mockSerde);
-        EasyMock.verify(mockDeserializer);
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index 96ee735..ef46663 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.internals.Change;
-import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
 import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.StateStore;
@@ -139,7 +138,7 @@ public class KTableSuppressProcessorMetricsTest {
 
         final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(
             storeName, Serdes.String(),
-            FullChangeSerde.castOrWrap(Serdes.Long())
+            Serdes.Long()
         )
             .withLoggingDisabled()
             .build();
@@ -169,9 +168,9 @@ public class KTableSuppressProcessorMetricsTest {
 
             verifyMetric(metrics, EVICTION_RATE_METRIC, is(0.0));
             verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(0.0));
-            verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(29.5));
-            verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(59.0));
-            verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(59.0));
+            verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(21.5));
+            verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(43.0));
+            verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(43.0));
             verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(0.5));
             verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0));
             verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(1.0));
@@ -185,9 +184,9 @@ public class KTableSuppressProcessorMetricsTest {
 
             verifyMetric(metrics, EVICTION_RATE_METRIC, greaterThan(0.0));
             verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(1.0));
-            verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(57.0));
-            verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(55.0));
-            verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(114.0));
+            verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(41.0));
+            verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(39.0));
+            verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(82.0));
             verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(1.0));
             verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0));
             verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(2.0));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index d8cb858..1d1d6fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
 import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.Change;
-import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
 import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
@@ -75,7 +74,7 @@ public class KTableSuppressProcessorTest {
 
             final String storeName = "test-store";
 
-            final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valueSerde))
+            final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, valueSerde)
                 .withLoggingDisabled()
                 .build();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java
index 3aef6d0..a323b9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java
@@ -19,8 +19,10 @@ package org.apache.kafka.streams.kstream.internals.suppress;
 import org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest;
 import org.apache.kafka.streams.integration.SuppressionIntegrationTest;
 import org.apache.kafka.streams.kstream.SuppressedTest;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest;
 import org.apache.kafka.streams.kstream.internals.SuppressScenarioTest;
 import org.apache.kafka.streams.kstream.internals.SuppressTopologyTest;
+import org.apache.kafka.streams.state.internals.BufferValueTest;
 import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBufferTest;
 import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferTest;
 import org.junit.runner.RunWith;
@@ -30,21 +32,25 @@ import org.junit.runners.Suite;
  * This suite runs all the tests related to the Suppression feature.
  *
  * It can be used from an IDE to selectively just run these tests when developing code related to Suppress.
- * 
+ *
  * If desired, it can also be added to a Gradle build task, although this isn't strictly necessary, since all
  * these tests are already included in the `:streams:test` task.
  */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
+    BufferValueTest.class,
     KTableSuppressProcessorMetricsTest.class,
     KTableSuppressProcessorTest.class,
     SuppressScenarioTest.class,
     SuppressTopologyTest.class,
     SuppressedTest.class,
-    SuppressionIntegrationTest.class,
-    SuppressionDurabilityIntegrationTest.class,
     InMemoryTimeOrderedKeyValueBufferTest.class,
-    TimeOrderedKeyValueBufferTest.class
+    TimeOrderedKeyValueBufferTest.class,
+    FullChangeSerdeTest.class,
+    SuppressionIntegrationTest.class,
+    SuppressionDurabilityIntegrationTest.class
 })
 public class SuppressSuite {
 }
+
+
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
index 1ea646f..83ab127 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
@@ -37,7 +37,7 @@ public class ProcessorRecordContextTest {
             null
         );
 
-        assertEquals(MIN_SIZE, context.sizeBytes());
+        assertEquals(MIN_SIZE, context.residentMemorySizeEstimate());
     }
 
     @Test
@@ -50,7 +50,7 @@ public class ProcessorRecordContextTest {
             new RecordHeaders()
         );
 
-        assertEquals(MIN_SIZE, context.sizeBytes());
+        assertEquals(MIN_SIZE, context.residentMemorySizeEstimate());
     }
 
     @Test
@@ -63,7 +63,7 @@ public class ProcessorRecordContextTest {
             null
         );
 
-        assertEquals(MIN_SIZE + 5L, context.sizeBytes());
+        assertEquals(MIN_SIZE + 5L, context.residentMemorySizeEstimate());
     }
 
     @Test
@@ -78,7 +78,7 @@ public class ProcessorRecordContextTest {
             headers
         );
 
-        assertEquals(MIN_SIZE + 10L + 12L, context.sizeBytes());
+        assertEquals(MIN_SIZE + 10L + 12L, context.residentMemorySizeEstimate());
     }
 
     @Test
@@ -93,6 +93,6 @@ public class ProcessorRecordContextTest {
             headers
         );
 
-        assertEquals(MIN_SIZE + 10L, context.sizeBytes());
+        assertEquals(MIN_SIZE + 10L, context.residentMemorySizeEstimate());
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
new file mode 100644
index 0000000..d663461
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+public class BufferValueTest {
+    @Test
+    public void shouldDeduplicateNullValues() {
+        final BufferValue bufferValue = new BufferValue(null, null, null, null);
+        assertSame(bufferValue.priorValue(), bufferValue.oldValue());
+    }
+
+    @Test
+    public void shouldDeduplicateIndenticalValues() {
+        final byte[] bytes = {(byte) 0};
+        final BufferValue bufferValue = new BufferValue(bytes, bytes, null, null);
+        assertSame(bufferValue.priorValue(), bufferValue.oldValue());
+    }
+
+    @Test
+    public void shouldDeduplicateEqualValues() {
+        final BufferValue bufferValue = new BufferValue(new byte[] {(byte) 0}, new byte[] {(byte) 0}, null, null);
+        assertSame(bufferValue.priorValue(), bufferValue.oldValue());
+    }
+
+    @Test
+    public void shouldStoreDifferentValues() {
+        final byte[] priorValue = {(byte) 0};
+        final byte[] oldValue = {(byte) 1};
+        final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null);
+        assertSame(priorValue, bufferValue.priorValue());
+        assertSame(oldValue, bufferValue.oldValue());
+    }
+
+    @Test
+    public void shouldStoreDifferentValuesWithPriorNull() {
+        final byte[] priorValue = null;
+        final byte[] oldValue = {(byte) 1};
+        final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null);
+        assertNull(bufferValue.priorValue());
+        assertSame(oldValue, bufferValue.oldValue());
+    }
+
+    @Test
+    public void shouldStoreDifferentValuesWithOldNull() {
+        final byte[] priorValue = {(byte) 0};
+        final byte[] oldValue = null;
+        final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null);
+        assertSame(priorValue, bufferValue.priorValue());
+        assertNull(bufferValue.oldValue());
+    }
+
+    @Test
+    public void shouldAccountForDeduplicationInSizeEstimate() {
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        assertEquals(25L, new BufferValue(null, null, null, context).residentMemorySizeEstimate());
+        assertEquals(26L, new BufferValue(new byte[] {(byte) 0}, null, null, context).residentMemorySizeEstimate());
+        assertEquals(26L, new BufferValue(null, new byte[] {(byte) 0}, null, context).residentMemorySizeEstimate());
+        assertEquals(26L, new BufferValue(new byte[] {(byte) 0}, new byte[] {(byte) 0}, null, context).residentMemorySizeEstimate());
+        assertEquals(27L, new BufferValue(new byte[] {(byte) 0}, new byte[] {(byte) 1}, null, context).residentMemorySizeEstimate());
+
+        // the new value is counted in the estimate, and it is not deduplicated even though it equals the prior value here
+        assertEquals(28L, new BufferValue(new byte[] {(byte) 0}, new byte[] {(byte) 1}, new byte[] {(byte) 0}, context).residentMemorySizeEstimate());
+    }
+
+    @Test
+    public void shouldSerializeNulls() {
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final byte[] serializedContext = context.serialize();
+        final byte[] bytes = new BufferValue(null, null, null, context).serialize(0).array();
+        final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);
+
+        assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3).putInt(-1).putInt(-1).putInt(-1).array()));
+    }
+
+    @Test
+    public void shouldSerializePrior() {
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final byte[] serializedContext = context.serialize();
+        final byte[] priorValue = {(byte) 5};
+        final byte[] bytes = new BufferValue(priorValue, null, null, context).serialize(0).array();
+        final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);
+
+        assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3 + 1).putInt(1).put(priorValue).putInt(-1).putInt(-1).array()));
+    }
+
+    @Test
+    public void shouldSerializeOld() {
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final byte[] serializedContext = context.serialize();
+        final byte[] oldValue = {(byte) 5};
+        final byte[] bytes = new BufferValue(null, oldValue, null, context).serialize(0).array();
+        final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);
+
+        assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3 + 1).putInt(-1).putInt(1).put(oldValue).putInt(-1).array()));
+    }
+
+    @Test
+    public void shouldSerializeNew() {
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final byte[] serializedContext = context.serialize();
+        final byte[] newValue = {(byte) 5};
+        final byte[] bytes = new BufferValue(null, null, newValue, context).serialize(0).array();
+        final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);
+
+        assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3 + 1).putInt(-1).putInt(-1).putInt(1).put(newValue).array()));
+    }
+
+    @Test
+    public void shouldCompactDuplicates() {
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final byte[] serializedContext = context.serialize();
+        final byte[] duplicate = {(byte) 5};
+        final byte[] bytes = new BufferValue(duplicate, duplicate, null, context).serialize(0).array();
+        final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);
+
+        assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3 + 1).putInt(1).put(duplicate).putInt(-2).putInt(-1).array()));
+    }
+
+    @Test
+    public void shouldDeserializePrior() {
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final byte[] serializedContext = context.serialize();
+        final byte[] priorValue = {(byte) 5};
+        final ByteBuffer serialValue =
+            ByteBuffer
+                .allocate(serializedContext.length + Integer.BYTES * 3 + priorValue.length)
+                .put(serializedContext).putInt(1).put(priorValue).putInt(-1).putInt(-1);
+        serialValue.position(0);
+
+        final BufferValue deserialize = BufferValue.deserialize(serialValue);
+        assertThat(deserialize, is(new BufferValue(priorValue, null, null, context)));
+    }
+
+    @Test
+    public void shouldDeserializeOld() {
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final byte[] serializedContext = context.serialize();
+        final byte[] oldValue = {(byte) 5};
+        final ByteBuffer serialValue =
+            ByteBuffer
+                .allocate(serializedContext.length + Integer.BYTES * 3 + oldValue.length)
+                .put(serializedContext).putInt(-1).putInt(1).put(oldValue).putInt(-1);
+        serialValue.position(0);
+
+        assertThat(BufferValue.deserialize(serialValue), is(new BufferValue(null, oldValue, null, context)));
+    }
+
+    @Test
+    public void shouldDeserializeNew() {
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final byte[] serializedContext = context.serialize();
+        final byte[] newValue = {(byte) 5};
+        final ByteBuffer serialValue =
+            ByteBuffer
+                .allocate(serializedContext.length + Integer.BYTES * 3 + newValue.length)
+                .put(serializedContext).putInt(-1).putInt(-1).putInt(1).put(newValue);
+        serialValue.position(0);
+
+        assertThat(BufferValue.deserialize(serialValue), is(new BufferValue(null, null, newValue, context)));
+    }
+
+    @Test
+    public void shouldDeserializeCompactedDuplicates() {
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final byte[] serializedContext = context.serialize();
+        final byte[] duplicate = {(byte) 5};
+        final ByteBuffer serialValue =
+            ByteBuffer
+                .allocate(serializedContext.length + Integer.BYTES * 3 + duplicate.length)
+                .put(serializedContext).putInt(1).put(duplicate).putInt(-2).putInt(-1);
+        serialValue.position(0);
+
+        final BufferValue bufferValue = BufferValue.deserialize(serialValue);
+        assertThat(bufferValue, is(new BufferValue(duplicate, duplicate, null, context)));
+        assertSame(bufferValue.priorValue(), bufferValue.oldValue());
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 941832b..5c9cbf9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -190,11 +189,11 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "23roni");
-        assertThat(buffer.bufferSize(), is(51L));
+        assertThat(buffer.bufferSize(), is(43L));
         putRecord(buffer, context, 1L, 0L, "asdf", "3l");
-        assertThat(buffer.bufferSize(), is(47L));
+        assertThat(buffer.bufferSize(), is(39L));
         putRecord(buffer, context, 0L, 0L, "zxcv", "qfowin");
-        assertThat(buffer.bufferSize(), is(98L));
+        assertThat(buffer.bufferSize(), is(82L));
         cleanup(context, buffer);
     }
 
@@ -218,12 +217,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         putRecord(buffer, context, 1L, 0L, "zxcv", "o23i4");
         assertThat(buffer.numRecords(), is(1));
-        assertThat(buffer.bufferSize(), is(50L));
+        assertThat(buffer.bufferSize(), is(42L));
         assertThat(buffer.minTimestamp(), is(1L));
 
         putRecord(buffer, context, 0L, 0L, "asdf", "3ng");
         assertThat(buffer.numRecords(), is(2));
-        assertThat(buffer.bufferSize(), is(98L));
+        assertThat(buffer.bufferSize(), is(82L));
         assertThat(buffer.minTimestamp(), is(0L));
 
         final AtomicInteger callbackCount = new AtomicInteger(0);
@@ -232,14 +231,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                 case 1: {
                     assertThat(kv.key(), is("asdf"));
                     assertThat(buffer.numRecords(), is(2));
-                    assertThat(buffer.bufferSize(), is(98L));
+                    assertThat(buffer.bufferSize(), is(82L));
                     assertThat(buffer.minTimestamp(), is(0L));
                     break;
                 }
                 case 2: {
                     assertThat(kv.key(), is("zxcv"));
                     assertThat(buffer.numRecords(), is(1));
-                    assertThat(buffer.bufferSize(), is(50L));
+                    assertThat(buffer.bufferSize(), is(42L));
                     assertThat(buffer.minTimestamp(), is(1L));
                     break;
                 }
@@ -361,12 +360,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
 
-        final Serializer<Change<String>> serializer = FullChangeSerde.castOrWrap(Serdes.String()).serializer();
+        final FullChangeSerde<String> serializer = FullChangeSerde.wrap(Serdes.String());
 
-        final byte[] todeleteValue = serializer.serialize(null, new Change<>("doomed", null));
-        final byte[] asdfValue = serializer.serialize(null, new Change<>("qwer", null));
-        final byte[] zxcvValue1 = serializer.serialize(null, new Change<>("eo4im", "previous"));
-        final byte[] zxcvValue2 = serializer.serialize(null, new Change<>("next", "eo4im"));
+        final byte[] todeleteValue = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("doomed", null)));
+        final byte[] asdfValue = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("qwer", null)));
+        final byte[] zxcvValue1 = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("eo4im", "previous")));
+        final byte[] zxcvValue2 = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("next", "eo4im")));
         stateRestoreCallback.restoreBatch(asList(
             new ConsumerRecord<>("changelog-topic",
                                  0,
@@ -412,7 +411,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         assertThat(buffer.numRecords(), is(3));
         assertThat(buffer.minTimestamp(), is(0L));
-        assertThat(buffer.bufferSize(), is(196L));
+        assertThat(buffer.bufferSize(), is(172L));
 
         stateRestoreCallback.restoreBatch(singletonList(
             new ConsumerRecord<>("changelog-topic",
@@ -429,7 +428,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         assertThat(buffer.numRecords(), is(2));
         assertThat(buffer.minTimestamp(), is(1L));
-        assertThat(buffer.bufferSize(), is(131L));
+        assertThat(buffer.bufferSize(), is(115L));
 
         assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
         assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
@@ -477,14 +476,13 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         final byte[] todeleteValue = getContextualRecord("doomed", 0).serialize(0).array();
         final byte[] asdfValue = getContextualRecord("qwer", 1).serialize(0).array();
-        final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String());
+        final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
         final byte[] zxcvValue1 = new ContextualRecord(
-            fullChangeSerde.serializer().serialize(null, new Change<>("3o4im", "previous")),
+            FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>("3o4im", "previous"))),
             getContext(2L)
         ).serialize(0).array();
-        final FullChangeSerde<String> fullChangeSerde1 = FullChangeSerde.castOrWrap(Serdes.String());
         final byte[] zxcvValue2 = new ContextualRecord(
-            fullChangeSerde1.serializer().serialize(null, new Change<>("next", "3o4im")),
+            FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>("next", "3o4im"))),
             getContext(3L)
         ).serialize(0).array();
         stateRestoreCallback.restoreBatch(asList(
@@ -536,7 +534,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         assertThat(buffer.numRecords(), is(3));
         assertThat(buffer.minTimestamp(), is(0L));
-        assertThat(buffer.bufferSize(), is(166L));
+        assertThat(buffer.bufferSize(), is(142L));
 
         stateRestoreCallback.restoreBatch(singletonList(
             new ConsumerRecord<>("changelog-topic",
@@ -553,7 +551,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         assertThat(buffer.numRecords(), is(2));
         assertThat(buffer.minTimestamp(), is(1L));
-        assertThat(buffer.bufferSize(), is(111L));
+        assertThat(buffer.bufferSize(), is(95L));
 
         assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
         assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
@@ -601,23 +599,21 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         final byte[] todeleteValue = getBufferValue("doomed", 0).serialize(0).array();
         final byte[] asdfValue = getBufferValue("qwer", 1).serialize(0).array();
-        final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String());
+        final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
         final byte[] zxcvValue1 =
             new BufferValue(
-                new ContextualRecord(
-                    fullChangeSerde.serializer().serialize(null, new Change<>("3o4im", "IGNORED")),
-                    getContext(2L)
-                ),
-                Serdes.String().serializer().serialize(null, "previous")
+                Serdes.String().serializer().serialize(null, "previous"),
+                Serdes.String().serializer().serialize(null, "IGNORED"),
+                Serdes.String().serializer().serialize(null, "3o4im"),
+                getContext(2L)
             ).serialize(0).array();
-        final FullChangeSerde<String> fullChangeSerde1 = FullChangeSerde.castOrWrap(Serdes.String());
+        final FullChangeSerde<String> fullChangeSerde1 = FullChangeSerde.wrap(Serdes.String());
         final byte[] zxcvValue2 =
             new BufferValue(
-                new ContextualRecord(
-                    fullChangeSerde1.serializer().serialize(null, new Change<>("next", "3o4im")),
-                    getContext(3L)
-                ),
-                Serdes.String().serializer().serialize(null, "previous")
+                Serdes.String().serializer().serialize(null, "previous"),
+                Serdes.String().serializer().serialize(null, "3o4im"),
+                Serdes.String().serializer().serialize(null, "next"),
+                getContext(3L)
             ).serialize(0).array();
         stateRestoreCallback.restoreBatch(asList(
             new ConsumerRecord<>("changelog-topic",
@@ -668,7 +664,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         assertThat(buffer.numRecords(), is(3));
         assertThat(buffer.minTimestamp(), is(0L));
-        assertThat(buffer.bufferSize(), is(166L));
+        assertThat(buffer.bufferSize(), is(142L));
 
         stateRestoreCallback.restoreBatch(singletonList(
             new ConsumerRecord<>("changelog-topic",
@@ -685,7 +681,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         assertThat(buffer.numRecords(), is(2));
         assertThat(buffer.minTimestamp(), is(1L));
-        assertThat(buffer.bufferSize(), is(111L));
+        assertThat(buffer.bufferSize(), is(95L));
 
         assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
         assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
@@ -766,14 +762,18 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     }
 
     private static BufferValue getBufferValue(final String value, final long timestamp) {
-        final ContextualRecord contextualRecord = getContextualRecord(value, timestamp);
-        return new BufferValue(contextualRecord, null);
+        return new BufferValue(
+            null,
+            null,
+            Serdes.String().serializer().serialize(null, value),
+            getContext(timestamp)
+        );
     }
 
     private static ContextualRecord getContextualRecord(final String value, final long timestamp) {
-        final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String());
+        final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
         return new ContextualRecord(
-            fullChangeSerde.serializer().serialize(null, new Change<>(value, null)),
+            FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>(value, null))),
             getContext(timestamp)
         );
     }