You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/06/27 03:20:54 UTC

[kafka] branch 2.5 updated: KAFKA-10173: Fix suppress changelog binary schema compatibility (#8905)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 1d07fb8  KAFKA-10173: Fix suppress changelog binary schema compatibility (#8905)
1d07fb8 is described below

commit 1d07fb8c583e24deb09b941cf9e9d5a493100fe6
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Jun 26 21:41:51 2020 -0500

    KAFKA-10173: Fix suppress changelog binary schema compatibility (#8905)
    
    We inadvertently changed the binary schema of the suppress buffer changelog
    in 2.4.0 without bumping the schema version number. As a result, it is impossible
    to upgrade from 2.3.x to 2.4+ if you are using suppression.
    
    * Refactor the schema compatibility test to use serialized data from older versions
    as a more foolproof compatibility test.
    * Refactor the upgrade system test to use the smoke test application so that we
    actually exercise a significant portion of the Streams API during upgrade testing
    * Add more recent versions to the upgrade system test matrix
    * Fix the compatibility bug by bumping the schema version to 3
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Guozhang Wang <wa...@gmail.com>
---
 .../java/org/apache/kafka/common/utils/Utils.java  |  32 ++
 .../org/apache/kafka/common/utils/UtilsTest.java   |  73 +++-
 .../streams/kstream/internals/FullChangeSerde.java |  43 +--
 .../internals/ProcessorRecordContext.java          |  23 +-
 .../kafka/streams/state/internals/BufferValue.java |  27 +-
 .../streams/state/internals/ContextualRecord.java  |  32 +-
 .../InMemoryTimeOrderedKeyValueBuffer.java         | 155 ++++-----
 ...yValueBufferChangelogDeserializationHelper.java | 158 +++++++++
 .../kstream/internals/FullChangeSerdeTest.java     |  39 ++-
 .../internals/TimeOrderedKeyValueBufferTest.java   | 366 +++++++++++++++++----
 10 files changed, 690 insertions(+), 258 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index e2160d9..548d40c 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -30,6 +30,7 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
@@ -277,6 +278,37 @@ public final class Utils {
     }
 
     /**
+     * Starting from the current position, read an integer indicating the size of the byte array to read,
+     * then read the array. Consumes the buffer: upon returning, the buffer's position is after the array
+     * that is returned.
+     * @param buffer The buffer to read a size-prefixed array from
+     * @return The array, or {@code null} if the encoded size is -1
+     */
+    public static byte[] getNullableSizePrefixedArray(final ByteBuffer buffer) {
+        final int size = buffer.getInt();
+        return getNullableArray(buffer, size);
+    }
+
+    /**
+     * Read a byte array of the given size. Consumes the buffer: upon returning, the buffer's position
+     * is after the array that is returned.
+     * @param buffer The buffer to read the array from (no size prefix is read; the size is given by {@code size})
+     * @param size The number of bytes to read out of the buffer
+     * @return The array, or {@code null} if {@code size} is -1
+     */
+    public static byte[] getNullableArray(final ByteBuffer buffer, final int size) {
+        if (size > buffer.remaining()) {
+            // preemptively throw this when the read is doomed to fail, so we don't have to allocate the array.
+            throw new BufferUnderflowException();
+        }
+        final byte[] oldBytes = size == -1 ? null : new byte[size];
+        if (oldBytes != null) {
+            buffer.get(oldBytes);
+        }
+        return oldBytes;
+    }
+
+    /**
      * Returns a copy of src byte array
      * @param src The byte array to copy
      * @return The copy
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 983232a..d4785d0 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -25,6 +25,7 @@ import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
+import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
@@ -51,6 +52,8 @@ import static org.apache.kafka.common.utils.Utils.validHostPattern;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -70,7 +73,7 @@ public class UtilsTest {
         cases.put("a-little-bit-long-string".getBytes(), -985981536);
         cases.put("a-little-bit-longer-string".getBytes(), -1486304829);
         cases.put("lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8".getBytes(), -58897971);
-        cases.put(new byte[]{'a', 'b', 'c'}, 479470107);
+        cases.put(new byte[] {'a', 'b', 'c'}, 479470107);
 
         for (Map.Entry c : cases.entrySet()) {
             assertEquals((int) c.getValue(), murmur2((byte[]) c.getKey()));
@@ -204,6 +207,65 @@ public class UtilsTest {
     }
 
     @Test
+    public void getNullableSizePrefixedArrayExact() {
+        byte[] input = {0, 0, 0, 2, 1, 0};
+        final ByteBuffer buffer = ByteBuffer.wrap(input);
+        final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
+        assertArrayEquals(new byte[] {1, 0}, array);
+        assertEquals(6, buffer.position());
+        assertFalse(buffer.hasRemaining());
+    }
+
+    @Test
+    public void getNullableSizePrefixedArrayExactEmpty() {
+        byte[] input = {0, 0, 0, 0};
+        final ByteBuffer buffer = ByteBuffer.wrap(input);
+        final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
+        assertArrayEquals(new byte[] {}, array);
+        assertEquals(4, buffer.position());
+        assertFalse(buffer.hasRemaining());
+    }
+
+    @Test
+    public void getNullableSizePrefixedArrayRemainder() {
+        byte[] input = {0, 0, 0, 2, 1, 0, 9};
+        final ByteBuffer buffer = ByteBuffer.wrap(input);
+        final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
+        assertArrayEquals(new byte[] {1, 0}, array);
+        assertEquals(6, buffer.position());
+        assertTrue(buffer.hasRemaining());
+    }
+
+    @Test
+    public void getNullableSizePrefixedArrayNull() {
+        // -1
+        byte[] input = {-1, -1, -1, -1};
+        final ByteBuffer buffer = ByteBuffer.wrap(input);
+        final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
+        assertNull(array);
+        assertEquals(4, buffer.position());
+        assertFalse(buffer.hasRemaining());
+    }
+
+    @Test
+    public void getNullableSizePrefixedArrayInvalid() {
+        // -2
+        byte[] input = {-1, -1, -1, -2};
+        final ByteBuffer buffer = ByteBuffer.wrap(input);
+        assertThrows(NegativeArraySizeException.class, () -> Utils.getNullableSizePrefixedArray(buffer));
+    }
+
+    @Test
+    public void getNullableSizePrefixedArrayUnderflow() {
+        // Integer.MAX_VALUE
+        byte[] input = {127, -1, -1, -1};
+        final ByteBuffer buffer = ByteBuffer.wrap(input);
+        // note, we get a buffer underflow exception instead of an OOME, even though the encoded size
+        // would be 2,147,483,647 aka 2.1 GB, probably larger than the available heap
+        assertThrows(BufferUnderflowException.class, () -> Utils.getNullableSizePrefixedArray(buffer));
+    }
+
+    @Test
     public void utf8ByteArraySerde() {
         String utf8String = "A\u00ea\u00f1\u00fcC";
         byte[] utf8Bytes = utf8String.getBytes(StandardCharsets.UTF_8);
@@ -414,7 +476,7 @@ public class UtilsTest {
         String expectedBufferContent = fileChannelMockExpectReadWithRandomBytes(channelMock, bufferSize);
         Utils.readFullyOrFail(channelMock, buffer, 0L, "test");
         assertEquals("The buffer should be populated correctly", expectedBufferContent,
-                new String(buffer.array()));
+                     new String(buffer.array()));
         assertFalse("The buffer should be filled", buffer.hasRemaining());
         verify(channelMock, atLeastOnce()).read(any(), anyLong());
     }
@@ -431,7 +493,7 @@ public class UtilsTest {
         ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
         Utils.readFully(channelMock, buffer, 0L);
         assertEquals("The buffer should be populated correctly.", expectedBufferContent,
-                new String(buffer.array()));
+                     new String(buffer.array()));
         assertFalse("The buffer should be filled", buffer.hasRemaining());
         verify(channelMock, atLeastOnce()).read(any(), anyLong());
     }
@@ -480,7 +542,7 @@ public class UtilsTest {
      *
      * @param channelMock           The mocked FileChannel object
      * @param bufferSize            The buffer size
-     * @return                      Expected buffer string
+     * @return Expected buffer string
      * @throws IOException          If an I/O error occurs
      */
     private String fileChannelMockExpectReadWithRandomBytes(final FileChannel channelMock,
@@ -517,8 +579,9 @@ public class UtilsTest {
         @Override
         public void close() throws IOException {
             closed = true;
-            if (closeException != null)
+            if (closeException != null) {
                 throw closeException;
+            }
         }
 
         static TestCloseable[] createCloseables(boolean... exceptionOnClose) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
index 5d7c7e3..3a34394 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import java.nio.ByteBuffer;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
 
 public final class FullChangeSerde<T> {
     private final Serde<T> inner;
@@ -69,33 +70,6 @@ public final class FullChangeSerde<T> {
     }
 
     /**
-     * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
-     * so that we can produce the legacy format to test that we can still deserialize it.
-     */
-    public static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) {
-        if (serialChange == null) {
-            return null;
-        }
-
-        final int oldSize = serialChange.oldValue == null ? -1 : serialChange.oldValue.length;
-        final int newSize = serialChange.newValue == null ? -1 : serialChange.newValue.length;
-
-        final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + Math.max(0, oldSize) + Math.max(0, newSize));
-
-
-        buffer.putInt(oldSize);
-        if (serialChange.oldValue != null) {
-            buffer.put(serialChange.oldValue);
-        }
-
-        buffer.putInt(newSize);
-        if (serialChange.newValue != null) {
-            buffer.put(serialChange.newValue);
-        }
-        return buffer.array();
-    }
-
-    /**
      * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still
      * need to be able to read it (so that we can load the state store from previously-written changelog records).
      */
@@ -104,19 +78,8 @@ public final class FullChangeSerde<T> {
             return null;
         }
         final ByteBuffer buffer = ByteBuffer.wrap(data);
-
-        final int oldSize = buffer.getInt();
-        final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
-        if (oldBytes != null) {
-            buffer.get(oldBytes);
-        }
-
-        final int newSize = buffer.getInt();
-        final byte[] newBytes = newSize == -1 ? null : new byte[newSize];
-        if (newBytes != null) {
-            buffer.get(newBytes);
-        }
-
+        final byte[] oldBytes = getNullableSizePrefixedArray(buffer);
+        final byte[] newBytes = getNullableSizePrefixedArray(buffer);
         return new Change<>(newBytes, oldBytes);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 5662417..5dd0062 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -26,6 +26,8 @@ import java.nio.ByteBuffer;
 import java.util.Objects;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
 
 public class ProcessorRecordContext implements RecordContext {
 
@@ -161,12 +163,10 @@ public class ProcessorRecordContext implements RecordContext {
     public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
         final long timestamp = buffer.getLong();
         final long offset = buffer.getLong();
-        final int topicSize = buffer.getInt();
         final String topic;
         {
-            // not handling the null topic condition, because we believe the topic will never be null when we serialize
-            final byte[] topicBytes = new byte[topicSize];
-            buffer.get(topicBytes);
+            // we believe the topic will never be null when we serialize
+            final byte[] topicBytes = requireNonNull(getNullableSizePrefixedArray(buffer));
             topic = new String(topicBytes, UTF_8);
         }
         final int partition = buffer.getInt();
@@ -177,19 +177,8 @@ public class ProcessorRecordContext implements RecordContext {
         } else {
             final Header[] headerArr = new Header[headerCount];
             for (int i = 0; i < headerCount; i++) {
-                final int keySize = buffer.getInt();
-                final byte[] keyBytes = new byte[keySize];
-                buffer.get(keyBytes);
-
-                final int valueSize = buffer.getInt();
-                final byte[] valueBytes;
-                if (valueSize == -1) {
-                    valueBytes = null;
-                } else {
-                    valueBytes = new byte[valueSize];
-                    buffer.get(valueBytes);
-                }
-
+                final byte[] keyBytes = requireNonNull(getNullableSizePrefixedArray(buffer));
+                final byte[] valueBytes = getNullableSizePrefixedArray(buffer);
                 headerArr[i] = new RecordHeader(new String(keyBytes, UTF_8), valueBytes);
             }
             headers = new RecordHeaders(headerArr);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
index b52ec24..f27ab19 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
@@ -22,6 +22,9 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Objects;
 
+import static org.apache.kafka.common.utils.Utils.getNullableArray;
+import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
+
 public final class BufferValue {
     private static final int NULL_VALUE_SENTINEL = -1;
     private static final int OLD_PREV_DUPLICATE_VALUE_SENTINEL = -2;
@@ -67,35 +70,21 @@ public final class BufferValue {
     static BufferValue deserialize(final ByteBuffer buffer) {
         final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer);
 
-        final byte[] priorValue = extractValue(buffer);
+        final byte[] priorValue = getNullableSizePrefixedArray(buffer);
 
         final byte[] oldValue;
         final int oldValueLength = buffer.getInt();
-        if (oldValueLength == NULL_VALUE_SENTINEL) {
-            oldValue = null;
-        } else if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) {
+        if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) {
             oldValue = priorValue;
         } else {
-            oldValue = new byte[oldValueLength];
-            buffer.get(oldValue);
+            oldValue = getNullableArray(buffer, oldValueLength);
         }
 
-        final byte[] newValue = extractValue(buffer);
+        final byte[] newValue = getNullableSizePrefixedArray(buffer);
 
         return new BufferValue(priorValue, oldValue, newValue, context);
     }
 
-    private static byte[] extractValue(final ByteBuffer buffer) {
-        final int valueLength = buffer.getInt();
-        if (valueLength == NULL_VALUE_SENTINEL) {
-            return null;
-        } else {
-            final byte[] value = new byte[valueLength];
-            buffer.get(value);
-            return value;
-        }
-    }
-
     ByteBuffer serialize(final int endPadding) {
 
         final int sizeOfValueLength = Integer.BYTES;
@@ -120,7 +109,7 @@ public final class BufferValue {
 
         if (oldValue == null) {
             buffer.putInt(NULL_VALUE_SENTINEL);
-        } else if (priorValue == oldValue) {
+        } else if (Arrays.equals(priorValue, oldValue)) {
             buffer.putInt(OLD_PREV_DUPLICATE_VALUE_SENTINEL);
         } else {
             buffer.putInt(sizeOfOldValue);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
index 3c24f52..a26b437 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Objects;
 
+import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
+
 public class ContextualRecord {
     private final byte[] value;
     private final ProcessorRecordContext recordContext;
@@ -43,36 +45,10 @@ public class ContextualRecord {
         return (value == null ? 0 : value.length) + recordContext.residentMemorySizeEstimate();
     }
 
-    ByteBuffer serialize(final int endPadding) {
-        final byte[] serializedContext = recordContext.serialize();
-
-        final int sizeOfContext = serializedContext.length;
-        final int sizeOfValueLength = Integer.BYTES;
-        final int sizeOfValue = value == null ? 0 : value.length;
-        final ByteBuffer buffer = ByteBuffer.allocate(sizeOfContext + sizeOfValueLength + sizeOfValue + endPadding);
-
-        buffer.put(serializedContext);
-        if (value == null) {
-            buffer.putInt(-1);
-        } else {
-            buffer.putInt(value.length);
-            buffer.put(value);
-        }
-
-        return buffer;
-    }
-
     static ContextualRecord deserialize(final ByteBuffer buffer) {
         final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer);
-
-        final int valueLength = buffer.getInt();
-        if (valueLength == -1) {
-            return new ContextualRecord(null, context);
-        } else {
-            final byte[] value = new byte[valueLength];
-            buffer.get(value);
-            return new ContextualRecord(value, context);
-        }
+        final byte[] value = getNullableSizePrefixedArray(buffer);
+        return new ContextualRecord(value, context);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 4043cea..55921a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -38,9 +38,11 @@ import org.apache.kafka.streams.processor.internals.RecordQueue;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.DeserializationResult;
 import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -54,14 +56,19 @@ import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV0;
+import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV1;
+import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV3;
+import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.duckTypeV2;
 
 public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> {
     private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
     private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
-    private static final RecordHeaders V_1_CHANGELOG_HEADERS =
-        new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
-    private static final RecordHeaders V_2_CHANGELOG_HEADERS =
-        new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
+    private static final byte[] V_1_CHANGELOG_HEADER_VALUE = {(byte) 1};
+    private static final byte[] V_2_CHANGELOG_HEADER_VALUE = {(byte) 2};
+    private static final byte[] V_3_CHANGELOG_HEADER_VALUE = {(byte) 3};
+    static final RecordHeaders CHANGELOG_HEADERS =
+        new RecordHeaders(new Header[] {new RecordHeader("v", V_3_CHANGELOG_HEADER_VALUE)});
     private static final String METRIC_SCOPE = "in-memory-suppression";
 
     private final Map<Bytes, BufferKey> index = new HashMap<>();
@@ -259,12 +266,11 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         final int sizeOfBufferTime = Long.BYTES;
         final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
         buffer.putLong(bufferKey.time());
-
         collector.send(
             changelogTopic,
             key,
             buffer.array(),
-            V_2_CHANGELOG_HEADERS,
+            CHANGELOG_HEADERS,
             partition,
             null,
             KEY_SERIALIZER,
@@ -286,6 +292,15 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
 
     private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch) {
         for (final ConsumerRecord<byte[], byte[]> record : batch) {
+            if (record.partition() != partition) {
+                throw new IllegalStateException(
+                    String.format(
+                        "record partition [%d] is being restored by the wrong suppress partition [%d]",
+                        record.partition(),
+                        partition
+                    )
+                );
+            }
             final Bytes key = Bytes.wrap(record.key());
             if (record.value() == null) {
                 // This was a tombstone. Delete the record.
@@ -299,92 +314,63 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
                         minTimestamp = sortedMap.isEmpty() ? Long.MAX_VALUE : sortedMap.firstKey().time();
                     }
                 }
-
-                if (record.partition() != partition) {
-                    throw new IllegalStateException(
-                        String.format(
-                            "record partition [%d] is being restored by the wrong suppress partition [%d]",
-                            record.partition(),
-                            partition
-                        )
-                    );
-                }
             } else {
-                if (record.headers().lastHeader("v") == null) {
-                    // in this case, the changelog value is just the serialized record value
-                    final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
-                    final long time = timeAndValue.getLong();
-                    final byte[] changelogValue = new byte[record.value().length - 8];
-                    timeAndValue.get(changelogValue);
-
-                    final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(changelogValue));
-
-                    final ProcessorRecordContext recordContext = new ProcessorRecordContext(
-                        record.timestamp(),
-                        record.offset(),
-                        record.partition(),
-                        record.topic(),
-                        record.headers()
-                    );
-
-                    cleanPut(
-                        time,
-                        key,
-                        new BufferValue(
-                            index.containsKey(key)
-                                ? internalPriorValueForBuffered(key)
-                                : change.oldValue,
-                            change.oldValue,
-                            change.newValue,
-                            recordContext
-                        )
-                    );
-                } else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
-                    // in this case, the changelog value is a serialized ContextualRecord
-                    final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
-                    final long time = timeAndValue.getLong();
-                    final byte[] changelogValue = new byte[record.value().length - 8];
-                    timeAndValue.get(changelogValue);
-
-                    final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue));
-                    final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value()));
-
-                    cleanPut(
-                        time,
-                        key,
-                        new BufferValue(
-                            index.containsKey(key)
-                                ? internalPriorValueForBuffered(key)
-                                : change.oldValue,
-                            change.oldValue,
-                            change.newValue,
-                            contextualRecord.recordContext()
-                        )
-                    );
-                } else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
-                    // in this case, the changelog value is a serialized BufferValue
-
-                    final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value());
-                    final BufferValue bufferValue = BufferValue.deserialize(valueAndTime);
-                    final long time = valueAndTime.getLong();
-                    cleanPut(time, key, bufferValue);
+                final Header versionHeader = record.headers().lastHeader("v");
+                if (versionHeader == null) {
+                    // Version 0:
+                    // value:
+                    //  - buffer time
+                    //  - old value
+                    //  - new value
+                    final byte[] previousBufferedValue = index.containsKey(key)
+                        ? internalPriorValueForBuffered(key)
+                        : null;
+                    final DeserializationResult deserializationResult = deserializeV0(record, key, previousBufferedValue);
+                    cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
+                } else if (Arrays.equals(versionHeader.value(), V_3_CHANGELOG_HEADER_VALUE)) {
+                    // Version 3:
+                    // value:
+                    //  - record context
+                    //  - prior value
+                    //  - old value
+                    //  - new value
+                    //  - buffer time
+                    final DeserializationResult deserializationResult = deserializeV3(record, key);
+                    cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
+
+                } else if (Arrays.equals(versionHeader.value(), V_2_CHANGELOG_HEADER_VALUE)) {
+                    // Version 2:
+                    // value:
+                    //  - record context
+                    //  - old value
+                    //  - new value
+                    //  - prior value
+                    //  - buffer time
+                    // NOTE: 2.4.0, 2.4.1, and 2.5.0 actually encode Version 3 formatted data,
+                    // but still set the Version 2 flag, so to deserialize, we have to duck type.
+                    final DeserializationResult deserializationResult = duckTypeV2(record, key);
+                    cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
+                } else if (Arrays.equals(versionHeader.value(), V_1_CHANGELOG_HEADER_VALUE)) {
+                    // Version 1:
+                    // value:
+                    //  - buffer time
+                    //  - record context
+                    //  - old value
+                    //  - new value
+                    final byte[] previousBufferedValue = index.containsKey(key)
+                        ? internalPriorValueForBuffered(key)
+                        : null;
+                    final DeserializationResult deserializationResult = deserializeV1(record, key, previousBufferedValue);
+                    cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
                 } else {
                     throw new IllegalArgumentException("Restoring apparently invalid changelog record: " + record);
                 }
             }
-            if (record.partition() != partition) {
-                throw new IllegalStateException(
-                    String.format(
-                        "record partition [%d] is being restored by the wrong suppress partition [%d]",
-                        record.partition(),
-                        partition
-                    )
-                );
-            }
         }
         updateBufferMetrics();
     }
 
+
     @Override
     public void evictWhile(final Supplier<Boolean> predicate,
                            final Consumer<Eviction<K, V>> callback) {
@@ -481,8 +467,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         final BufferValue buffered = getBuffered(serializedKey);
         final byte[] serializedPriorValue;
         if (buffered == null) {
-            final V priorValue = value.oldValue;
-            serializedPriorValue = (priorValue == null) ? null : valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue);
+            serializedPriorValue = serialChange.oldValue;
         } else {
             serializedPriorValue = buffered.priorValue();
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java
new file mode 100644
index 0000000..74489c2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+
+import java.nio.ByteBuffer;
+
+import static java.util.Objects.requireNonNull;
+
+final class TimeOrderedKeyValueBufferChangelogDeserializationHelper {
+    private TimeOrderedKeyValueBufferChangelogDeserializationHelper() {}
+
+    static final class DeserializationResult {
+        private final long time;
+        private final Bytes key;
+        private final BufferValue bufferValue;
+
+        private DeserializationResult(final long time, final Bytes key, final BufferValue bufferValue) {
+            this.time = time;
+            this.key = key;
+            this.bufferValue = bufferValue;
+        }
+
+        long time() {
+            return time;
+        }
+
+        Bytes key() {
+            return key;
+        }
+
+        BufferValue bufferValue() {
+            return bufferValue;
+        }
+    }
+
+    static DeserializationResult deserializeV0(final ConsumerRecord<byte[], byte[]> record,
+                                               final Bytes key,
+                                               final byte[] previousBufferedValue) {
+
+        final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
+        final long time = timeAndValue.getLong();
+        final byte[] changelogValue = new byte[record.value().length - 8];
+        timeAndValue.get(changelogValue);
+
+        final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(changelogValue));
+
+        final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+            record.timestamp(),
+            record.offset(),
+            record.partition(),
+            record.topic(),
+            record.headers()
+        );
+
+        return new DeserializationResult(
+            time,
+            key,
+            new BufferValue(
+                previousBufferedValue == null ? change.oldValue : previousBufferedValue,
+                change.oldValue,
+                change.newValue,
+                recordContext
+            )
+        );
+    }
+
+    static DeserializationResult deserializeV1(final ConsumerRecord<byte[], byte[]> record,
+                                               final Bytes key,
+                                               final byte[] previousBufferedValue) {
+        final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
+        final long time = timeAndValue.getLong();
+        final byte[] changelogValue = new byte[record.value().length - 8];
+        timeAndValue.get(changelogValue);
+
+        final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue));
+        final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value()));
+
+        return new DeserializationResult(
+            time,
+            key,
+            new BufferValue(
+                previousBufferedValue == null ? change.oldValue : previousBufferedValue,
+                change.oldValue,
+                change.newValue,
+                contextualRecord.recordContext()
+            )
+        );
+    }
+
+    static DeserializationResult duckTypeV2(final ConsumerRecord<byte[], byte[]> record, final Bytes key) {
+        DeserializationResult deserializationResult = null;
+        RuntimeException v2DeserializationException = null;
+        RuntimeException v3DeserializationException = null;
+        try {
+            deserializationResult = deserializeV2(record, key);
+        } catch (final RuntimeException e) {
+            v2DeserializationException = e;
+        }
+        // versions 2.4.0, 2.4.1, and 2.5.0 would have erroneously encoded a V3 record with the
+        // V2 header, so we'll try duck-typing to see if this is decodable as V3
+        if (deserializationResult == null) {
+            try {
+                deserializationResult = deserializeV3(record, key);
+            } catch (final RuntimeException e) {
+                v3DeserializationException = e;
+            }
+        }
+
+        if (deserializationResult == null) {
+            // ok, it wasn't V3 either. Throw both exceptions:
+            final RuntimeException exception =
+                new RuntimeException("Couldn't deserialize record as v2 or v3: " + record,
+                                     v2DeserializationException);
+            exception.addSuppressed(v3DeserializationException);
+            throw exception;
+        }
+        return deserializationResult;
+    }
+
+    private static DeserializationResult deserializeV2(final ConsumerRecord<byte[], byte[]> record,
+                                                       final Bytes key) {
+        final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value());
+        final ContextualRecord contextualRecord = ContextualRecord.deserialize(valueAndTime);
+        final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value()));
+        final byte[] priorValue = Utils.getNullableSizePrefixedArray(valueAndTime);
+        final long time = valueAndTime.getLong();
+        final BufferValue bufferValue = new BufferValue(priorValue, change.oldValue, change.newValue, contextualRecord.recordContext());
+        return new DeserializationResult(time, key, bufferValue);
+    }
+
+    static DeserializationResult deserializeV3(final ConsumerRecord<byte[], byte[]> record, final Bytes key) {
+        final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value());
+        final BufferValue bufferValue = BufferValue.deserialize(valueAndTime);
+        final long time = valueAndTime.getLong();
+        return new DeserializationResult(time, key, bufferValue);
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
index ac6762f..e7e0c88 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serdes;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
+
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
@@ -26,10 +28,37 @@ import static org.hamcrest.core.Is.is;
 public class FullChangeSerdeTest {
     private final FullChangeSerde<String> serde = FullChangeSerde.wrap(Serdes.String());
 
+    /**
+     * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
+     * so that we can produce the legacy format to test that we can still deserialize it.
+     */
+    private static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) {
+        if (serialChange == null) {
+            return null;
+        }
+
+        final int oldSize = serialChange.oldValue == null ? -1 : serialChange.oldValue.length;
+        final int newSize = serialChange.newValue == null ? -1 : serialChange.newValue.length;
+
+        final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + Math.max(0, oldSize) + Math.max(0, newSize));
+
+
+        buffer.putInt(oldSize);
+        if (serialChange.oldValue != null) {
+            buffer.put(serialChange.oldValue);
+        }
+
+        buffer.putInt(newSize);
+        if (serialChange.newValue != null) {
+            buffer.put(serialChange.newValue);
+        }
+        return buffer.array();
+    }
+
     @Test
     public void shouldRoundTripNull() {
         assertThat(serde.serializeParts(null, null), nullValue());
-        assertThat(FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(null), nullValue());
+        assertThat(mergeChangeArraysIntoSingleLegacyFormattedArray(null), nullValue());
         assertThat(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(null), nullValue());
         assertThat(serde.deserializeParts(null, null), nullValue());
     }
@@ -47,7 +76,7 @@ public class FullChangeSerdeTest {
             is(new Change<String>(null, null))
         );
 
-        final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(new Change<>(null, null));
+        final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(new Change<>(null, null));
         assertThat(
             FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat),
             is(new Change<byte[]>(null, null))
@@ -57,7 +86,7 @@ public class FullChangeSerdeTest {
     @Test
     public void shouldRoundTripOldNull() {
         final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", null));
-        final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
+        final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
         final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
         assertThat(
             serde.deserializeParts(null, decomposedLegacyFormat),
@@ -68,7 +97,7 @@ public class FullChangeSerdeTest {
     @Test
     public void shouldRoundTripNewNull() {
         final Change<byte[]> serialized = serde.serializeParts(null, new Change<>(null, "old"));
-        final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
+        final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
         final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
         assertThat(
             serde.deserializeParts(null, decomposedLegacyFormat),
@@ -79,7 +108,7 @@ public class FullChangeSerdeTest {
     @Test
     public void shouldRoundTripChange() {
         final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", "old"));
-        final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
+        final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
         final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
         assertThat(
             serde.deserializeParts(null, decomposedLegacyFormat),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 25a44c4..a054ac9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.internals.Change;
-import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
@@ -56,14 +55,13 @@ import java.util.stream.Collectors;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.CHANGELOG_HEADERS;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.fail;
 
 @RunWith(Parameterized.class)
 public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<String, String>> {
-    private static final RecordHeaders V_2_CHANGELOG_HEADERS =
-        new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
 
     private static final String APP_ID = "test-app";
     private final Function<String, B> bufferSupplier;
@@ -73,7 +71,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         @Override
         public byte[] serialize(final String topic, final String data) {
             if (data == null) {
-                throw new IllegalArgumentException();
+                throw new IllegalArgumentException("null data not allowed");
             }
             return super.serialize(topic, data);
         }
@@ -347,14 +345,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  null,
                                  "zxcv",
                                  new KeyValue<>(1L, getBufferValue("3gon4i", 1)),
-                                 V_2_CHANGELOG_HEADERS
+                                 CHANGELOG_HEADERS
             ),
             new ProducerRecord<>(APP_ID + "-" + testName + "-changelog",
                                  0,
                                  null,
                                  "asdf",
                                  new KeyValue<>(2L, getBufferValue("2093j", 0)),
-                                 V_2_CHANGELOG_HEADERS
+                                 CHANGELOG_HEADERS
             )
         )));
 
@@ -362,7 +360,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     }
 
     @Test
-    public void shouldRestoreOldFormat() {
+    public void shouldRestoreOldUnversionedFormat() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
@@ -372,12 +370,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
 
-        final FullChangeSerde<String> serializer = FullChangeSerde.wrap(Serdes.String());
+        // These serialized formats were captured by running version 2.1 code.
+        // They verify that an upgrade from 2.1 will work.
+        // Do not change them.
+        final String toDeleteBinaryValue = "0000000000000000FFFFFFFF00000006646F6F6D6564";
+        final String asdfBinaryValue = "0000000000000002FFFFFFFF0000000471776572";
+        final String zxcvBinaryValue1 = "00000000000000010000000870726576696F757300000005656F34696D";
+        final String zxcvBinaryValue2 = "000000000000000100000005656F34696D000000046E657874";
 
-        final byte[] todeleteValue = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("doomed", null)));
-        final byte[] asdfValue = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("qwer", null)));
-        final byte[] zxcvValue1 = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("eo4im", "previous")));
-        final byte[] zxcvValue2 = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("next", "eo4im")));
         stateRestoreCallback.restoreBatch(asList(
             new ConsumerRecord<>("changelog-topic",
                                  0,
@@ -388,7 +388,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "todelete".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + todeleteValue.length).putLong(0L).put(todeleteValue).array()),
+                                 hexStringToByteArray(toDeleteBinaryValue)),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  1,
@@ -398,7 +398,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "asdf".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + asdfValue.length).putLong(2L).put(asdfValue).array()),
+                                 hexStringToByteArray(asdfBinaryValue)),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  2,
@@ -408,7 +408,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).putLong(1L).put(zxcvValue1).array()),
+                                 hexStringToByteArray(zxcvBinaryValue1)),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  3,
@@ -418,7 +418,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).putLong(1L).put(zxcvValue2).array())
+                                 hexStringToByteArray(zxcvBinaryValue2))
         ));
 
         assertThat(buffer.numRecords(), is(3));
@@ -486,17 +486,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         final RecordHeaders v1FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
 
-        final byte[] todeleteValue = getContextualRecord("doomed", 0).serialize(0).array();
-        final byte[] asdfValue = getContextualRecord("qwer", 1).serialize(0).array();
-        final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
-        final byte[] zxcvValue1 = new ContextualRecord(
-            FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>("3o4im", "previous"))),
-            getContext(2L)
-        ).serialize(0).array();
-        final byte[] zxcvValue2 = new ContextualRecord(
-            FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>("next", "3o4im"))),
-            getContext(3L)
-        ).serialize(0).array();
+        // These serialized formats were captured by running version 2.2 code.
+        // They verify that an upgrade from 2.2 will work.
+        // Do not change them.
+        final String toDeleteBinary = "00000000000000000000000000000000000000000000000000000005746F70696300000000FFFFFFFF0000000EFFFFFFFF00000006646F6F6D6564";
+        final String asdfBinary = "00000000000000020000000000000001000000000000000000000005746F70696300000000FFFFFFFF0000000CFFFFFFFF0000000471776572";
+        final String zxcvBinary1 = "00000000000000010000000000000002000000000000000000000005746F70696300000000FFFFFFFF000000150000000870726576696F757300000005336F34696D";
+        final String zxcvBinary2 = "00000000000000010000000000000003000000000000000000000005746F70696300000000FFFFFFFF0000001100000005336F34696D000000046E657874";
+
         stateRestoreCallback.restoreBatch(asList(
             new ConsumerRecord<>("changelog-topic",
                                  0,
@@ -507,7 +504,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "todelete".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + todeleteValue.length).putLong(0L).put(todeleteValue).array(),
+                                 hexStringToByteArray(toDeleteBinary),
                                  v1FlagHeaders),
             new ConsumerRecord<>("changelog-topic",
                                  0,
@@ -518,7 +515,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "asdf".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + asdfValue.length).putLong(2L).put(asdfValue).array(),
+                                 hexStringToByteArray(asdfBinary),
                                  v1FlagHeaders),
             new ConsumerRecord<>("changelog-topic",
                                  0,
@@ -529,7 +526,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).putLong(1L).put(zxcvValue1).array(),
+                                 hexStringToByteArray(zxcvBinary1),
                                  v1FlagHeaders),
             new ConsumerRecord<>("changelog-topic",
                                  0,
@@ -540,7 +537,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).putLong(1L).put(zxcvValue2).array(),
+                                 hexStringToByteArray(zxcvBinary2),
                                  v1FlagHeaders)
         ));
 
@@ -596,6 +593,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         cleanup(context, buffer);
     }
 
+
     @Test
     public void shouldRestoreV2Format() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
@@ -609,22 +607,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         final RecordHeaders v2FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
 
-        final byte[] todeleteValue = getBufferValue("doomed", 0).serialize(0).array();
-        final byte[] asdfValue = getBufferValue("qwer", 1).serialize(0).array();
-        final byte[] zxcvValue1 =
-            new BufferValue(
-                Serdes.String().serializer().serialize(null, "previous"),
-                Serdes.String().serializer().serialize(null, "IGNORED"),
-                Serdes.String().serializer().serialize(null, "3o4im"),
-                getContext(2L)
-            ).serialize(0).array();
-        final byte[] zxcvValue2 =
-            new BufferValue(
-                Serdes.String().serializer().serialize(null, "previous"),
-                Serdes.String().serializer().serialize(null, "3o4im"),
-                Serdes.String().serializer().serialize(null, "next"),
-                getContext(3L)
-            ).serialize(0).array();
+        // These serialized formats were captured by running version 2.3 code.
+        // They verify that an upgrade from 2.3 will work.
+        // Do not change them.
+        final String toDeleteBinary = "0000000000000000000000000000000000000005746F70696300000000FFFFFFFF0000000EFFFFFFFF00000006646F6F6D6564FFFFFFFF0000000000000000";
+        final String asdfBinary = "0000000000000001000000000000000000000005746F70696300000000FFFFFFFF0000000CFFFFFFFF0000000471776572FFFFFFFF0000000000000002";
+        final String zxcvBinary1 = "0000000000000002000000000000000000000005746F70696300000000FFFFFFFF000000140000000749474E4F52454400000005336F34696D0000000870726576696F75730000000000000001";
+        final String zxcvBinary2 = "0000000000000003000000000000000000000005746F70696300000000FFFFFFFF0000001100000005336F34696D000000046E6578740000000870726576696F75730000000000000001";
+
         stateRestoreCallback.restoreBatch(asList(
             new ConsumerRecord<>("changelog-topic",
                                  0,
@@ -635,7 +625,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "todelete".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + todeleteValue.length).put(todeleteValue).putLong(0L).array(),
+                                 hexStringToByteArray(toDeleteBinary),
                                  v2FlagHeaders),
             new ConsumerRecord<>("changelog-topic",
                                  0,
@@ -646,7 +636,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "asdf".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + asdfValue.length).put(asdfValue).putLong(2L).array(),
+                                 hexStringToByteArray(asdfBinary),
                                  v2FlagHeaders),
             new ConsumerRecord<>("changelog-topic",
                                  0,
@@ -657,7 +647,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).put(zxcvValue1).putLong(1L).array(),
+                                 hexStringToByteArray(zxcvBinary1),
                                  v2FlagHeaders),
             new ConsumerRecord<>("changelog-topic",
                                  0,
@@ -668,7 +658,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).put(zxcvValue2).putLong(1L).array(),
+                                 hexStringToByteArray(zxcvBinary2),
                                  v2FlagHeaders)
         ));
 
@@ -725,6 +715,249 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     }
 
     @Test
+    public void shouldRestoreV3FormatWithV2Header() {
+        // versions 2.4.0, 2.4.1, and 2.5.0 would have erroneously encoded a V3 record with the
+        // V2 header, so we need to be sure to handle this case as well.
+        // Note the data is the same as the V3 test.
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
+        final MockInternalProcessorContext context = makeContext();
+        buffer.init(context, buffer);
+
+        final RecordBatchingStateRestoreCallback stateRestoreCallback =
+            (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
+
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+
+        final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
+
+        // These serialized formats were captured by running version 2.4 code.
+        // They verify that an upgrade from 2.4 will work.
+        // Do not change them.
+        final String toDeleteBinary = "0000000000000000000000000000000000000005746F70696300000000FFFFFFFFFFFFFFFFFFFFFFFF00000006646F6F6D65640000000000000000";
+        final String asdfBinary = "0000000000000001000000000000000000000005746F70696300000000FFFFFFFFFFFFFFFFFFFFFFFF00000004717765720000000000000002";
+        final String zxcvBinary1 = "0000000000000002000000000000000000000005746F70696300000000FFFFFFFF0000000870726576696F75730000000749474E4F52454400000005336F34696D0000000000000001";
+        final String zxcvBinary2 = "0000000000000003000000000000000000000005746F70696300000000FFFFFFFF0000000870726576696F757300000005336F34696D000000046E6578740000000000000001";
+
+        stateRestoreCallback.restoreBatch(asList(
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 0,
+                                 999,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "todelete".getBytes(UTF_8),
+                                 hexStringToByteArray(toDeleteBinary),
+                                 headers),
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 1,
+                                 9999,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "asdf".getBytes(UTF_8),
+                                 hexStringToByteArray(asdfBinary),
+                                 headers),
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 2,
+                                 99,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "zxcv".getBytes(UTF_8),
+                                 hexStringToByteArray(zxcvBinary1),
+                                 headers),
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 2,
+                                 100,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "zxcv".getBytes(UTF_8),
+                                 hexStringToByteArray(zxcvBinary2),
+                                 headers)
+        ));
+
+        assertThat(buffer.numRecords(), is(3));
+        assertThat(buffer.minTimestamp(), is(0L));
+        assertThat(buffer.bufferSize(), is(142L));
+
+        stateRestoreCallback.restoreBatch(singletonList(
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 3,
+                                 3,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "todelete".getBytes(UTF_8),
+                                 null)
+        ));
+
+        assertThat(buffer.numRecords(), is(2));
+        assertThat(buffer.minTimestamp(), is(1L));
+        assertThat(buffer.bufferSize(), is(95L));
+
+        assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
+        assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
+        assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueAndTimestamp.make("previous", -1))));
+
+        // flush the buffer into a list in buffer order so we can make assertions about the contents.
+
+        final List<Eviction<String, String>> evicted = new LinkedList<>();
+        buffer.evictWhile(() -> true, evicted::add);
+
+        // Several things to note:
+        // * The buffered records are ordered according to their buffer time (serialized in the value of the changelog)
+        // * The record timestamps are properly restored, and not conflated with the record's buffer time.
+        // * The keys and values are properly restored
+        // * The record topic is set to the original input topic, *not* the changelog topic
+        // * The record offset preserves the original input record's offset, *not* the offset of the changelog record
+
+
+        assertThat(evicted, is(asList(
+            new Eviction<>(
+                "zxcv",
+                new Change<>("next", "3o4im"),
+                getContext(3L)),
+            new Eviction<>(
+                "asdf",
+                new Change<>("qwer", null),
+                getContext(1L)
+            ))));
+
+        cleanup(context, buffer);
+    }
+
+    @Test
+    public void shouldRestoreV3Format() {
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
+        final MockInternalProcessorContext context = makeContext();
+        buffer.init(context, buffer);
+
+        final RecordBatchingStateRestoreCallback stateRestoreCallback =
+            (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
+
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+
+        final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 3})});
+
+        // These serialized formats were captured by running version 2.4 code, which wrote this same
+        // layout under the V2 header; tagged with the V3 header here, they pin the V3 binary format.
+        // Do not change them.
+        final String toDeleteBinary = "0000000000000000000000000000000000000005746F70696300000000FFFFFFFFFFFFFFFFFFFFFFFF00000006646F6F6D65640000000000000000";
+        final String asdfBinary = "0000000000000001000000000000000000000005746F70696300000000FFFFFFFFFFFFFFFFFFFFFFFF00000004717765720000000000000002";
+        final String zxcvBinary1 = "0000000000000002000000000000000000000005746F70696300000000FFFFFFFF0000000870726576696F75730000000749474E4F52454400000005336F34696D0000000000000001";
+        final String zxcvBinary2 = "0000000000000003000000000000000000000005746F70696300000000FFFFFFFF0000000870726576696F757300000005336F34696D000000046E6578740000000000000001";
+
+        stateRestoreCallback.restoreBatch(asList(
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 0,
+                                 999,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "todelete".getBytes(UTF_8),
+                                 hexStringToByteArray(toDeleteBinary),
+                                 headers),
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 1,
+                                 9999,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "asdf".getBytes(UTF_8),
+                                 hexStringToByteArray(asdfBinary),
+                                 headers),
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 2,
+                                 99,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "zxcv".getBytes(UTF_8),
+                                 hexStringToByteArray(zxcvBinary1),
+                                 headers),
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 2,
+                                 100,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "zxcv".getBytes(UTF_8),
+                                 hexStringToByteArray(zxcvBinary2),
+                                 headers)
+        ));
+
+        assertThat(buffer.numRecords(), is(3));
+        assertThat(buffer.minTimestamp(), is(0L));
+        assertThat(buffer.bufferSize(), is(142L));
+
+        stateRestoreCallback.restoreBatch(singletonList(
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 3,
+                                 3,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "todelete".getBytes(UTF_8),
+                                 null)
+        ));
+
+        assertThat(buffer.numRecords(), is(2));
+        assertThat(buffer.minTimestamp(), is(1L));
+        assertThat(buffer.bufferSize(), is(95L));
+
+        assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
+        assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
+        assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueAndTimestamp.make("previous", -1))));
+
+        // flush the buffer into a list in buffer order so we can make assertions about the contents.
+
+        final List<Eviction<String, String>> evicted = new LinkedList<>();
+        buffer.evictWhile(() -> true, evicted::add);
+
+        // Several things to note:
+        // * The buffered records are ordered according to their buffer time (serialized in the value of the changelog)
+        // * The record timestamps are properly restored, and not conflated with the record's buffer time.
+        // * The keys and values are properly restored
+        // * The record topic is set to the original input topic, *not* the changelog topic
+        // * The record offset preserves the original input record's offset, *not* the offset of the changelog record
+
+
+        assertThat(evicted, is(asList(
+            new Eviction<>(
+                "zxcv",
+                new Change<>("next", "3o4im"),
+                getContext(3L)),
+            new Eviction<>(
+                "asdf",
+                new Change<>("qwer", null),
+                getContext(1L)
+            ))));
+
+        cleanup(context, buffer);
+    }
+
+    @Test
     public void shouldNotRestoreUnrecognizedVersionRecord() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
@@ -780,15 +1013,47 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         );
     }
 
-    private static ContextualRecord getContextualRecord(final String value, final long timestamp) {
-        final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
-        return new ContextualRecord(
-            FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>(value, null))),
-            getContext(timestamp)
-        );
-    }
-
     private static ProcessorRecordContext getContext(final long recordTimestamp) {
         return new ProcessorRecordContext(recordTimestamp, 0, 0, "topic", null);
     }
+
+
+    // to be used to generate future hex-encoded values
+//    private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
+//    private static String bytesToHex(final byte[] bytes) {
+//        final char[] hexChars = new char[bytes.length * 2];
+//        for (int j = 0; j < bytes.length; j++) {
+//            final int v = bytes[j] & 0xFF;
+//            hexChars[j * 2] = HEX_ARRAY[v >>> 4];
+//            hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];
+//        }
+//        return new String(hexChars);
+//    }
+
+    /**
+     * Decodes a hex-encoded string (two hex digits per byte, the inverse of the
+     * commented-out {@code bytesToHex} above) into the bytes it represents.
+     *
+     * @param hexString the hex-encoded fixture; must have even length and contain only hex digits
+     * @return the decoded bytes, one per pair of hex digits
+     * @throws IllegalArgumentException if the length is odd or a character is not a hex digit
+     */
+    private static byte[] hexStringToByteArray(final String hexString) {
+        final int len = hexString.length();
+        if (len % 2 != 0) {
+            throw new IllegalArgumentException("Hex string must have an even length, but got " + len);
+        }
+        final byte[] data = new byte[len / 2];
+        for (int i = 0; i < len; i += 2) {
+            final int high = Character.digit(hexString.charAt(i), 16);
+            final int low = Character.digit(hexString.charAt(i + 1), 16);
+            // Character.digit signals a non-hex character with -1, which would otherwise be
+            // folded silently into a garbage byte.
+            if (high < 0 || low < 0) {
+                throw new IllegalArgumentException("Invalid hex digit near index " + i + " of: " + hexString);
+            }
+            data[i / 2] = (byte) ((high << 4) + low);
+        }
+        return data;
+    }
 }