Posted to commits@kafka.apache.org by vv...@apache.org on 2020/06/27 03:20:54 UTC
[kafka] branch 2.5 updated: KAFKA-10173: Fix suppress changelog binary schema compatibility (#8905)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 1d07fb8 KAFKA-10173: Fix suppress changelog binary schema compatibility (#8905)
1d07fb8 is described below
commit 1d07fb8c583e24deb09b941cf9e9d5a493100fe6
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Jun 26 21:41:51 2020 -0500
KAFKA-10173: Fix suppress changelog binary schema compatibility (#8905)
We inadvertently changed the binary schema of the suppress buffer changelog
in 2.4.0 without bumping the schema version number. As a result, it is impossible
to upgrade from 2.3.x to 2.4+ if you are using suppression.
* Refactor the schema compatibility test to use serialized data from older versions
as a more foolproof compatibility test.
* Refactor the upgrade system test to use the smoke test application so that we
actually exercise a significant portion of the Streams API during upgrade testing.
* Add more recent versions to the upgrade system test matrix.
* Fix the compatibility bug by bumping the schema version to 3.
Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Guozhang Wang <wa...@gmail.com>
---
.../java/org/apache/kafka/common/utils/Utils.java | 32 ++
.../org/apache/kafka/common/utils/UtilsTest.java | 73 +++-
.../streams/kstream/internals/FullChangeSerde.java | 43 +--
.../internals/ProcessorRecordContext.java | 23 +-
.../kafka/streams/state/internals/BufferValue.java | 27 +-
.../streams/state/internals/ContextualRecord.java | 32 +-
.../InMemoryTimeOrderedKeyValueBuffer.java | 155 ++++-----
...yValueBufferChangelogDeserializationHelper.java | 158 +++++++++
.../kstream/internals/FullChangeSerdeTest.java | 39 ++-
.../internals/TimeOrderedKeyValueBufferTest.java | 366 +++++++++++++++++----
10 files changed, 690 insertions(+), 258 deletions(-)
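For context on the mechanics of the fix: the changelog carries its schema version in a single-byte record header keyed "v", and the restore path dispatches on that byte. The 2.4.0/2.4.1/2.5.0 bug is precisely that the writer switched to a new value layout without changing this byte. A minimal sketch of the header convention, using only client classes that appear in the diff below (the class name and printed strings are illustrative):

    import java.util.Arrays;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.header.internals.RecordHeaders;

    public class VersionHeaderSketch {
        public static void main(final String[] args) {
            // Writer side: tag each changelog record with the schema version it encodes.
            final byte[] v3 = {(byte) 3};
            final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("v", v3)});

            // Reader side: no "v" header means the original unversioned format;
            // otherwise the single byte selects the deserializer.
            final Header versionHeader = headers.lastHeader("v");
            if (versionHeader == null) {
                System.out.println("version 0 (unversioned)");
            } else if (Arrays.equals(versionHeader.value(), v3)) {
                System.out.println("version 3");
            }
        }
    }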
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index e2160d9..548d40c 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -30,6 +30,7 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
@@ -277,6 +278,37 @@ public final class Utils {
}
/**
+ * Starting from the current position, read an integer indicating the size of the byte array to read,
+ * then read the array. A size of -1 is decoded as a null array. Consumes the buffer: upon returning,
+ * the buffer's position is after the array that is returned.
+ * @param buffer The buffer to read a size-prefixed array from
+ * @return The array, or null if the encoded size was -1
+ */
+ public static byte[] getNullableSizePrefixedArray(final ByteBuffer buffer) {
+ final int size = buffer.getInt();
+ return getNullableArray(buffer, size);
+ }
+
+ /**
+ * Read a byte array of the given size. Consumes the buffer: upon returning, the buffer's position
+ * is after the array that is returned.
+ * @param buffer The buffer to read the array from
+ * @param size The number of bytes to read out of the buffer, or -1 to return null
+ * @return The array, or null if size was -1
+ */
+ public static byte[] getNullableArray(final ByteBuffer buffer, final int size) {
+ if (size > buffer.remaining()) {
+ // preemptively throw this when the read is doomed to fail, so we don't have to allocate the array.
+ throw new BufferUnderflowException();
+ }
+ final byte[] oldBytes = size == -1 ? null : new byte[size];
+ if (oldBytes != null) {
+ buffer.get(oldBytes);
+ }
+ return oldBytes;
+ }
+
+ /**
* Returns a copy of src byte array
* @param src The byte array to copy
* @return The copy
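A short usage sketch of the two helpers added above (buffer contents are illustrative; the casts keep the snippet Java 8 friendly, since Buffer.flip() only returns ByteBuffer from Java 9 on):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.utils.Utils;

    public class NullableArraySketch {
        public static void main(final String[] args) {
            // One size-prefixed array: the int 3, then three payload bytes.
            final ByteBuffer buffer = (ByteBuffer) ByteBuffer.allocate(Integer.BYTES + 3)
                .putInt(3)
                .put(new byte[] {1, 2, 3})
                .flip();
            final byte[] payload = Utils.getNullableSizePrefixedArray(buffer);
            System.out.println(payload.length);         // 3
            System.out.println(buffer.hasRemaining());  // false: the read consumed the buffer

            // A -1 size prefix decodes to null rather than an empty array.
            final ByteBuffer nullBuffer = (ByteBuffer) ByteBuffer.allocate(Integer.BYTES).putInt(-1).flip();
            System.out.println(Utils.getNullableSizePrefixedArray(nullBuffer) == null);  // true
        }
    }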
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 983232a..d4785d0 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -25,6 +25,7 @@ import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
+import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
@@ -51,6 +52,8 @@ import static org.apache.kafka.common.utils.Utils.validHostPattern;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -70,7 +73,7 @@ public class UtilsTest {
cases.put("a-little-bit-long-string".getBytes(), -985981536);
cases.put("a-little-bit-longer-string".getBytes(), -1486304829);
cases.put("lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8".getBytes(), -58897971);
- cases.put(new byte[]{'a', 'b', 'c'}, 479470107);
+ cases.put(new byte[] {'a', 'b', 'c'}, 479470107);
for (Map.Entry c : cases.entrySet()) {
assertEquals((int) c.getValue(), murmur2((byte[]) c.getKey()));
@@ -204,6 +207,65 @@ public class UtilsTest {
}
@Test
+ public void getNullableSizePrefixedArrayExact() {
+ byte[] input = {0, 0, 0, 2, 1, 0};
+ final ByteBuffer buffer = ByteBuffer.wrap(input);
+ final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
+ assertArrayEquals(new byte[] {1, 0}, array);
+ assertEquals(6, buffer.position());
+ assertFalse(buffer.hasRemaining());
+ }
+
+ @Test
+ public void getNullableSizePrefixedArrayExactEmpty() {
+ byte[] input = {0, 0, 0, 0};
+ final ByteBuffer buffer = ByteBuffer.wrap(input);
+ final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
+ assertArrayEquals(new byte[] {}, array);
+ assertEquals(4, buffer.position());
+ assertFalse(buffer.hasRemaining());
+ }
+
+ @Test
+ public void getNullableSizePrefixedArrayRemainder() {
+ byte[] input = {0, 0, 0, 2, 1, 0, 9};
+ final ByteBuffer buffer = ByteBuffer.wrap(input);
+ final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
+ assertArrayEquals(new byte[] {1, 0}, array);
+ assertEquals(6, buffer.position());
+ assertTrue(buffer.hasRemaining());
+ }
+
+ @Test
+ public void getNullableSizePrefixedArrayNull() {
+ // -1
+ byte[] input = {-1, -1, -1, -1};
+ final ByteBuffer buffer = ByteBuffer.wrap(input);
+ final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
+ assertNull(array);
+ assertEquals(4, buffer.position());
+ assertFalse(buffer.hasRemaining());
+ }
+
+ @Test
+ public void getNullableSizePrefixedArrayInvalid() {
+ // -2
+ byte[] input = {-1, -1, -1, -2};
+ final ByteBuffer buffer = ByteBuffer.wrap(input);
+ assertThrows(NegativeArraySizeException.class, () -> Utils.getNullableSizePrefixedArray(buffer));
+ }
+
+ @Test
+ public void getNullableSizePrefixedArrayUnderflow() {
+ // Integer.MAX_VALUE
+ byte[] input = {127, -1, -1, -1};
+ final ByteBuffer buffer = ByteBuffer.wrap(input);
+ // note, we get a buffer underflow exception instead of an OOME, even though the encoded size
+ // would be 2,147,483,647 aka 2.1 GB, probably larger than the available heap
+ assertThrows(BufferUnderflowException.class, () -> Utils.getNullableSizePrefixedArray(buffer));
+ }
+
+ @Test
public void utf8ByteArraySerde() {
String utf8String = "A\u00ea\u00f1\u00fcC";
byte[] utf8Bytes = utf8String.getBytes(StandardCharsets.UTF_8);
@@ -414,7 +476,7 @@ public class UtilsTest {
String expectedBufferContent = fileChannelMockExpectReadWithRandomBytes(channelMock, bufferSize);
Utils.readFullyOrFail(channelMock, buffer, 0L, "test");
assertEquals("The buffer should be populated correctly", expectedBufferContent,
- new String(buffer.array()));
+ new String(buffer.array()));
assertFalse("The buffer should be filled", buffer.hasRemaining());
verify(channelMock, atLeastOnce()).read(any(), anyLong());
}
@@ -431,7 +493,7 @@ public class UtilsTest {
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
Utils.readFully(channelMock, buffer, 0L);
assertEquals("The buffer should be populated correctly.", expectedBufferContent,
- new String(buffer.array()));
+ new String(buffer.array()));
assertFalse("The buffer should be filled", buffer.hasRemaining());
verify(channelMock, atLeastOnce()).read(any(), anyLong());
}
@@ -480,7 +542,7 @@ public class UtilsTest {
*
* @param channelMock The mocked FileChannel object
* @param bufferSize The buffer size
- * @return Expected buffer string
+ * @return Expected buffer string
* @throws IOException If an I/O error occurs
*/
private String fileChannelMockExpectReadWithRandomBytes(final FileChannel channelMock,
@@ -517,8 +579,9 @@ public class UtilsTest {
@Override
public void close() throws IOException {
closed = true;
- if (closeException != null)
+ if (closeException != null) {
throw closeException;
+ }
}
static TestCloseable[] createCloseables(boolean... exceptionOnClose) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
index 5d7c7e3..3a34394 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.Serializer;
import java.nio.ByteBuffer;
import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
public final class FullChangeSerde<T> {
private final Serde<T> inner;
@@ -69,33 +70,6 @@ public final class FullChangeSerde<T> {
}
/**
- * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
- * so that we can produce the legacy format to test that we can still deserialize it.
- */
- public static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) {
- if (serialChange == null) {
- return null;
- }
-
- final int oldSize = serialChange.oldValue == null ? -1 : serialChange.oldValue.length;
- final int newSize = serialChange.newValue == null ? -1 : serialChange.newValue.length;
-
- final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + Math.max(0, oldSize) + Math.max(0, newSize));
-
-
- buffer.putInt(oldSize);
- if (serialChange.oldValue != null) {
- buffer.put(serialChange.oldValue);
- }
-
- buffer.putInt(newSize);
- if (serialChange.newValue != null) {
- buffer.put(serialChange.newValue);
- }
- return buffer.array();
- }
-
- /**
* We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still
* need to be able to read it (so that we can load the state store from previously-written changelog records).
*/
@@ -104,19 +78,8 @@ public final class FullChangeSerde<T> {
return null;
}
final ByteBuffer buffer = ByteBuffer.wrap(data);
-
- final int oldSize = buffer.getInt();
- final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
- if (oldBytes != null) {
- buffer.get(oldBytes);
- }
-
- final int newSize = buffer.getInt();
- final byte[] newBytes = newSize == -1 ? null : new byte[newSize];
- if (newBytes != null) {
- buffer.get(newBytes);
- }
-
+ final byte[] oldBytes = getNullableSizePrefixedArray(buffer);
+ final byte[] newBytes = getNullableSizePrefixedArray(buffer);
return new Change<>(newBytes, oldBytes);
}
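The legacy single-array layout that decomposeLegacyFormattedArrayIntoChangeArrays still reads is [oldSize][oldBytes][newSize][newBytes], with a size of -1 marking a null side. A hand-rolled decode sketch (payload bytes are illustrative):

    import java.nio.ByteBuffer;
    import org.apache.kafka.streams.kstream.internals.Change;
    import org.apache.kafka.streams.kstream.internals.FullChangeSerde;

    public class LegacyChangeSketch {
        public static void main(final String[] args) {
            final byte[] oldValue = {10, 11};
            // Encode: old value present, new value null (size -1, no payload).
            final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + oldValue.length);
            buffer.putInt(oldValue.length);
            buffer.put(oldValue);
            buffer.putInt(-1);

            final Change<byte[]> change =
                FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(buffer.array());
            System.out.println(change.newValue == null);  // true
            System.out.println(change.oldValue.length);   // 2
        }
    }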
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 5662417..5dd0062 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -26,6 +26,8 @@ import java.nio.ByteBuffer;
import java.util.Objects;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
public class ProcessorRecordContext implements RecordContext {
@@ -161,12 +163,10 @@ public class ProcessorRecordContext implements RecordContext {
public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
final long timestamp = buffer.getLong();
final long offset = buffer.getLong();
- final int topicSize = buffer.getInt();
final String topic;
{
- // not handling the null topic condition, because we believe the topic will never be null when we serialize
- final byte[] topicBytes = new byte[topicSize];
- buffer.get(topicBytes);
+ // we believe the topic will never be null when we serialize
+ final byte[] topicBytes = requireNonNull(getNullableSizePrefixedArray(buffer));
topic = new String(topicBytes, UTF_8);
}
final int partition = buffer.getInt();
@@ -177,19 +177,8 @@ public class ProcessorRecordContext implements RecordContext {
} else {
final Header[] headerArr = new Header[headerCount];
for (int i = 0; i < headerCount; i++) {
- final int keySize = buffer.getInt();
- final byte[] keyBytes = new byte[keySize];
- buffer.get(keyBytes);
-
- final int valueSize = buffer.getInt();
- final byte[] valueBytes;
- if (valueSize == -1) {
- valueBytes = null;
- } else {
- valueBytes = new byte[valueSize];
- buffer.get(valueBytes);
- }
-
+ final byte[] keyBytes = requireNonNull(getNullableSizePrefixedArray(buffer));
+ final byte[] valueBytes = getNullableSizePrefixedArray(buffer);
headerArr[i] = new RecordHeader(new String(keyBytes, UTF_8), valueBytes);
}
headers = new RecordHeaders(headerArr);
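A round-trip sketch of the record-context wire format the rewritten deserialize reads: timestamp, offset, size-prefixed topic (never null), partition, then the headers. This assumes serialize() is the publicly usable counterpart of deserialize(), as its use in the (now removed) ContextualRecord.serialize below suggests, and that equals compares the restored fields; the values are illustrative:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;

    public class RecordContextSketch {
        public static void main(final String[] args) {
            final ProcessorRecordContext context =
                new ProcessorRecordContext(42L, 7L, 0, "input-topic", new RecordHeaders());
            // serialize() and deserialize() are inverses over the layout above.
            final byte[] bytes = context.serialize();
            final ProcessorRecordContext restored =
                ProcessorRecordContext.deserialize(ByteBuffer.wrap(bytes));
            System.out.println(restored.equals(context));  // true
        }
    }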
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
index b52ec24..f27ab19 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
@@ -22,6 +22,9 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
+import static org.apache.kafka.common.utils.Utils.getNullableArray;
+import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
+
public final class BufferValue {
private static final int NULL_VALUE_SENTINEL = -1;
private static final int OLD_PREV_DUPLICATE_VALUE_SENTINEL = -2;
@@ -67,35 +70,21 @@ public final class BufferValue {
static BufferValue deserialize(final ByteBuffer buffer) {
final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer);
- final byte[] priorValue = extractValue(buffer);
+ final byte[] priorValue = getNullableSizePrefixedArray(buffer);
final byte[] oldValue;
final int oldValueLength = buffer.getInt();
- if (oldValueLength == NULL_VALUE_SENTINEL) {
- oldValue = null;
- } else if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) {
+ if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) {
oldValue = priorValue;
} else {
- oldValue = new byte[oldValueLength];
- buffer.get(oldValue);
+ oldValue = getNullableArray(buffer, oldValueLength);
}
- final byte[] newValue = extractValue(buffer);
+ final byte[] newValue = getNullableSizePrefixedArray(buffer);
return new BufferValue(priorValue, oldValue, newValue, context);
}
- private static byte[] extractValue(final ByteBuffer buffer) {
- final int valueLength = buffer.getInt();
- if (valueLength == NULL_VALUE_SENTINEL) {
- return null;
- } else {
- final byte[] value = new byte[valueLength];
- buffer.get(value);
- return value;
- }
- }
-
ByteBuffer serialize(final int endPadding) {
final int sizeOfValueLength = Integer.BYTES;
@@ -120,7 +109,7 @@ public final class BufferValue {
if (oldValue == null) {
buffer.putInt(NULL_VALUE_SENTINEL);
- } else if (priorValue == oldValue) {
+ } else if (Arrays.equals(priorValue, oldValue)) {
buffer.putInt(OLD_PREV_DUPLICATE_VALUE_SENTINEL);
} else {
buffer.putInt(sizeOfOldValue);
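Besides the shared size-prefix convention, the oldValue field above uses a second sentinel: -1 still means null, while -2 (OLD_PREV_DUPLICATE_VALUE_SENTINEL) means "same bytes as priorValue", so the common duplicate case costs four bytes instead of a full copy. The switch from == to Arrays.equals on the serialize side matters after a restore: a restored priorValue and oldValue can be equal in content without being the same array instance, and reference equality would silently stop applying the dedup sentinel. A sketch of just the oldValue decode branch (hypothetical class, mirroring the deserialize code above):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.utils.Utils;

    public class OldValueSketch {
        private static final int OLD_PREV_DUPLICATE_VALUE_SENTINEL = -2;

        // -2 reuses priorValue's bytes; any other size goes through
        // getNullableArray, which maps -1 to null.
        static byte[] readOldValue(final ByteBuffer buffer, final byte[] priorValue) {
            final int oldValueLength = buffer.getInt();
            return oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL
                ? priorValue
                : Utils.getNullableArray(buffer, oldValueLength);
        }
    }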
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
index 3c24f52..a26b437 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
+import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
+
public class ContextualRecord {
private final byte[] value;
private final ProcessorRecordContext recordContext;
@@ -43,36 +45,10 @@ public class ContextualRecord {
return (value == null ? 0 : value.length) + recordContext.residentMemorySizeEstimate();
}
- ByteBuffer serialize(final int endPadding) {
- final byte[] serializedContext = recordContext.serialize();
-
- final int sizeOfContext = serializedContext.length;
- final int sizeOfValueLength = Integer.BYTES;
- final int sizeOfValue = value == null ? 0 : value.length;
- final ByteBuffer buffer = ByteBuffer.allocate(sizeOfContext + sizeOfValueLength + sizeOfValue + endPadding);
-
- buffer.put(serializedContext);
- if (value == null) {
- buffer.putInt(-1);
- } else {
- buffer.putInt(value.length);
- buffer.put(value);
- }
-
- return buffer;
- }
-
static ContextualRecord deserialize(final ByteBuffer buffer) {
final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer);
-
- final int valueLength = buffer.getInt();
- if (valueLength == -1) {
- return new ContextualRecord(null, context);
- } else {
- final byte[] value = new byte[valueLength];
- buffer.get(value);
- return new ContextualRecord(value, context);
- }
+ final byte[] value = getNullableSizePrefixedArray(buffer);
+ return new ContextualRecord(value, context);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 4043cea..55921a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -38,9 +38,11 @@ import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.DeserializationResult;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -54,14 +56,19 @@ import java.util.function.Consumer;
import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV0;
+import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV1;
+import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV3;
+import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.duckTypeV2;
public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> {
private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
- private static final RecordHeaders V_1_CHANGELOG_HEADERS =
- new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
- private static final RecordHeaders V_2_CHANGELOG_HEADERS =
- new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
+ private static final byte[] V_1_CHANGELOG_HEADER_VALUE = {(byte) 1};
+ private static final byte[] V_2_CHANGELOG_HEADER_VALUE = {(byte) 2};
+ private static final byte[] V_3_CHANGELOG_HEADER_VALUE = {(byte) 3};
+ static final RecordHeaders CHANGELOG_HEADERS =
+ new RecordHeaders(new Header[] {new RecordHeader("v", V_3_CHANGELOG_HEADER_VALUE)});
private static final String METRIC_SCOPE = "in-memory-suppression";
private final Map<Bytes, BufferKey> index = new HashMap<>();
@@ -259,12 +266,11 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
final int sizeOfBufferTime = Long.BYTES;
final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
buffer.putLong(bufferKey.time());
-
collector.send(
changelogTopic,
key,
buffer.array(),
- V_2_CHANGELOG_HEADERS,
+ CHANGELOG_HEADERS,
partition,
null,
KEY_SERIALIZER,
@@ -286,6 +292,15 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch) {
for (final ConsumerRecord<byte[], byte[]> record : batch) {
+ if (record.partition() != partition) {
+ throw new IllegalStateException(
+ String.format(
+ "record partition [%d] is being restored by the wrong suppress partition [%d]",
+ record.partition(),
+ partition
+ )
+ );
+ }
final Bytes key = Bytes.wrap(record.key());
if (record.value() == null) {
// This was a tombstone. Delete the record.
@@ -299,92 +314,63 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
minTimestamp = sortedMap.isEmpty() ? Long.MAX_VALUE : sortedMap.firstKey().time();
}
}
-
- if (record.partition() != partition) {
- throw new IllegalStateException(
- String.format(
- "record partition [%d] is being restored by the wrong suppress partition [%d]",
- record.partition(),
- partition
- )
- );
- }
} else {
- if (record.headers().lastHeader("v") == null) {
- // in this case, the changelog value is just the serialized record value
- final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
- final long time = timeAndValue.getLong();
- final byte[] changelogValue = new byte[record.value().length - 8];
- timeAndValue.get(changelogValue);
-
- final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(changelogValue));
-
- final ProcessorRecordContext recordContext = new ProcessorRecordContext(
- record.timestamp(),
- record.offset(),
- record.partition(),
- record.topic(),
- record.headers()
- );
-
- cleanPut(
- time,
- key,
- new BufferValue(
- index.containsKey(key)
- ? internalPriorValueForBuffered(key)
- : change.oldValue,
- change.oldValue,
- change.newValue,
- recordContext
- )
- );
- } else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
- // in this case, the changelog value is a serialized ContextualRecord
- final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
- final long time = timeAndValue.getLong();
- final byte[] changelogValue = new byte[record.value().length - 8];
- timeAndValue.get(changelogValue);
-
- final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue));
- final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value()));
-
- cleanPut(
- time,
- key,
- new BufferValue(
- index.containsKey(key)
- ? internalPriorValueForBuffered(key)
- : change.oldValue,
- change.oldValue,
- change.newValue,
- contextualRecord.recordContext()
- )
- );
- } else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
- // in this case, the changelog value is a serialized BufferValue
-
- final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value());
- final BufferValue bufferValue = BufferValue.deserialize(valueAndTime);
- final long time = valueAndTime.getLong();
- cleanPut(time, key, bufferValue);
+ final Header versionHeader = record.headers().lastHeader("v");
+ if (versionHeader == null) {
+ // Version 0:
+ // value:
+ // - buffer time
+ // - old value
+ // - new value
+ final byte[] previousBufferedValue = index.containsKey(key)
+ ? internalPriorValueForBuffered(key)
+ : null;
+ final DeserializationResult deserializationResult = deserializeV0(record, key, previousBufferedValue);
+ cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
+ } else if (Arrays.equals(versionHeader.value(), V_3_CHANGELOG_HEADER_VALUE)) {
+ // Version 3:
+ // value:
+ // - record context
+ // - prior value
+ // - old value
+ // - new value
+ // - buffer time
+ final DeserializationResult deserializationResult = deserializeV3(record, key);
+ cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
+
+ } else if (Arrays.equals(versionHeader.value(), V_2_CHANGELOG_HEADER_VALUE)) {
+ // Version 2:
+ // value:
+ // - record context
+ // - old value
+ // - new value
+ // - prior value
+ // - buffer time
+ // NOTE: 2.4.0, 2.4.1, and 2.5.0 actually encode Version 3 formatted data,
+ // but still set the Version 2 flag, so to deserialize, we have to duck type.
+ final DeserializationResult deserializationResult = duckTypeV2(record, key);
+ cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
+ } else if (Arrays.equals(versionHeader.value(), V_1_CHANGELOG_HEADER_VALUE)) {
+ // Version 1:
+ // value:
+ // - buffer time
+ // - record context
+ // - old value
+ // - new value
+ final byte[] previousBufferedValue = index.containsKey(key)
+ ? internalPriorValueForBuffered(key)
+ : null;
+ final DeserializationResult deserializationResult = deserializeV1(record, key, previousBufferedValue);
+ cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
} else {
throw new IllegalArgumentException("Restoring apparently invalid changelog record: " + record);
}
}
- if (record.partition() != partition) {
- throw new IllegalStateException(
- String.format(
- "record partition [%d] is being restored by the wrong suppress partition [%d]",
- record.partition(),
- partition
- )
- );
- }
}
updateBufferMetrics();
}
+
@Override
public void evictWhile(final Supplier<Boolean> predicate,
final Consumer<Eviction<K, V>> callback) {
@@ -481,8 +467,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
final BufferValue buffered = getBuffered(serializedKey);
final byte[] serializedPriorValue;
if (buffered == null) {
- final V priorValue = value.oldValue;
- serializedPriorValue = (priorValue == null) ? null : valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue);
+ serializedPriorValue = serialChange.oldValue;
} else {
serializedPriorValue = buffered.priorValue();
}
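On the write side (the flush path above), a changelog value is the serialized BufferValue with Long.BYTES of end padding, into which the buffer time is then written, and the record is tagged with the v=3 header; deserializeV3 in the new helper below simply reverses this. A sketch, placed in the same package because BufferValue.serialize is package-private (the class name is illustrative):

    package org.apache.kafka.streams.state.internals;

    import java.nio.ByteBuffer;

    final class V3LayoutSketch {
        // Mirrors the flush path: [serialized BufferValue][8-byte buffer time].
        static byte[] changelogValue(final BufferValue value, final long bufferTime) {
            final ByteBuffer buffer = value.serialize(Long.BYTES);
            buffer.putLong(bufferTime);
            return buffer.array();
        }
    }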
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java
new file mode 100644
index 0000000..74489c2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+
+import java.nio.ByteBuffer;
+
+import static java.util.Objects.requireNonNull;
+
+final class TimeOrderedKeyValueBufferChangelogDeserializationHelper {
+ private TimeOrderedKeyValueBufferChangelogDeserializationHelper() {}
+
+ static final class DeserializationResult {
+ private final long time;
+ private final Bytes key;
+ private final BufferValue bufferValue;
+
+ private DeserializationResult(final long time, final Bytes key, final BufferValue bufferValue) {
+ this.time = time;
+ this.key = key;
+ this.bufferValue = bufferValue;
+ }
+
+ long time() {
+ return time;
+ }
+
+ Bytes key() {
+ return key;
+ }
+
+ BufferValue bufferValue() {
+ return bufferValue;
+ }
+ }
+
+ static DeserializationResult deserializeV0(final ConsumerRecord<byte[], byte[]> record,
+ final Bytes key,
+ final byte[] previousBufferedValue) {
+
+ final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
+ final long time = timeAndValue.getLong();
+ final byte[] changelogValue = new byte[record.value().length - 8];
+ timeAndValue.get(changelogValue);
+
+ final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(changelogValue));
+
+ final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+ record.timestamp(),
+ record.offset(),
+ record.partition(),
+ record.topic(),
+ record.headers()
+ );
+
+ return new DeserializationResult(
+ time,
+ key,
+ new BufferValue(
+ previousBufferedValue == null ? change.oldValue : previousBufferedValue,
+ change.oldValue,
+ change.newValue,
+ recordContext
+ )
+ );
+ }
+
+ static DeserializationResult deserializeV1(final ConsumerRecord<byte[], byte[]> record,
+ final Bytes key,
+ final byte[] previousBufferedValue) {
+ final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
+ final long time = timeAndValue.getLong();
+ final byte[] changelogValue = new byte[record.value().length - 8];
+ timeAndValue.get(changelogValue);
+
+ final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue));
+ final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value()));
+
+ return new DeserializationResult(
+ time,
+ key,
+ new BufferValue(
+ previousBufferedValue == null ? change.oldValue : previousBufferedValue,
+ change.oldValue,
+ change.newValue,
+ contextualRecord.recordContext()
+ )
+ );
+ }
+
+ static DeserializationResult duckTypeV2(final ConsumerRecord<byte[], byte[]> record, final Bytes key) {
+ DeserializationResult deserializationResult = null;
+ RuntimeException v2DeserializationException = null;
+ RuntimeException v3DeserializationException = null;
+ try {
+ deserializationResult = deserializeV2(record, key);
+ } catch (final RuntimeException e) {
+ v2DeserializationException = e;
+ }
+ // versions 2.4.0, 2.4.1, and 2.5.0 would have erroneously encoded a V3 record with the
+ // V2 header, so we'll try duck-typing to see if this is decodable as V3
+ if (deserializationResult == null) {
+ try {
+ deserializationResult = deserializeV3(record, key);
+ } catch (final RuntimeException e) {
+ v3DeserializationException = e;
+ }
+ }
+
+ if (deserializationResult == null) {
+ // ok, it wasn't V3 either. Throw both exceptions:
+ final RuntimeException exception =
+ new RuntimeException("Couldn't deserialize record as v2 or v3: " + record,
+ v2DeserializationException);
+ exception.addSuppressed(v3DeserializationException);
+ throw exception;
+ }
+ return deserializationResult;
+ }
+
+ private static DeserializationResult deserializeV2(final ConsumerRecord<byte[], byte[]> record,
+ final Bytes key) {
+ final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value());
+ final ContextualRecord contextualRecord = ContextualRecord.deserialize(valueAndTime);
+ final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value()));
+ final byte[] priorValue = Utils.getNullableSizePrefixedArray(valueAndTime);
+ final long time = valueAndTime.getLong();
+ final BufferValue bufferValue = new BufferValue(priorValue, change.oldValue, change.newValue, contextualRecord.recordContext());
+ return new DeserializationResult(time, key, bufferValue);
+ }
+
+ static DeserializationResult deserializeV3(final ConsumerRecord<byte[], byte[]> record, final Bytes key) {
+ final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value());
+ final BufferValue bufferValue = BufferValue.deserialize(valueAndTime);
+ final long time = valueAndTime.getLong();
+ return new DeserializationResult(time, key, bufferValue);
+ }
+}
\ No newline at end of file
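The duck-typing above works because the two layouts diverge immediately after the record context (compare deserializeV2 and deserializeV3):

    V2, written by 2.3.x with the v=2 header:
        [record context][legacy Change: oldSize|old|newSize|new][prior value][buffer time]
    V3, written by 2.4.0/2.4.1/2.5.0 with a stale v=2 header, and by this fix with v=3:
        [record context][prior value][old value or -2 sentinel][new value][buffer time]

Feeding V3 bytes to the V2 reader treats the prior value as a serialized legacy Change, which in practice fails fast with some RuntimeException (for example a BufferUnderflowException or NegativeArraySizeException from the Utils helpers, or a NullPointerException from the requireNonNull when the prior value was null), so duckTypeV2 can catch it and fall through to the V3 attempt. The scheme is heuristic rather than a proof, but an accidental success would require the bytes to also form a valid record of the other version.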
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
index ac6762f..e7e0c88 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.junit.Test;
+import java.nio.ByteBuffer;
+
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -26,10 +28,37 @@ import static org.hamcrest.core.Is.is;
public class FullChangeSerdeTest {
private final FullChangeSerde<String> serde = FullChangeSerde.wrap(Serdes.String());
+ /**
+ * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
+ * so that we can produce the legacy format to test that we can still deserialize it.
+ */
+ private static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) {
+ if (serialChange == null) {
+ return null;
+ }
+
+ final int oldSize = serialChange.oldValue == null ? -1 : serialChange.oldValue.length;
+ final int newSize = serialChange.newValue == null ? -1 : serialChange.newValue.length;
+
+ final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + Math.max(0, oldSize) + Math.max(0, newSize));
+
+
+ buffer.putInt(oldSize);
+ if (serialChange.oldValue != null) {
+ buffer.put(serialChange.oldValue);
+ }
+
+ buffer.putInt(newSize);
+ if (serialChange.newValue != null) {
+ buffer.put(serialChange.newValue);
+ }
+ return buffer.array();
+ }
+
@Test
public void shouldRoundTripNull() {
assertThat(serde.serializeParts(null, null), nullValue());
- assertThat(FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(null), nullValue());
+ assertThat(mergeChangeArraysIntoSingleLegacyFormattedArray(null), nullValue());
assertThat(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(null), nullValue());
assertThat(serde.deserializeParts(null, null), nullValue());
}
@@ -47,7 +76,7 @@ public class FullChangeSerdeTest {
is(new Change<String>(null, null))
);
- final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(new Change<>(null, null));
+ final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(new Change<>(null, null));
assertThat(
FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat),
is(new Change<byte[]>(null, null))
@@ -57,7 +86,7 @@ public class FullChangeSerdeTest {
@Test
public void shouldRoundTripOldNull() {
final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", null));
- final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
+ final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
assertThat(
serde.deserializeParts(null, decomposedLegacyFormat),
@@ -68,7 +97,7 @@ public class FullChangeSerdeTest {
@Test
public void shouldRoundTripNewNull() {
final Change<byte[]> serialized = serde.serializeParts(null, new Change<>(null, "old"));
- final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
+ final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
assertThat(
serde.deserializeParts(null, decomposedLegacyFormat),
@@ -79,7 +108,7 @@ public class FullChangeSerdeTest {
@Test
public void shouldRoundTripChange() {
final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", "old"));
- final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
+ final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
assertThat(
serde.deserializeParts(null, decomposedLegacyFormat),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 25a44c4..a054ac9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.Change;
-import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
@@ -56,14 +55,13 @@ import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.CHANGELOG_HEADERS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.fail;
@RunWith(Parameterized.class)
public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<String, String>> {
- private static final RecordHeaders V_2_CHANGELOG_HEADERS =
- new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
private static final String APP_ID = "test-app";
private final Function<String, B> bufferSupplier;
@@ -73,7 +71,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
@Override
public byte[] serialize(final String topic, final String data) {
if (data == null) {
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException("null data not allowed");
}
return super.serialize(topic, data);
}
@@ -347,14 +345,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
null,
"zxcv",
new KeyValue<>(1L, getBufferValue("3gon4i", 1)),
- V_2_CHANGELOG_HEADERS
+ CHANGELOG_HEADERS
),
new ProducerRecord<>(APP_ID + "-" + testName + "-changelog",
0,
null,
"asdf",
new KeyValue<>(2L, getBufferValue("2093j", 0)),
- V_2_CHANGELOG_HEADERS
+ CHANGELOG_HEADERS
)
)));
@@ -362,7 +360,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
}
@Test
- public void shouldRestoreOldFormat() {
+ public void shouldRestoreOldUnversionedFormat() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
@@ -372,12 +370,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
- final FullChangeSerde<String> serializer = FullChangeSerde.wrap(Serdes.String());
+ // These serialized formats were captured by running version 2.1 code.
+ // They verify that an upgrade from 2.1 will work.
+ // Do not change them.
+ final String toDeleteBinaryValue = "0000000000000000FFFFFFFF00000006646F6F6D6564";
+ final String asdfBinaryValue = "0000000000000002FFFFFFFF0000000471776572";
+ final String zxcvBinaryValue1 = "00000000000000010000000870726576696F757300000005656F34696D";
+ final String zxcvBinaryValue2 = "000000000000000100000005656F34696D000000046E657874";
- final byte[] todeleteValue = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("doomed", null)));
- final byte[] asdfValue = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("qwer", null)));
- final byte[] zxcvValue1 = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("eo4im", "previous")));
- final byte[] zxcvValue2 = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("next", "eo4im")));
stateRestoreCallback.restoreBatch(asList(
new ConsumerRecord<>("changelog-topic",
0,
@@ -388,7 +388,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"todelete".getBytes(UTF_8),
- ByteBuffer.allocate(Long.BYTES + todeleteValue.length).putLong(0L).put(todeleteValue).array()),
+ hexStringToByteArray(toDeleteBinaryValue)),
new ConsumerRecord<>("changelog-topic",
0,
1,
@@ -398,7 +398,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"asdf".getBytes(UTF_8),
- ByteBuffer.allocate(Long.BYTES + asdfValue.length).putLong(2L).put(asdfValue).array()),
+ hexStringToByteArray(asdfBinaryValue)),
new ConsumerRecord<>("changelog-topic",
0,
2,
@@ -408,7 +408,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"zxcv".getBytes(UTF_8),
- ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).putLong(1L).put(zxcvValue1).array()),
+ hexStringToByteArray(zxcvBinaryValue1)),
new ConsumerRecord<>("changelog-topic",
0,
3,
@@ -418,7 +418,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"zxcv".getBytes(UTF_8),
- ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).putLong(1L).put(zxcvValue2).array())
+ hexStringToByteArray(zxcvBinaryValue2))
));
assertThat(buffer.numRecords(), is(3));
@@ -486,17 +486,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
final RecordHeaders v1FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
- final byte[] todeleteValue = getContextualRecord("doomed", 0).serialize(0).array();
- final byte[] asdfValue = getContextualRecord("qwer", 1).serialize(0).array();
- final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
- final byte[] zxcvValue1 = new ContextualRecord(
- FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>("3o4im", "previous"))),
- getContext(2L)
- ).serialize(0).array();
- final byte[] zxcvValue2 = new ContextualRecord(
- FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>("next", "3o4im"))),
- getContext(3L)
- ).serialize(0).array();
+ // These serialized formats were captured by running version 2.2 code.
+ // They verify that an upgrade from 2.2 will work.
+ // Do not change them.
+ final String toDeleteBinary = "00000000000000000000000000000000000000000000000000000005746F70696300000000FFFFFFFF0000000EFFFFFFFF00000006646F6F6D6564";
+ final String asdfBinary = "00000000000000020000000000000001000000000000000000000005746F70696300000000FFFFFFFF0000000CFFFFFFFF0000000471776572";
+ final String zxcvBinary1 = "00000000000000010000000000000002000000000000000000000005746F70696300000000FFFFFFFF000000150000000870726576696F757300000005336F34696D";
+ final String zxcvBinary2 = "00000000000000010000000000000003000000000000000000000005746F70696300000000FFFFFFFF0000001100000005336F34696D000000046E657874";
+
stateRestoreCallback.restoreBatch(asList(
new ConsumerRecord<>("changelog-topic",
0,
@@ -507,7 +504,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"todelete".getBytes(UTF_8),
- ByteBuffer.allocate(Long.BYTES + todeleteValue.length).putLong(0L).put(todeleteValue).array(),
+ hexStringToByteArray(toDeleteBinary),
v1FlagHeaders),
new ConsumerRecord<>("changelog-topic",
0,
@@ -518,7 +515,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"asdf".getBytes(UTF_8),
- ByteBuffer.allocate(Long.BYTES + asdfValue.length).putLong(2L).put(asdfValue).array(),
+ hexStringToByteArray(asdfBinary),
v1FlagHeaders),
new ConsumerRecord<>("changelog-topic",
0,
@@ -529,7 +526,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"zxcv".getBytes(UTF_8),
- ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).putLong(1L).put(zxcvValue1).array(),
+ hexStringToByteArray(zxcvBinary1),
v1FlagHeaders),
new ConsumerRecord<>("changelog-topic",
0,
@@ -540,7 +537,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"zxcv".getBytes(UTF_8),
- ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).putLong(1L).put(zxcvValue2).array(),
+ hexStringToByteArray(zxcvBinary2),
v1FlagHeaders)
));
@@ -596,6 +593,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
cleanup(context, buffer);
}
+
@Test
public void shouldRestoreV2Format() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
@@ -609,22 +607,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
final RecordHeaders v2FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
- final byte[] todeleteValue = getBufferValue("doomed", 0).serialize(0).array();
- final byte[] asdfValue = getBufferValue("qwer", 1).serialize(0).array();
- final byte[] zxcvValue1 =
- new BufferValue(
- Serdes.String().serializer().serialize(null, "previous"),
- Serdes.String().serializer().serialize(null, "IGNORED"),
- Serdes.String().serializer().serialize(null, "3o4im"),
- getContext(2L)
- ).serialize(0).array();
- final byte[] zxcvValue2 =
- new BufferValue(
- Serdes.String().serializer().serialize(null, "previous"),
- Serdes.String().serializer().serialize(null, "3o4im"),
- Serdes.String().serializer().serialize(null, "next"),
- getContext(3L)
- ).serialize(0).array();
+ // These serialized formats were captured by running version 2.3 code.
+ // They verify that an upgrade from 2.3 will work.
+ // Do not change them.
+ final String toDeleteBinary = "0000000000000000000000000000000000000005746F70696300000000FFFFFFFF0000000EFFFFFFFF00000006646F6F6D6564FFFFFFFF0000000000000000";
+ final String asdfBinary = "0000000000000001000000000000000000000005746F70696300000000FFFFFFFF0000000CFFFFFFFF0000000471776572FFFFFFFF0000000000000002";
+ final String zxcvBinary1 = "0000000000000002000000000000000000000005746F70696300000000FFFFFFFF000000140000000749474E4F52454400000005336F34696D0000000870726576696F75730000000000000001";
+ final String zxcvBinary2 = "0000000000000003000000000000000000000005746F70696300000000FFFFFFFF0000001100000005336F34696D000000046E6578740000000870726576696F75730000000000000001";
+
stateRestoreCallback.restoreBatch(asList(
new ConsumerRecord<>("changelog-topic",
0,
@@ -635,7 +625,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"todelete".getBytes(UTF_8),
- ByteBuffer.allocate(Long.BYTES + todeleteValue.length).put(todeleteValue).putLong(0L).array(),
+ hexStringToByteArray(toDeleteBinary),
v2FlagHeaders),
new ConsumerRecord<>("changelog-topic",
0,
@@ -646,7 +636,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"asdf".getBytes(UTF_8),
- ByteBuffer.allocate(Long.BYTES + asdfValue.length).put(asdfValue).putLong(2L).array(),
+ hexStringToByteArray(asdfBinary),
v2FlagHeaders),
new ConsumerRecord<>("changelog-topic",
0,
@@ -657,7 +647,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"zxcv".getBytes(UTF_8),
- ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).put(zxcvValue1).putLong(1L).array(),
+ hexStringToByteArray(zxcvBinary1),
v2FlagHeaders),
new ConsumerRecord<>("changelog-topic",
0,
@@ -668,7 +658,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"zxcv".getBytes(UTF_8),
- ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).put(zxcvValue2).putLong(1L).array(),
+ hexStringToByteArray(zxcvBinary2),
v2FlagHeaders)
));
@@ -725,6 +715,249 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
}
@Test
+ public void shouldRestoreV3FormatWithV2Header() {
+ // versions 2.4.0, 2.4.1, and 2.5.0 would have erroneously encoded a V3 record with the
+ // V2 header, so we need to be sure to handle this case as well.
+ // Note the data is the same as the V3 test.
+ final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
+ final MockInternalProcessorContext context = makeContext();
+ buffer.init(context, buffer);
+
+ final RecordBatchingStateRestoreCallback stateRestoreCallback =
+ (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
+
+ context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+
+ final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
+
+ // These serialized formats were captured by running version 2.4 code.
+ // They verify that an upgrade from 2.4 will work.
+ // Do not change them.
+ final String toDeleteBinary = "0000000000000000000000000000000000000005746F70696300000000FFFFFFFFFFFFFFFFFFFFFFFF00000006646F6F6D65640000000000000000";
+ final String asdfBinary = "0000000000000001000000000000000000000005746F70696300000000FFFFFFFFFFFFFFFFFFFFFFFF00000004717765720000000000000002";
+ final String zxcvBinary1 = "0000000000000002000000000000000000000005746F70696300000000FFFFFFFF0000000870726576696F75730000000749474E4F52454400000005336F34696D0000000000000001";
+ final String zxcvBinary2 = "0000000000000003000000000000000000000005746F70696300000000FFFFFFFF0000000870726576696F757300000005336F34696D000000046E6578740000000000000001";
+
+ stateRestoreCallback.restoreBatch(asList(
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 0,
+ 999,
+ TimestampType.CREATE_TIME,
+ -1L,
+ -1,
+ -1,
+ "todelete".getBytes(UTF_8),
+ hexStringToByteArray(toDeleteBinary),
+ headers),
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 1,
+ 9999,
+ TimestampType.CREATE_TIME,
+ -1L,
+ -1,
+ -1,
+ "asdf".getBytes(UTF_8),
+ hexStringToByteArray(asdfBinary),
+ headers),
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 2,
+ 99,
+ TimestampType.CREATE_TIME,
+ -1L,
+ -1,
+ -1,
+ "zxcv".getBytes(UTF_8),
+ hexStringToByteArray(zxcvBinary1),
+ headers),
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 2,
+ 100,
+ TimestampType.CREATE_TIME,
+ -1L,
+ -1,
+ -1,
+ "zxcv".getBytes(UTF_8),
+ hexStringToByteArray(zxcvBinary2),
+ headers)
+ ));
+
+ assertThat(buffer.numRecords(), is(3));
+ assertThat(buffer.minTimestamp(), is(0L));
+ assertThat(buffer.bufferSize(), is(142L));
+
+ stateRestoreCallback.restoreBatch(singletonList(
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 3,
+ 3,
+ TimestampType.CREATE_TIME,
+ -1L,
+ -1,
+ -1,
+ "todelete".getBytes(UTF_8),
+ null)
+ ));
+
+ assertThat(buffer.numRecords(), is(2));
+ assertThat(buffer.minTimestamp(), is(1L));
+ assertThat(buffer.bufferSize(), is(95L));
+
+ assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
+ assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
+ assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueAndTimestamp.make("previous", -1))));
+
+ // flush the buffer into a list in buffer order so we can make assertions about the contents.
+
+ final List<Eviction<String, String>> evicted = new LinkedList<>();
+ buffer.evictWhile(() -> true, evicted::add);
+
+ // Several things to note:
+ // * The buffered records are ordered according to their buffer time (serialized in the value of the changelog)
+ // * The record timestamps are properly restored, and not conflated with the record's buffer time.
+ // * The keys and values are properly restored
+ // * The record topic is set to the original input topic, *not* the changelog topic
+ // * The record offset preserves the original input record's offset, *not* the offset of the changelog record
+
+
+ assertThat(evicted, is(asList(
+ new Eviction<>(
+ "zxcv",
+ new Change<>("next", "3o4im"),
+ getContext(3L)),
+ new Eviction<>(
+ "asdf",
+ new Change<>("qwer", null),
+ getContext(1L)
+ ))));
+
+ cleanup(context, buffer);
+ }
+
+ @Test
+ public void shouldRestoreV3Format() {
+ final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
+ final MockInternalProcessorContext context = makeContext();
+ buffer.init(context, buffer);
+
+ final RecordBatchingStateRestoreCallback stateRestoreCallback =
+ (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
+
+ context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+
+ final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 3})});
+
+ // These serialized formats were captured by running version 2.4 code.
+ // They verify that an upgrade from 2.4 will work.
+ // Do not change them.
+ final String toDeleteBinary = "0000000000000000000000000000000000000005746F70696300000000FFFFFFFFFFFFFFFFFFFFFFFF00000006646F6F6D65640000000000000000";
+ final String asdfBinary = "0000000000000001000000000000000000000005746F70696300000000FFFFFFFFFFFFFFFFFFFFFFFF00000004717765720000000000000002";
+ final String zxcvBinary1 = "0000000000000002000000000000000000000005746F70696300000000FFFFFFFF0000000870726576696F75730000000749474E4F52454400000005336F34696D0000000000000001";
+ final String zxcvBinary2 = "0000000000000003000000000000000000000005746F70696300000000FFFFFFFF0000000870726576696F757300000005336F34696D000000046E6578740000000000000001";
+
+ stateRestoreCallback.restoreBatch(asList(
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 0,
+ 999,
+ TimestampType.CREATE_TIME,
+ -1L,
+ -1,
+ -1,
+ "todelete".getBytes(UTF_8),
+ hexStringToByteArray(toDeleteBinary),
+ headers),
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 1,
+ 9999,
+ TimestampType.CREATE_TIME,
+ -1L,
+ -1,
+ -1,
+ "asdf".getBytes(UTF_8),
+ hexStringToByteArray(asdfBinary),
+ headers),
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 2,
+ 99,
+ TimestampType.CREATE_TIME,
+ -1L,
+ -1,
+ -1,
+ "zxcv".getBytes(UTF_8),
+ hexStringToByteArray(zxcvBinary1),
+ headers),
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 2,
+ 100,
+ TimestampType.CREATE_TIME,
+ -1L,
+ -1,
+ -1,
+ "zxcv".getBytes(UTF_8),
+ hexStringToByteArray(zxcvBinary2),
+ headers)
+ ));
+
+ assertThat(buffer.numRecords(), is(3));
+ assertThat(buffer.minTimestamp(), is(0L));
+ assertThat(buffer.bufferSize(), is(142L));
+
+ stateRestoreCallback.restoreBatch(singletonList(
+ new ConsumerRecord<>("changelog-topic",
+ 0,
+ 3,
+ 3,
+ TimestampType.CREATE_TIME,
+ -1L,
+ -1,
+ -1,
+ "todelete".getBytes(UTF_8),
+ null)
+ ));
+
+ assertThat(buffer.numRecords(), is(2));
+ assertThat(buffer.minTimestamp(), is(1L));
+ assertThat(buffer.bufferSize(), is(95L));
+
+ assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
+ assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
+ assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueAndTimestamp.make("previous", -1))));
+
+ // flush the buffer into a list in buffer order so we can make assertions about the contents.
+
+ final List<Eviction<String, String>> evicted = new LinkedList<>();
+ buffer.evictWhile(() -> true, evicted::add);
+
+ // Several things to note:
+ // * The buffered records are ordered according to their buffer time (serialized in the value of the changelog)
+ // * The record timestamps are properly restored, and not conflated with the record's buffer time.
+ // * The keys and values are properly restored
+ // * The record topic is set to the original input topic, *not* the changelog topic
+ // * The record offset preserves the original input record's offset, *not* the offset of the changelog record
+
+
+ assertThat(evicted, is(asList(
+ new Eviction<>(
+ "zxcv",
+ new Change<>("next", "3o4im"),
+ getContext(3L)),
+ new Eviction<>(
+ "asdf",
+ new Change<>("qwer", null),
+ getContext(1L)
+ ))));
+
+ cleanup(context, buffer);
+ }
+
+ @Test
public void shouldNotRestoreUnrecognizedVersionRecord() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
@@ -780,15 +1013,30 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
);
}
- private static ContextualRecord getContextualRecord(final String value, final long timestamp) {
- final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
- return new ContextualRecord(
- FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>(value, null))),
- getContext(timestamp)
- );
- }
-
private static ProcessorRecordContext getContext(final long recordTimestamp) {
return new ProcessorRecordContext(recordTimestamp, 0, 0, "topic", null);
}
+
+
+ // to be used to generate future hex-encoded values
+// private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
+// private static String bytesToHex(final byte[] bytes) {
+// final char[] hexChars = new char[bytes.length * 2];
+// for (int j = 0; j < bytes.length; j++) {
+// final int v = bytes[j] & 0xFF;
+// hexChars[j * 2] = HEX_ARRAY[v >>> 4];
+// hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];
+// }
+// return new String(hexChars);
+// }
+
+ private static byte[] hexStringToByteArray(final String hexString) {
+ final int len = hexString.length();
+ final byte[] data = new byte[len / 2];
+ for (int i = 0; i < len; i += 2) {
+ data[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4)
+ + Character.digit(hexString.charAt(i + 1), 16));
+ }
+ return data;
+ }
}
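The hex fixtures in the tests above were captured by running the older releases, and the commented-out bytesToHex is the other half of that workflow. A sketch of how a fixture could be regenerated (run against the old version's code with bytesToHex uncommented; getBufferValue is the existing helper in this test class):

    // Capture one V2/V3-style fixture: serialized value, then the 8-byte buffer time.
    final BufferValue value = getBufferValue("doomed", 0);
    final byte[] fixture = value.serialize(Long.BYTES).putLong(0L).array();
    System.out.println(bytesToHex(fixture));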