You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/06/19 16:45:58 UTC

[kafka] branch trunk updated: KAFKA-8452: Compressed BufferValue review follow-up (#6940)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a7e771c  KAFKA-8452: Compressed BufferValue review follow-up (#6940)
a7e771c is described below

commit a7e771c6da72bb7f3c5c5cbab3dc9c4fd403f866
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Wed Jun 19 11:45:35 2019 -0500

    KAFKA-8452: Compressed BufferValue review follow-up (#6940)
    
    Belatedly address a few code review comments from #6848
    
    Reviewers: Bill Bejeck <bb...@gmail.com>
---
 .../kafka/streams/kstream/internals/Change.java      |  2 +-
 .../streams/kstream/internals/FullChangeSerde.java   |  4 ++--
 .../kafka/streams/state/internals/BufferValue.java   |  5 ++++-
 .../internals/InMemoryTimeOrderedKeyValueBuffer.java |  4 ++--
 .../kstream/internals/FullChangeSerdeTest.java       | 20 ++++++++++----------
 .../streams/state/internals/BufferValueTest.java     |  4 ++++
 .../internals/TimeOrderedKeyValueBufferTest.java     | 14 +++++++-------
 7 files changed, 30 insertions(+), 23 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
index f28a16d..c9a18de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
@@ -30,7 +30,7 @@ public class Change<T> {
 
     @Override
     public String toString() {
-        return "(" + String.valueOf(newValue) + "<-" + String.valueOf(oldValue) + ")";
+        return "(" + newValue + "<-" + oldValue + ")";
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
index f28f9e7..5d7c7e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -72,7 +72,7 @@ public final class FullChangeSerde<T> {
      * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
      * so that we can produce the legacy format to test that we can still deserialize it.
      */
-    public static byte[] composeLegacyFormat(final Change<byte[]> serialChange) {
+    public static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) {
         if (serialChange == null) {
             return null;
         }
@@ -99,7 +99,7 @@ public final class FullChangeSerde<T> {
      * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still
      * need to be able to read it (so that we can load the state store from previously-written changelog records).
      */
-    public static Change<byte[]> decomposeLegacyFormat(final byte[] data) {
+    public static Change<byte[]> decomposeLegacyFormattedArrayIntoChangeArrays(final byte[] data) {
         if (data == null) {
             return null;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
index f1990c7..b52ec24 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
@@ -30,7 +30,10 @@ public final class BufferValue {
     private final byte[] newValue;
     private final ProcessorRecordContext recordContext;
 
-    BufferValue(final byte[] priorValue, final byte[] oldValue, final byte[] newValue, final ProcessorRecordContext recordContext) {
+    BufferValue(final byte[] priorValue,
+                final byte[] oldValue,
+                final byte[] newValue,
+                final ProcessorRecordContext recordContext) {
         this.oldValue = oldValue;
         this.newValue = newValue;
         this.recordContext = recordContext;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 6c6ef36..cecfa4d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -296,7 +296,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
                     final byte[] changelogValue = new byte[record.value().length - 8];
                     timeAndValue.get(changelogValue);
 
-                    final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormat(changelogValue));
+                    final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(changelogValue));
 
                     final ProcessorRecordContext recordContext = new ProcessorRecordContext(
                         record.timestamp(),
@@ -326,7 +326,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
                     timeAndValue.get(changelogValue);
 
                     final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue));
-                    final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormat(contextualRecord.value()));
+                    final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value()));
 
                     cleanPut(
                         time,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
index 97e6c06..ac6762f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
@@ -29,8 +29,8 @@ public class FullChangeSerdeTest {
     @Test
     public void shouldRoundTripNull() {
         assertThat(serde.serializeParts(null, null), nullValue());
-        assertThat(FullChangeSerde.composeLegacyFormat(null), nullValue());
-        assertThat(FullChangeSerde.decomposeLegacyFormat(null), nullValue());
+        assertThat(FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(null), nullValue());
+        assertThat(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(null), nullValue());
         assertThat(serde.deserializeParts(null, null), nullValue());
     }
 
@@ -47,9 +47,9 @@ public class FullChangeSerdeTest {
             is(new Change<String>(null, null))
         );
 
-        final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(new Change<>(null, null));
+        final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(new Change<>(null, null));
         assertThat(
-            FullChangeSerde.decomposeLegacyFormat(legacyFormat),
+            FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat),
             is(new Change<byte[]>(null, null))
         );
     }
@@ -57,8 +57,8 @@ public class FullChangeSerdeTest {
     @Test
     public void shouldRoundTripOldNull() {
         final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", null));
-        final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized);
-        final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat);
+        final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
+        final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
         assertThat(
             serde.deserializeParts(null, decomposedLegacyFormat),
             is(new Change<>("new", null))
@@ -68,8 +68,8 @@ public class FullChangeSerdeTest {
     @Test
     public void shouldRoundTripNewNull() {
         final Change<byte[]> serialized = serde.serializeParts(null, new Change<>(null, "old"));
-        final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized);
-        final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat);
+        final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
+        final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
         assertThat(
             serde.deserializeParts(null, decomposedLegacyFormat),
             is(new Change<>(null, "old"))
@@ -79,8 +79,8 @@ public class FullChangeSerdeTest {
     @Test
     public void shouldRoundTripChange() {
         final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", "old"));
-        final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized);
-        final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat);
+        final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
+        final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
         assertThat(
             serde.deserializeParts(null, decomposedLegacyFormat),
             is(new Change<>("new", "old"))
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
index d663461..ad9b5f8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 
@@ -55,6 +56,7 @@ public class BufferValueTest {
         final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null);
         assertSame(priorValue, bufferValue.priorValue());
         assertSame(oldValue, bufferValue.oldValue());
+        assertNotEquals(bufferValue.priorValue(), bufferValue.oldValue());
     }
 
     @Test
@@ -64,6 +66,7 @@ public class BufferValueTest {
         final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null);
         assertNull(bufferValue.priorValue());
         assertSame(oldValue, bufferValue.oldValue());
+        assertNotEquals(bufferValue.priorValue(), bufferValue.oldValue());
     }
 
     @Test
@@ -73,6 +76,7 @@ public class BufferValueTest {
         final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null);
         assertSame(priorValue, bufferValue.priorValue());
         assertNull(bufferValue.oldValue());
+        assertNotEquals(bufferValue.priorValue(), bufferValue.oldValue());
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 5c9cbf9..57816c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -362,10 +362,10 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         final FullChangeSerde<String> serializer = FullChangeSerde.wrap(Serdes.String());
 
-        final byte[] todeleteValue = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("doomed", null)));
-        final byte[] asdfValue = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("qwer", null)));
-        final byte[] zxcvValue1 = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("eo4im", "previous")));
-        final byte[] zxcvValue2 = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("next", "eo4im")));
+        final byte[] todeleteValue = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("doomed", null)));
+        final byte[] asdfValue = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("qwer", null)));
+        final byte[] zxcvValue1 = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("eo4im", "previous")));
+        final byte[] zxcvValue2 = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("next", "eo4im")));
         stateRestoreCallback.restoreBatch(asList(
             new ConsumerRecord<>("changelog-topic",
                                  0,
@@ -478,11 +478,11 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final byte[] asdfValue = getContextualRecord("qwer", 1).serialize(0).array();
         final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
         final byte[] zxcvValue1 = new ContextualRecord(
-            FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>("3o4im", "previous"))),
+            FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>("3o4im", "previous"))),
             getContext(2L)
         ).serialize(0).array();
         final byte[] zxcvValue2 = new ContextualRecord(
-            FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>("next", "3o4im"))),
+            FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>("next", "3o4im"))),
             getContext(3L)
         ).serialize(0).array();
         stateRestoreCallback.restoreBatch(asList(
@@ -773,7 +773,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     private static ContextualRecord getContextualRecord(final String value, final long timestamp) {
         final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
         return new ContextualRecord(
-            FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>(value, null))),
+            FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>(value, null))),
             getContext(timestamp)
         );
     }