You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/06/19 16:45:58 UTC
[kafka] branch trunk updated: KAFKA-8452: Compressed BufferValue review follow-up (#6940)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a7e771c KAFKA-8452: Compressed BufferValue review follow-up (#6940)
a7e771c is described below
commit a7e771c6da72bb7f3c5c5cbab3dc9c4fd403f866
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Wed Jun 19 11:45:35 2019 -0500
KAFKA-8452: Compressed BufferValue review follow-up (#6940)
Belatedly address a few code review comments from #6848
Reviewers: Bill Bejeck <bb...@gmail.com>
---
.../kafka/streams/kstream/internals/Change.java | 2 +-
.../streams/kstream/internals/FullChangeSerde.java | 4 ++--
.../kafka/streams/state/internals/BufferValue.java | 5 ++++-
.../internals/InMemoryTimeOrderedKeyValueBuffer.java | 4 ++--
.../kstream/internals/FullChangeSerdeTest.java | 20 ++++++++++----------
.../streams/state/internals/BufferValueTest.java | 4 ++++
.../internals/TimeOrderedKeyValueBufferTest.java | 14 +++++++-------
7 files changed, 30 insertions(+), 23 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
index f28a16d..c9a18de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
@@ -30,7 +30,7 @@ public class Change<T> {
@Override
public String toString() {
- return "(" + String.valueOf(newValue) + "<-" + String.valueOf(oldValue) + ")";
+ return "(" + newValue + "<-" + oldValue + ")";
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
index f28f9e7..5d7c7e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -72,7 +72,7 @@ public final class FullChangeSerde<T> {
* We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
* so that we can produce the legacy format to test that we can still deserialize it.
*/
- public static byte[] composeLegacyFormat(final Change<byte[]> serialChange) {
+ public static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) {
if (serialChange == null) {
return null;
}
@@ -99,7 +99,7 @@ public final class FullChangeSerde<T> {
* We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still
* need to be able to read it (so that we can load the state store from previously-written changelog records).
*/
- public static Change<byte[]> decomposeLegacyFormat(final byte[] data) {
+ public static Change<byte[]> decomposeLegacyFormattedArrayIntoChangeArrays(final byte[] data) {
if (data == null) {
return null;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
index f1990c7..b52ec24 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
@@ -30,7 +30,10 @@ public final class BufferValue {
private final byte[] newValue;
private final ProcessorRecordContext recordContext;
- BufferValue(final byte[] priorValue, final byte[] oldValue, final byte[] newValue, final ProcessorRecordContext recordContext) {
+ BufferValue(final byte[] priorValue,
+ final byte[] oldValue,
+ final byte[] newValue,
+ final ProcessorRecordContext recordContext) {
this.oldValue = oldValue;
this.newValue = newValue;
this.recordContext = recordContext;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 6c6ef36..cecfa4d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -296,7 +296,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
final byte[] changelogValue = new byte[record.value().length - 8];
timeAndValue.get(changelogValue);
- final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormat(changelogValue));
+ final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(changelogValue));
final ProcessorRecordContext recordContext = new ProcessorRecordContext(
record.timestamp(),
@@ -326,7 +326,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
timeAndValue.get(changelogValue);
final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue));
- final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormat(contextualRecord.value()));
+ final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value()));
cleanPut(
time,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
index 97e6c06..ac6762f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
@@ -29,8 +29,8 @@ public class FullChangeSerdeTest {
@Test
public void shouldRoundTripNull() {
assertThat(serde.serializeParts(null, null), nullValue());
- assertThat(FullChangeSerde.composeLegacyFormat(null), nullValue());
- assertThat(FullChangeSerde.decomposeLegacyFormat(null), nullValue());
+ assertThat(FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(null), nullValue());
+ assertThat(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(null), nullValue());
assertThat(serde.deserializeParts(null, null), nullValue());
}
@@ -47,9 +47,9 @@ public class FullChangeSerdeTest {
is(new Change<String>(null, null))
);
- final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(new Change<>(null, null));
+ final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(new Change<>(null, null));
assertThat(
- FullChangeSerde.decomposeLegacyFormat(legacyFormat),
+ FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat),
is(new Change<byte[]>(null, null))
);
}
@@ -57,8 +57,8 @@ public class FullChangeSerdeTest {
@Test
public void shouldRoundTripOldNull() {
final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", null));
- final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized);
- final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat);
+ final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
+ final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
assertThat(
serde.deserializeParts(null, decomposedLegacyFormat),
is(new Change<>("new", null))
@@ -68,8 +68,8 @@ public class FullChangeSerdeTest {
@Test
public void shouldRoundTripNewNull() {
final Change<byte[]> serialized = serde.serializeParts(null, new Change<>(null, "old"));
- final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized);
- final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat);
+ final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
+ final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
assertThat(
serde.deserializeParts(null, decomposedLegacyFormat),
is(new Change<>(null, "old"))
@@ -79,8 +79,8 @@ public class FullChangeSerdeTest {
@Test
public void shouldRoundTripChange() {
final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", "old"));
- final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized);
- final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat);
+ final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
+ final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
assertThat(
serde.deserializeParts(null, decomposedLegacyFormat),
is(new Change<>("new", "old"))
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
index d663461..ad9b5f8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
@@ -55,6 +56,7 @@ public class BufferValueTest {
final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null);
assertSame(priorValue, bufferValue.priorValue());
assertSame(oldValue, bufferValue.oldValue());
+ assertNotEquals(bufferValue.priorValue(), bufferValue.oldValue());
}
@Test
@@ -64,6 +66,7 @@ public class BufferValueTest {
final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null);
assertNull(bufferValue.priorValue());
assertSame(oldValue, bufferValue.oldValue());
+ assertNotEquals(bufferValue.priorValue(), bufferValue.oldValue());
}
@Test
@@ -73,6 +76,7 @@ public class BufferValueTest {
final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null);
assertSame(priorValue, bufferValue.priorValue());
assertNull(bufferValue.oldValue());
+ assertNotEquals(bufferValue.priorValue(), bufferValue.oldValue());
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 5c9cbf9..57816c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -362,10 +362,10 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
final FullChangeSerde<String> serializer = FullChangeSerde.wrap(Serdes.String());
- final byte[] todeleteValue = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("doomed", null)));
- final byte[] asdfValue = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("qwer", null)));
- final byte[] zxcvValue1 = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("eo4im", "previous")));
- final byte[] zxcvValue2 = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("next", "eo4im")));
+ final byte[] todeleteValue = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("doomed", null)));
+ final byte[] asdfValue = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("qwer", null)));
+ final byte[] zxcvValue1 = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("eo4im", "previous")));
+ final byte[] zxcvValue2 = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("next", "eo4im")));
stateRestoreCallback.restoreBatch(asList(
new ConsumerRecord<>("changelog-topic",
0,
@@ -478,11 +478,11 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
final byte[] asdfValue = getContextualRecord("qwer", 1).serialize(0).array();
final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
final byte[] zxcvValue1 = new ContextualRecord(
- FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>("3o4im", "previous"))),
+ FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>("3o4im", "previous"))),
getContext(2L)
).serialize(0).array();
final byte[] zxcvValue2 = new ContextualRecord(
- FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>("next", "3o4im"))),
+ FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>("next", "3o4im"))),
getContext(3L)
).serialize(0).array();
stateRestoreCallback.restoreBatch(asList(
@@ -773,7 +773,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
private static ContextualRecord getContextualRecord(final String value, final long timestamp) {
final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
return new ContextualRecord(
- FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>(value, null))),
+ FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>(value, null))),
getContext(timestamp)
);
}