You are viewing a plain-text version of this content. The canonical link for it is available in the original (HTML) archive page.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/06/13 13:48:41 UTC
[kafka] branch trunk updated: KAFKA-8452: Compressed BufferValue (#6848)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e54ab29 KAFKA-8452: Compressed BufferValue (#6848)
e54ab29 is described below
commit e54ab292e7e2fd1d18e82387a586969ba57eb7ea
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Jun 13 08:48:15 2019 -0500
KAFKA-8452: Compressed BufferValue (#6848)
De-duplicate the common case in which the prior value is the same as the old value.
Reviewers: Sophie Blee-Goldman <so...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
.../kafka/streams/kstream/internals/Change.java | 2 +-
.../streams/kstream/internals/FullChangeSerde.java | 153 +++++++---------
.../internals/ProcessorRecordContext.java | 2 +-
.../kafka/streams/state/internals/BufferValue.java | 137 +++++++++++---
.../streams/state/internals/ContextualRecord.java | 4 +-
.../InMemoryTimeOrderedKeyValueBuffer.java | 71 ++++---
.../streams/state/internals/LRUCacheEntry.java | 2 +-
.../kstream/internals/FullChangeSerdeTest.java | 132 +++-----------
.../KTableSuppressProcessorMetricsTest.java | 15 +-
.../suppress/KTableSuppressProcessorTest.java | 3 +-
.../kstream/internals/suppress/SuppressSuite.java | 14 +-
.../internals/ProcessorRecordContextTest.java | 10 +-
.../streams/state/internals/BufferValueTest.java | 203 +++++++++++++++++++++
.../internals/TimeOrderedKeyValueBufferTest.java | 78 ++++----
14 files changed, 518 insertions(+), 308 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
index c9a18de..f28a16d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
@@ -30,7 +30,7 @@ public class Change<T> {
@Override
public String toString() {
- return "(" + newValue + "<-" + oldValue + ")";
+ return "(" + String.valueOf(newValue) + "<-" + String.valueOf(oldValue) + ")";
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
index 30d55be..f28f9e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -21,19 +21,15 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.ByteBuffer;
-import java.util.Map;
import static java.util.Objects.requireNonNull;
-public final class FullChangeSerde<T> implements Serde<Change<T>> {
+public final class FullChangeSerde<T> {
private final Serde<T> inner;
- @SuppressWarnings("unchecked")
- public static <T> FullChangeSerde<T> castOrWrap(final Serde<T> serde) {
+ public static <T> FullChangeSerde<T> wrap(final Serde<T> serde) {
if (serde == null) {
return null;
- } else if (serde instanceof FullChangeSerde) {
- return (FullChangeSerde<T>) serde;
} else {
return new FullChangeSerde<>(serde);
}
@@ -47,98 +43,81 @@ public final class FullChangeSerde<T> implements Serde<Change<T>> {
return inner;
}
- @Override
- public void configure(final Map<String, ?> configs, final boolean isKey) {
- inner.configure(configs, isKey);
+ public Change<byte[]> serializeParts(final String topic, final Change<T> data) {
+ if (data == null) {
+ return null;
+ }
+ final Serializer<T> innerSerializer = innerSerde().serializer();
+ final byte[] oldBytes = data.oldValue == null ? null : innerSerializer.serialize(topic, data.oldValue);
+ final byte[] newBytes = data.newValue == null ? null : innerSerializer.serialize(topic, data.newValue);
+ return new Change<>(newBytes, oldBytes);
}
- @Override
- public void close() {
- inner.close();
- }
- @Override
- public Serializer<Change<T>> serializer() {
- final Serializer<T> innerSerializer = inner.serializer();
-
- return new Serializer<Change<T>>() {
- @Override
- public void configure(final Map<String, ?> configs, final boolean isKey) {
- innerSerializer.configure(configs, isKey);
- }
-
- @Override
- public byte[] serialize(final String topic, final Change<T> data) {
- if (data == null) {
- return null;
- }
- final byte[] oldBytes = data.oldValue == null ? null : innerSerializer.serialize(topic, data.oldValue);
- final int oldSize = oldBytes == null ? -1 : oldBytes.length;
- final byte[] newBytes = data.newValue == null ? null : innerSerializer.serialize(topic, data.newValue);
- final int newSize = newBytes == null ? -1 : newBytes.length;
-
- final ByteBuffer buffer = ByteBuffer.wrap(
- new byte[4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 0 : newSize)]
- );
- buffer.putInt(oldSize);
- if (oldBytes != null) {
- buffer.put(oldBytes);
- }
- buffer.putInt(newSize);
- if (newBytes != null) {
- buffer.put(newBytes);
- }
- return buffer.array();
- }
-
- @Override
- public void close() {
- innerSerializer.close();
- }
- };
+ public Change<T> deserializeParts(final String topic, final Change<byte[]> serialChange) {
+ if (serialChange == null) {
+ return null;
+ }
+ final Deserializer<T> innerDeserializer = innerSerde().deserializer();
+
+ final T oldValue =
+ serialChange.oldValue == null ? null : innerDeserializer.deserialize(topic, serialChange.oldValue);
+ final T newValue =
+ serialChange.newValue == null ? null : innerDeserializer.deserialize(topic, serialChange.newValue);
+
+ return new Change<>(newValue, oldValue);
}
- @Override
- public Deserializer<Change<T>> deserializer() {
- final Deserializer<T> innerDeserializer = inner.deserializer();
- return new Deserializer<Change<T>>() {
- @Override
- public void configure(final Map<String, ?> configs, final boolean isKey) {
- innerDeserializer.configure(configs, isKey);
- }
-
- @Override
- public Change<T> deserialize(final String topic, final byte[] data) {
- if (data == null) {
- return null;
- }
- final ByteBuffer buffer = ByteBuffer.wrap(data);
-
- final byte[] oldBytes = extractOldValuePart(buffer);
- final T oldValue = oldBytes == null ? null : innerDeserializer.deserialize(topic, oldBytes);
-
- final int newSize = buffer.getInt();
- final byte[] newBytes = newSize == -1 ? null : new byte[newSize];
- if (newBytes != null) {
- buffer.get(newBytes);
- }
- final T newValue = newBytes == null ? null : innerDeserializer.deserialize(topic, newBytes);
- return new Change<>(newValue, oldValue);
- }
-
- @Override
- public void close() {
- innerDeserializer.close();
- }
- };
+ /**
+ * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
+ * so that we can produce the legacy format to test that we can still deserialize it.
+ */
+ public static byte[] composeLegacyFormat(final Change<byte[]> serialChange) {
+ if (serialChange == null) {
+ return null;
+ }
+
+ final int oldSize = serialChange.oldValue == null ? -1 : serialChange.oldValue.length;
+ final int newSize = serialChange.newValue == null ? -1 : serialChange.newValue.length;
+
+ final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + Math.max(0, oldSize) + Math.max(0, newSize));
+
+
+ buffer.putInt(oldSize);
+ if (serialChange.oldValue != null) {
+ buffer.put(serialChange.oldValue);
+ }
+
+ buffer.putInt(newSize);
+ if (serialChange.newValue != null) {
+ buffer.put(serialChange.newValue);
+ }
+ return buffer.array();
}
- public static byte[] extractOldValuePart(final ByteBuffer buffer) {
+ /**
+ * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still
+ * need to be able to read it (so that we can load the state store from previously-written changelog records).
+ */
+ public static Change<byte[]> decomposeLegacyFormat(final byte[] data) {
+ if (data == null) {
+ return null;
+ }
+ final ByteBuffer buffer = ByteBuffer.wrap(data);
+
final int oldSize = buffer.getInt();
final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
if (oldBytes != null) {
buffer.get(oldBytes);
}
- return oldBytes;
+
+ final int newSize = buffer.getInt();
+ final byte[] newBytes = newSize == -1 ? null : new byte[newSize];
+ if (newBytes != null) {
+ buffer.get(newBytes);
+ }
+
+ return new Change<>(newBytes, oldBytes);
}
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 1b22482..5662417 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -73,7 +73,7 @@ public class ProcessorRecordContext implements RecordContext {
return headers;
}
- public long sizeBytes() {
+ public long residentMemorySizeEstimate() {
long size = 0;
size += Long.BYTES; // value.context.timestamp
size += Long.BYTES; // value.context.offset
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
index 816894e..f1990c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
@@ -16,60 +16,133 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
public final class BufferValue {
- private final ContextualRecord record;
+ private static final int NULL_VALUE_SENTINEL = -1;
+ private static final int OLD_PREV_DUPLICATE_VALUE_SENTINEL = -2;
private final byte[] priorValue;
+ private final byte[] oldValue;
+ private final byte[] newValue;
+ private final ProcessorRecordContext recordContext;
+
+ BufferValue(final byte[] priorValue, final byte[] oldValue, final byte[] newValue, final ProcessorRecordContext recordContext) {
+ this.oldValue = oldValue;
+ this.newValue = newValue;
+ this.recordContext = recordContext;
+
+ // This de-duplicates the prior and old references.
+ // If they were already the same reference, the comparison is trivially fast, so we don't specifically check
+ // for that case.
+ if (Arrays.equals(priorValue, oldValue)) {
+ this.priorValue = oldValue;
+ } else {
+ this.priorValue = priorValue;
+ }
+ }
- BufferValue(final ContextualRecord record,
- final byte[] priorValue) {
- this.record = record;
- this.priorValue = priorValue;
+ byte[] priorValue() {
+ return priorValue;
}
- ContextualRecord record() {
- return record;
+ byte[] oldValue() {
+ return oldValue;
}
- byte[] priorValue() {
- return priorValue;
+ byte[] newValue() {
+ return newValue;
+ }
+
+ ProcessorRecordContext context() {
+ return recordContext;
}
static BufferValue deserialize(final ByteBuffer buffer) {
- final ContextualRecord record = ContextualRecord.deserialize(buffer);
+ final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer);
+
+ final byte[] priorValue = extractValue(buffer);
+
+ final byte[] oldValue;
+ final int oldValueLength = buffer.getInt();
+ if (oldValueLength == NULL_VALUE_SENTINEL) {
+ oldValue = null;
+ } else if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) {
+ oldValue = priorValue;
+ } else {
+ oldValue = new byte[oldValueLength];
+ buffer.get(oldValue);
+ }
+
+ final byte[] newValue = extractValue(buffer);
+
+ return new BufferValue(priorValue, oldValue, newValue, context);
+ }
- final int priorValueLength = buffer.getInt();
- if (priorValueLength == -1) {
- return new BufferValue(record, null);
+ private static byte[] extractValue(final ByteBuffer buffer) {
+ final int valueLength = buffer.getInt();
+ if (valueLength == NULL_VALUE_SENTINEL) {
+ return null;
} else {
- final byte[] priorValue = new byte[priorValueLength];
- buffer.get(priorValue);
- return new BufferValue(record, priorValue);
+ final byte[] value = new byte[valueLength];
+ buffer.get(value);
+ return value;
}
}
ByteBuffer serialize(final int endPadding) {
- final int sizeOfPriorValueLength = Integer.BYTES;
+ final int sizeOfValueLength = Integer.BYTES;
+
final int sizeOfPriorValue = priorValue == null ? 0 : priorValue.length;
+ final int sizeOfOldValue = oldValue == null || priorValue == oldValue ? 0 : oldValue.length;
+ final int sizeOfNewValue = newValue == null ? 0 : newValue.length;
+
+ final byte[] serializedContext = recordContext.serialize();
+
+ final ByteBuffer buffer = ByteBuffer.allocate(
+ serializedContext.length
+ + sizeOfValueLength + sizeOfPriorValue
+ + sizeOfValueLength + sizeOfOldValue
+ + sizeOfValueLength + sizeOfNewValue
+ + endPadding
+ );
- final ByteBuffer buffer = record.serialize(sizeOfPriorValueLength + sizeOfPriorValue + endPadding);
+ buffer.put(serializedContext);
- if (priorValue == null) {
- buffer.putInt(-1);
+ addValue(buffer, priorValue);
+
+ if (oldValue == null) {
+ buffer.putInt(NULL_VALUE_SENTINEL);
+ } else if (priorValue == oldValue) {
+ buffer.putInt(OLD_PREV_DUPLICATE_VALUE_SENTINEL);
} else {
- buffer.putInt(priorValue.length);
- buffer.put(priorValue);
+ buffer.putInt(sizeOfOldValue);
+ buffer.put(oldValue);
}
+ addValue(buffer, newValue);
+
return buffer;
}
- long sizeBytes() {
- return (priorValue == null ? 0 : priorValue.length) + record.sizeBytes();
+ private static void addValue(final ByteBuffer buffer, final byte[] value) {
+ if (value == null) {
+ buffer.putInt(NULL_VALUE_SENTINEL);
+ } else {
+ buffer.putInt(value.length);
+ buffer.put(value);
+ }
+ }
+
+ long residentMemorySizeEstimate() {
+ return (priorValue == null ? 0 : priorValue.length)
+ + (oldValue == null || priorValue == oldValue ? 0 : oldValue.length)
+ + (newValue == null ? 0 : newValue.length)
+ + recordContext.residentMemorySizeEstimate();
}
@Override
@@ -77,22 +150,28 @@ public final class BufferValue {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final BufferValue that = (BufferValue) o;
- return Objects.equals(record, that.record) &&
- Arrays.equals(priorValue, that.priorValue);
+ return Arrays.equals(priorValue, that.priorValue) &&
+ Arrays.equals(oldValue, that.oldValue) &&
+ Arrays.equals(newValue, that.newValue) &&
+ Objects.equals(recordContext, that.recordContext);
}
@Override
public int hashCode() {
- int result = Objects.hash(record);
+ int result = Objects.hash(recordContext);
result = 31 * result + Arrays.hashCode(priorValue);
+ result = 31 * result + Arrays.hashCode(oldValue);
+ result = 31 * result + Arrays.hashCode(newValue);
return result;
}
@Override
public String toString() {
return "BufferValue{" +
- "record=" + record +
- ", priorValue=" + Arrays.toString(priorValue) +
+ "priorValue=" + Arrays.toString(priorValue) +
+ ", oldValue=" + Arrays.toString(oldValue) +
+ ", newValue=" + Arrays.toString(newValue) +
+ ", recordContext=" + recordContext +
'}';
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
index 3cd2c37..3c24f52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
@@ -39,8 +39,8 @@ public class ContextualRecord {
return value;
}
- long sizeBytes() {
- return (value == null ? 0 : value.length) + recordContext.sizeBytes();
+ long residentMemorySizeEstimate() {
+ return (value == null ? 0 : value.length) + recordContext.residentMemorySizeEstimate();
}
ByteBuffer serialize(final int endPadding) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 6c5022f..6c6ef36 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -159,7 +159,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
this.storeName = storeName;
this.loggingEnabled = loggingEnabled;
this.keySerde = keySerde;
- this.valueSerde = FullChangeSerde.castOrWrap(valueSerde);
+ this.valueSerde = FullChangeSerde.wrap(valueSerde);
}
@Override
@@ -176,7 +176,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
@Override
public void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde) {
this.keySerde = this.keySerde == null ? keySerde : this.keySerde;
- this.valueSerde = this.valueSerde == null ? FullChangeSerde.castOrWrap(valueSerde) : this.valueSerde;
+ this.valueSerde = this.valueSerde == null ? FullChangeSerde.wrap(valueSerde) : this.valueSerde;
}
@Override
@@ -296,21 +296,26 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
final byte[] changelogValue = new byte[record.value().length - 8];
timeAndValue.get(changelogValue);
+ final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormat(changelogValue));
+
+ final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+ record.timestamp(),
+ record.offset(),
+ record.partition(),
+ record.topic(),
+ record.headers()
+ );
+
cleanPut(
time,
key,
new BufferValue(
- new ContextualRecord(
- changelogValue,
- new ProcessorRecordContext(
- record.timestamp(),
- record.offset(),
- record.partition(),
- record.topic(),
- record.headers()
- )
- ),
- inferPriorValue(key, changelogValue)
+ index.containsKey(key)
+ ? internalPriorValueForBuffered(key)
+ : change.oldValue,
+ change.oldValue,
+ change.newValue,
+ recordContext
)
);
} else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
@@ -321,7 +326,20 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
timeAndValue.get(changelogValue);
final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue));
- cleanPut(time, key, new BufferValue(contextualRecord, inferPriorValue(key, contextualRecord.value())));
+ final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormat(contextualRecord.value()));
+
+ cleanPut(
+ time,
+ key,
+ new BufferValue(
+ index.containsKey(key)
+ ? internalPriorValueForBuffered(key)
+ : change.oldValue,
+ change.oldValue,
+ change.newValue,
+ contextualRecord.recordContext()
+ )
+ );
} else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
// in this case, the changelog value is a serialized BufferValue
@@ -346,13 +364,6 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
updateBufferMetrics();
}
- private byte[] inferPriorValue(final Bytes key, final byte[] serializedChange) {
- return index.containsKey(key)
- ? internalPriorValueForBuffered(key)
- : FullChangeSerde.extractOldValuePart(ByteBuffer.wrap(serializedChange));
- }
-
-
@Override
public void evictWhile(final Supplier<Boolean> predicate,
final Consumer<Eviction<K, V>> callback) {
@@ -375,9 +386,11 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
}
final K key = keySerde.deserializer().deserialize(changelogTopic, next.getKey().key().get());
final BufferValue bufferValue = next.getValue();
- final ContextualRecord record = bufferValue.record();
- final Change<V> value = valueSerde.deserializer().deserialize(changelogTopic, record.value());
- callback.accept(new Eviction<>(key, value, record.recordContext()));
+ final Change<V> value = valueSerde.deserializeParts(
+ changelogTopic,
+ new Change<>(bufferValue.newValue(), bufferValue.oldValue())
+ );
+ callback.accept(new Eviction<>(key, value, bufferValue.context()));
delegate.remove();
index.remove(next.getKey().key());
@@ -442,7 +455,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
requireNonNull(recordContext, "recordContext cannot be null");
final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
- final byte[] serializedValue = valueSerde.serializer().serialize(changelogTopic, value);
+ final Change<byte[]> serialChange = valueSerde.serializeParts(changelogTopic, value);
final BufferValue buffered = getBuffered(serializedKey);
final byte[] serializedPriorValue;
@@ -453,7 +466,11 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
serializedPriorValue = buffered.priorValue();
}
- cleanPut(time, serializedKey, new BufferValue(new ContextualRecord(serializedValue, recordContext), serializedPriorValue));
+ cleanPut(
+ time,
+ serializedKey,
+ new BufferValue(serializedPriorValue, serialChange.oldValue, serialChange.newValue, recordContext)
+ );
dirtyKeys.add(serializedKey);
updateBufferMetrics();
}
@@ -504,7 +521,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
size += 8; // buffer time
size += key.get().length;
if (value != null) {
- size += value.sizeBytes();
+ size += value.residentMemorySizeEstimate();
}
return size;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index f454862..0f1a1ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -50,7 +50,7 @@ class LRUCacheEntry {
this.isDirty = isDirty;
this.sizeBytes = 1 + // isDirty
- record.sizeBytes();
+ record.residentMemorySizeEstimate();
}
void markClean() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
index ddba05e..97e6c06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
@@ -16,146 +16,74 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.easymock.EasyMock;
import org.junit.Test;
-import static java.util.Collections.emptyMap;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
public class FullChangeSerdeTest {
- private final FullChangeSerde<String> serde = FullChangeSerde.castOrWrap(Serdes.String());
+ private final FullChangeSerde<String> serde = FullChangeSerde.wrap(Serdes.String());
@Test
public void shouldRoundTripNull() {
- final byte[] serialized = serde.serializer().serialize(null, null);
- assertThat(
- serde.deserializer().deserialize(null, serialized),
- nullValue()
- );
+ assertThat(serde.serializeParts(null, null), nullValue());
+ assertThat(FullChangeSerde.composeLegacyFormat(null), nullValue());
+ assertThat(FullChangeSerde.decomposeLegacyFormat(null), nullValue());
+ assertThat(serde.deserializeParts(null, null), nullValue());
}
@Test
public void shouldRoundTripNullChange() {
- final byte[] serialized = serde.serializer().serialize(null, new Change<>(null, null));
assertThat(
- serde.deserializer().deserialize(null, serialized),
- is(new Change<>(null, null))
+ serde.serializeParts(null, new Change<>(null, null)),
+ is(new Change<byte[]>(null, null))
+ );
+
+ assertThat(
+ serde.deserializeParts(null, new Change<>(null, null)),
+ is(new Change<String>(null, null))
+ );
+
+ final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(new Change<>(null, null));
+ assertThat(
+ FullChangeSerde.decomposeLegacyFormat(legacyFormat),
+ is(new Change<byte[]>(null, null))
);
}
@Test
public void shouldRoundTripOldNull() {
- final byte[] serialized = serde.serializer().serialize(null, new Change<>("new", null));
+ final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", null));
+ final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized);
+ final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat);
assertThat(
- serde.deserializer().deserialize(null, serialized),
+ serde.deserializeParts(null, decomposedLegacyFormat),
is(new Change<>("new", null))
);
}
@Test
public void shouldRoundTripNewNull() {
- final byte[] serialized = serde.serializer().serialize(null, new Change<>(null, "old"));
+ final Change<byte[]> serialized = serde.serializeParts(null, new Change<>(null, "old"));
+ final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized);
+ final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat);
assertThat(
- serde.deserializer().deserialize(null, serialized),
+ serde.deserializeParts(null, decomposedLegacyFormat),
is(new Change<>(null, "old"))
);
}
@Test
public void shouldRoundTripChange() {
- final byte[] serialized = serde.serializer().serialize(null, new Change<>("new", "old"));
+ final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", "old"));
+ final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized);
+ final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat);
assertThat(
- serde.deserializer().deserialize(null, serialized),
+ serde.deserializeParts(null, decomposedLegacyFormat),
is(new Change<>("new", "old"))
);
}
-
- @Test
- public void shouldConfigureSerde() {
- final Serde<Void> mock = EasyMock.mock(Serde.class);
- mock.configure(emptyMap(), false);
- EasyMock.expectLastCall();
- EasyMock.replay(mock);
- final FullChangeSerde<Void> serde = FullChangeSerde.castOrWrap(mock);
- serde.configure(emptyMap(), false);
- EasyMock.verify(mock);
- }
-
- @Test
- public void shouldCloseSerde() {
- final Serde<Void> mock = EasyMock.mock(Serde.class);
- mock.close();
- EasyMock.expectLastCall();
- EasyMock.replay(mock);
- final FullChangeSerde<Void> serde = FullChangeSerde.castOrWrap(mock);
- serde.close();
- EasyMock.verify(mock);
- }
-
- @Test
- public void shouldConfigureSerializer() {
- final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
- final Serializer<Void> mockSerializer = EasyMock.mock(Serializer.class);
- EasyMock.expect(mockSerde.serializer()).andReturn(mockSerializer);
- EasyMock.replay(mockSerde);
- mockSerializer.configure(emptyMap(), false);
- EasyMock.expectLastCall();
- EasyMock.replay(mockSerializer);
- final Serializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer();
- serializer.configure(emptyMap(), false);
- EasyMock.verify(mockSerde);
- EasyMock.verify(mockSerializer);
- }
-
- @Test
- public void shouldCloseSerializer() {
- final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
- final Serializer<Void> mockSerializer = EasyMock.mock(Serializer.class);
- EasyMock.expect(mockSerde.serializer()).andReturn(mockSerializer);
- EasyMock.replay(mockSerde);
- mockSerializer.close();
- EasyMock.expectLastCall();
- EasyMock.replay(mockSerializer);
- final Serializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer();
- serializer.close();
- EasyMock.verify(mockSerde);
- EasyMock.verify(mockSerializer);
- }
-
- @Test
- public void shouldConfigureDeserializer() {
- final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
- final Deserializer<Void> mockDeserializer = EasyMock.mock(Deserializer.class);
- EasyMock.expect(mockSerde.deserializer()).andReturn(mockDeserializer);
- EasyMock.replay(mockSerde);
- mockDeserializer.configure(emptyMap(), false);
- EasyMock.expectLastCall();
- EasyMock.replay(mockDeserializer);
- final Deserializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer();
- serializer.configure(emptyMap(), false);
- EasyMock.verify(mockSerde);
- EasyMock.verify(mockDeserializer);
- }
-
- @Test
- public void shouldCloseDeserializer() {
- final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
- final Deserializer<Void> mockDeserializer = EasyMock.mock(Deserializer.class);
- EasyMock.expect(mockSerde.deserializer()).andReturn(mockDeserializer);
- EasyMock.replay(mockSerde);
- mockDeserializer.close();
- EasyMock.expectLastCall();
- EasyMock.replay(mockDeserializer);
- final Deserializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer();
- serializer.close();
- EasyMock.verify(mockSerde);
- EasyMock.verify(mockDeserializer);
- }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index 96ee735..ef46663 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.internals.Change;
-import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateStore;
@@ -139,7 +138,7 @@ public class KTableSuppressProcessorMetricsTest {
final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(
storeName, Serdes.String(),
- FullChangeSerde.castOrWrap(Serdes.Long())
+ Serdes.Long()
)
.withLoggingDisabled()
.build();
@@ -169,9 +168,9 @@ public class KTableSuppressProcessorMetricsTest {
verifyMetric(metrics, EVICTION_RATE_METRIC, is(0.0));
verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(0.0));
- verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(29.5));
- verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(59.0));
- verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(59.0));
+ verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(21.5));
+ verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(43.0));
+ verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(43.0));
verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(0.5));
verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0));
verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(1.0));
@@ -185,9 +184,9 @@ public class KTableSuppressProcessorMetricsTest {
verifyMetric(metrics, EVICTION_RATE_METRIC, greaterThan(0.0));
verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(1.0));
- verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(57.0));
- verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(55.0));
- verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(114.0));
+ verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(41.0));
+ verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(39.0));
+ verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(82.0));
verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(1.0));
verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0));
verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(2.0));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index d8cb858..1d1d6fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
-import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
@@ -75,7 +74,7 @@ public class KTableSuppressProcessorTest {
final String storeName = "test-store";
- final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valueSerde))
+ final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, valueSerde)
.withLoggingDisabled()
.build();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java
index 3aef6d0..a323b9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java
@@ -19,8 +19,10 @@ package org.apache.kafka.streams.kstream.internals.suppress;
import org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest;
import org.apache.kafka.streams.integration.SuppressionIntegrationTest;
import org.apache.kafka.streams.kstream.SuppressedTest;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest;
import org.apache.kafka.streams.kstream.internals.SuppressScenarioTest;
import org.apache.kafka.streams.kstream.internals.SuppressTopologyTest;
+import org.apache.kafka.streams.state.internals.BufferValueTest;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBufferTest;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferTest;
import org.junit.runner.RunWith;
@@ -30,21 +32,25 @@ import org.junit.runners.Suite;
* This suite runs all the tests related to the Suppression feature.
*
* It can be used from an IDE to selectively just run these tests when developing code related to Suppress.
- *
+ *
* If desired, it can also be added to a Gradle build task, although this isn't strictly necessary, since all
* these tests are already included in the `:streams:test` task.
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
+ BufferValueTest.class,
KTableSuppressProcessorMetricsTest.class,
KTableSuppressProcessorTest.class,
SuppressScenarioTest.class,
SuppressTopologyTest.class,
SuppressedTest.class,
- SuppressionIntegrationTest.class,
- SuppressionDurabilityIntegrationTest.class,
InMemoryTimeOrderedKeyValueBufferTest.class,
- TimeOrderedKeyValueBufferTest.class
+ TimeOrderedKeyValueBufferTest.class,
+ FullChangeSerdeTest.class,
+ SuppressionIntegrationTest.class,
+ SuppressionDurabilityIntegrationTest.class
})
public class SuppressSuite {
}
+
+
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
index 1ea646f..83ab127 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
@@ -37,7 +37,7 @@ public class ProcessorRecordContextTest {
null
);
- assertEquals(MIN_SIZE, context.sizeBytes());
+ assertEquals(MIN_SIZE, context.residentMemorySizeEstimate());
}
@Test
@@ -50,7 +50,7 @@ public class ProcessorRecordContextTest {
new RecordHeaders()
);
- assertEquals(MIN_SIZE, context.sizeBytes());
+ assertEquals(MIN_SIZE, context.residentMemorySizeEstimate());
}
@Test
@@ -63,7 +63,7 @@ public class ProcessorRecordContextTest {
null
);
- assertEquals(MIN_SIZE + 5L, context.sizeBytes());
+ assertEquals(MIN_SIZE + 5L, context.residentMemorySizeEstimate());
}
@Test
@@ -78,7 +78,7 @@ public class ProcessorRecordContextTest {
headers
);
- assertEquals(MIN_SIZE + 10L + 12L, context.sizeBytes());
+ assertEquals(MIN_SIZE + 10L + 12L, context.residentMemorySizeEstimate());
}
@Test
@@ -93,6 +93,6 @@ public class ProcessorRecordContextTest {
headers
);
- assertEquals(MIN_SIZE + 10L, context.sizeBytes());
+ assertEquals(MIN_SIZE + 10L, context.residentMemorySizeEstimate());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
new file mode 100644
index 0000000..d663461
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+public class BufferValueTest {
+ @Test
+ public void shouldDeduplicateNullValues() {
+ final BufferValue bufferValue = new BufferValue(null, null, null, null);
+ assertSame(bufferValue.priorValue(), bufferValue.oldValue());
+ }
+
+ @Test
+ public void shouldDeduplicateIdenticalValues() {
+ final byte[] bytes = {(byte) 0};
+ final BufferValue bufferValue = new BufferValue(bytes, bytes, null, null);
+ assertSame(bufferValue.priorValue(), bufferValue.oldValue());
+ }
+
+ @Test
+ public void shouldDeduplicateEqualValues() {
+ final BufferValue bufferValue = new BufferValue(new byte[] {(byte) 0}, new byte[] {(byte) 0}, null, null);
+ assertSame(bufferValue.priorValue(), bufferValue.oldValue());
+ }
+
+ @Test
+ public void shouldStoreDifferentValues() {
+ final byte[] priorValue = {(byte) 0};
+ final byte[] oldValue = {(byte) 1};
+ final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null);
+ assertSame(priorValue, bufferValue.priorValue());
+ assertSame(oldValue, bufferValue.oldValue());
+ }
+
+ @Test
+ public void shouldStoreDifferentValuesWithPriorNull() {
+ final byte[] priorValue = null;
+ final byte[] oldValue = {(byte) 1};
+ final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null);
+ assertNull(bufferValue.priorValue());
+ assertSame(oldValue, bufferValue.oldValue());
+ }
+
+ @Test
+ public void shouldStoreDifferentValuesWithOldNull() {
+ final byte[] priorValue = {(byte) 0};
+ final byte[] oldValue = null;
+ final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null);
+ assertSame(priorValue, bufferValue.priorValue());
+ assertNull(bufferValue.oldValue());
+ }
+
+ @Test
+ public void shouldAccountForDeduplicationInSizeEstimate() {
+ final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+ assertEquals(25L, new BufferValue(null, null, null, context).residentMemorySizeEstimate());
+ assertEquals(26L, new BufferValue(new byte[] {(byte) 0}, null, null, context).residentMemorySizeEstimate());
+ assertEquals(26L, new BufferValue(null, new byte[] {(byte) 0}, null, context).residentMemorySizeEstimate());
+ assertEquals(26L, new BufferValue(new byte[] {(byte) 0}, new byte[] {(byte) 0}, null, context).residentMemorySizeEstimate());
+ assertEquals(27L, new BufferValue(new byte[] {(byte) 0}, new byte[] {(byte) 1}, null, context).residentMemorySizeEstimate());
+
+ // new value should get counted, but doesn't get deduplicated
+ assertEquals(28L, new BufferValue(new byte[] {(byte) 0}, new byte[] {(byte) 1}, new byte[] {(byte) 0}, context).residentMemorySizeEstimate());
+ }
+
+ @Test
+ public void shouldSerializeNulls() {
+ final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+ final byte[] serializedContext = context.serialize();
+ final byte[] bytes = new BufferValue(null, null, null, context).serialize(0).array();
+ final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);
+
+ assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3).putInt(-1).putInt(-1).putInt(-1).array()));
+ }
+
+ @Test
+ public void shouldSerializePrior() {
+ final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+ final byte[] serializedContext = context.serialize();
+ final byte[] priorValue = {(byte) 5};
+ final byte[] bytes = new BufferValue(priorValue, null, null, context).serialize(0).array();
+ final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);
+
+ assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3 + 1).putInt(1).put(priorValue).putInt(-1).putInt(-1).array()));
+ }
+
+ @Test
+ public void shouldSerializeOld() {
+ final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+ final byte[] serializedContext = context.serialize();
+ final byte[] oldValue = {(byte) 5};
+ final byte[] bytes = new BufferValue(null, oldValue, null, context).serialize(0).array();
+ final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);
+
+ assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3 + 1).putInt(-1).putInt(1).put(oldValue).putInt(-1).array()));
+ }
+
+ @Test
+ public void shouldSerializeNew() {
+ final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+ final byte[] serializedContext = context.serialize();
+ final byte[] newValue = {(byte) 5};
+ final byte[] bytes = new BufferValue(null, null, newValue, context).serialize(0).array();
+ final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);
+
+ assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3 + 1).putInt(-1).putInt(-1).putInt(1).put(newValue).array()));
+ }
+
+ @Test
+ public void shouldCompactDuplicates() {
+ final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+ final byte[] serializedContext = context.serialize();
+ final byte[] duplicate = {(byte) 5};
+ final byte[] bytes = new BufferValue(duplicate, duplicate, null, context).serialize(0).array();
+ final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);
+
+ assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3 + 1).putInt(1).put(duplicate).putInt(-2).putInt(-1).array()));
+ }
+
+ @Test
+ public void shouldDeserializePrior() {
+ final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+ final byte[] serializedContext = context.serialize();
+ final byte[] priorValue = {(byte) 5};
+ final ByteBuffer serialValue =
+ ByteBuffer
+ .allocate(serializedContext.length + Integer.BYTES * 3 + priorValue.length)
+ .put(serializedContext).putInt(1).put(priorValue).putInt(-1).putInt(-1);
+ serialValue.position(0);
+
+ final BufferValue deserialize = BufferValue.deserialize(serialValue);
+ assertThat(deserialize, is(new BufferValue(priorValue, null, null, context)));
+ }
+
+ @Test
+ public void shouldDeserializeOld() {
+ final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+ final byte[] serializedContext = context.serialize();
+ final byte[] oldValue = {(byte) 5};
+ final ByteBuffer serialValue =
+ ByteBuffer
+ .allocate(serializedContext.length + Integer.BYTES * 3 + oldValue.length)
+ .put(serializedContext).putInt(-1).putInt(1).put(oldValue).putInt(-1);
+ serialValue.position(0);
+
+ assertThat(BufferValue.deserialize(serialValue), is(new BufferValue(null, oldValue, null, context)));
+ }
+
+ @Test
+ public void shouldDeserializeNew() {
+ final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+ final byte[] serializedContext = context.serialize();
+ final byte[] newValue = {(byte) 5};
+ final ByteBuffer serialValue =
+ ByteBuffer
+ .allocate(serializedContext.length + Integer.BYTES * 3 + newValue.length)
+ .put(serializedContext).putInt(-1).putInt(-1).putInt(1).put(newValue);
+ serialValue.position(0);
+
+ assertThat(BufferValue.deserialize(serialValue), is(new BufferValue(null, null, newValue, context)));
+ }
+
+ @Test
+ public void shouldDeserializeCompactedDuplicates() {
+ final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+ final byte[] serializedContext = context.serialize();
+ final byte[] duplicate = {(byte) 5};
+ final ByteBuffer serialValue =
+ ByteBuffer
+ .allocate(serializedContext.length + Integer.BYTES * 3 + duplicate.length)
+ .put(serializedContext).putInt(1).put(duplicate).putInt(-2).putInt(-1);
+ serialValue.position(0);
+
+ final BufferValue bufferValue = BufferValue.deserialize(serialValue);
+ assertThat(bufferValue, is(new BufferValue(duplicate, duplicate, null, context)));
+ assertSame(bufferValue.priorValue(), bufferValue.oldValue());
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 941832b..5c9cbf9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
@@ -190,11 +189,11 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
putRecord(buffer, context, 0L, 0L, "asdf", "23roni");
- assertThat(buffer.bufferSize(), is(51L));
+ assertThat(buffer.bufferSize(), is(43L));
putRecord(buffer, context, 1L, 0L, "asdf", "3l");
- assertThat(buffer.bufferSize(), is(47L));
+ assertThat(buffer.bufferSize(), is(39L));
putRecord(buffer, context, 0L, 0L, "zxcv", "qfowin");
- assertThat(buffer.bufferSize(), is(98L));
+ assertThat(buffer.bufferSize(), is(82L));
cleanup(context, buffer);
}
@@ -218,12 +217,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
putRecord(buffer, context, 1L, 0L, "zxcv", "o23i4");
assertThat(buffer.numRecords(), is(1));
- assertThat(buffer.bufferSize(), is(50L));
+ assertThat(buffer.bufferSize(), is(42L));
assertThat(buffer.minTimestamp(), is(1L));
putRecord(buffer, context, 0L, 0L, "asdf", "3ng");
assertThat(buffer.numRecords(), is(2));
- assertThat(buffer.bufferSize(), is(98L));
+ assertThat(buffer.bufferSize(), is(82L));
assertThat(buffer.minTimestamp(), is(0L));
final AtomicInteger callbackCount = new AtomicInteger(0);
@@ -232,14 +231,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
case 1: {
assertThat(kv.key(), is("asdf"));
assertThat(buffer.numRecords(), is(2));
- assertThat(buffer.bufferSize(), is(98L));
+ assertThat(buffer.bufferSize(), is(82L));
assertThat(buffer.minTimestamp(), is(0L));
break;
}
case 2: {
assertThat(kv.key(), is("zxcv"));
assertThat(buffer.numRecords(), is(1));
- assertThat(buffer.bufferSize(), is(50L));
+ assertThat(buffer.bufferSize(), is(42L));
assertThat(buffer.minTimestamp(), is(1L));
break;
}
@@ -361,12 +360,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
- final Serializer<Change<String>> serializer = FullChangeSerde.castOrWrap(Serdes.String()).serializer();
+ final FullChangeSerde<String> serializer = FullChangeSerde.wrap(Serdes.String());
- final byte[] todeleteValue = serializer.serialize(null, new Change<>("doomed", null));
- final byte[] asdfValue = serializer.serialize(null, new Change<>("qwer", null));
- final byte[] zxcvValue1 = serializer.serialize(null, new Change<>("eo4im", "previous"));
- final byte[] zxcvValue2 = serializer.serialize(null, new Change<>("next", "eo4im"));
+ final byte[] todeleteValue = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("doomed", null)));
+ final byte[] asdfValue = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("qwer", null)));
+ final byte[] zxcvValue1 = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("eo4im", "previous")));
+ final byte[] zxcvValue2 = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("next", "eo4im")));
stateRestoreCallback.restoreBatch(asList(
new ConsumerRecord<>("changelog-topic",
0,
@@ -412,7 +411,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
assertThat(buffer.numRecords(), is(3));
assertThat(buffer.minTimestamp(), is(0L));
- assertThat(buffer.bufferSize(), is(196L));
+ assertThat(buffer.bufferSize(), is(172L));
stateRestoreCallback.restoreBatch(singletonList(
new ConsumerRecord<>("changelog-topic",
@@ -429,7 +428,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
assertThat(buffer.numRecords(), is(2));
assertThat(buffer.minTimestamp(), is(1L));
- assertThat(buffer.bufferSize(), is(131L));
+ assertThat(buffer.bufferSize(), is(115L));
assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
@@ -477,14 +476,13 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
final byte[] todeleteValue = getContextualRecord("doomed", 0).serialize(0).array();
final byte[] asdfValue = getContextualRecord("qwer", 1).serialize(0).array();
- final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String());
+ final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
final byte[] zxcvValue1 = new ContextualRecord(
- fullChangeSerde.serializer().serialize(null, new Change<>("3o4im", "previous")),
+ FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>("3o4im", "previous"))),
getContext(2L)
).serialize(0).array();
- final FullChangeSerde<String> fullChangeSerde1 = FullChangeSerde.castOrWrap(Serdes.String());
final byte[] zxcvValue2 = new ContextualRecord(
- fullChangeSerde1.serializer().serialize(null, new Change<>("next", "3o4im")),
+ FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>("next", "3o4im"))),
getContext(3L)
).serialize(0).array();
stateRestoreCallback.restoreBatch(asList(
@@ -536,7 +534,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
assertThat(buffer.numRecords(), is(3));
assertThat(buffer.minTimestamp(), is(0L));
- assertThat(buffer.bufferSize(), is(166L));
+ assertThat(buffer.bufferSize(), is(142L));
stateRestoreCallback.restoreBatch(singletonList(
new ConsumerRecord<>("changelog-topic",
@@ -553,7 +551,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
assertThat(buffer.numRecords(), is(2));
assertThat(buffer.minTimestamp(), is(1L));
- assertThat(buffer.bufferSize(), is(111L));
+ assertThat(buffer.bufferSize(), is(95L));
assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
@@ -601,23 +599,21 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
final byte[] todeleteValue = getBufferValue("doomed", 0).serialize(0).array();
final byte[] asdfValue = getBufferValue("qwer", 1).serialize(0).array();
- final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String());
+ final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
final byte[] zxcvValue1 =
new BufferValue(
- new ContextualRecord(
- fullChangeSerde.serializer().serialize(null, new Change<>("3o4im", "IGNORED")),
- getContext(2L)
- ),
- Serdes.String().serializer().serialize(null, "previous")
+ Serdes.String().serializer().serialize(null, "previous"),
+ Serdes.String().serializer().serialize(null, "IGNORED"),
+ Serdes.String().serializer().serialize(null, "3o4im"),
+ getContext(2L)
).serialize(0).array();
- final FullChangeSerde<String> fullChangeSerde1 = FullChangeSerde.castOrWrap(Serdes.String());
+ final FullChangeSerde<String> fullChangeSerde1 = FullChangeSerde.wrap(Serdes.String());
final byte[] zxcvValue2 =
new BufferValue(
- new ContextualRecord(
- fullChangeSerde1.serializer().serialize(null, new Change<>("next", "3o4im")),
- getContext(3L)
- ),
- Serdes.String().serializer().serialize(null, "previous")
+ Serdes.String().serializer().serialize(null, "previous"),
+ Serdes.String().serializer().serialize(null, "3o4im"),
+ Serdes.String().serializer().serialize(null, "next"),
+ getContext(3L)
).serialize(0).array();
stateRestoreCallback.restoreBatch(asList(
new ConsumerRecord<>("changelog-topic",
@@ -668,7 +664,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
assertThat(buffer.numRecords(), is(3));
assertThat(buffer.minTimestamp(), is(0L));
- assertThat(buffer.bufferSize(), is(166L));
+ assertThat(buffer.bufferSize(), is(142L));
stateRestoreCallback.restoreBatch(singletonList(
new ConsumerRecord<>("changelog-topic",
@@ -685,7 +681,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
assertThat(buffer.numRecords(), is(2));
assertThat(buffer.minTimestamp(), is(1L));
- assertThat(buffer.bufferSize(), is(111L));
+ assertThat(buffer.bufferSize(), is(95L));
assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
@@ -766,14 +762,18 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
}
private static BufferValue getBufferValue(final String value, final long timestamp) {
- final ContextualRecord contextualRecord = getContextualRecord(value, timestamp);
- return new BufferValue(contextualRecord, null);
+ return new BufferValue(
+ null,
+ null,
+ Serdes.String().serializer().serialize(null, value),
+ getContext(timestamp)
+ );
}
private static ContextualRecord getContextualRecord(final String value, final long timestamp) {
- final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String());
+ final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
return new ContextualRecord(
- fullChangeSerde.serializer().serialize(null, new Change<>(value, null)),
+ FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>(value, null))),
getContext(timestamp)
);
}