You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/04/24 22:30:05 UTC

[kafka] branch trunk updated: KAFKA-12446: update change encoding to use varint (#13533)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2557a4b842b KAFKA-12446: update change encoding to use varint (#13533)
2557a4b842b is described below

commit 2557a4b842b07ac796193bd9a3ef6b724dc995cf
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Mon Apr 24 15:29:57 2023 -0700

    KAFKA-12446: update change encoding to use varint (#13533)
    
    KIP-904 aimed to save space when encoding the length of a byte array. However, a fixed-width UINT32 always takes four bytes, so it does not achieve this goal. This PR changes the encoding to VARINT instead.
    
    Reviewers: Victoria Xia <vi...@confluent.io>,  Farooq Qaiser <fq...@gmail.com>, Walker Carlson <wc...@confluent.io>
---
 .../kstream/internals/ChangedDeserializer.java     | 122 +++++++++++----------
 .../kstream/internals/ChangedSerializer.java       |  37 ++++---
 .../kstream/internals/ChangedSerdeTest.java        |  24 ++--
 3 files changed, 103 insertions(+), 80 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index b5b2c6aa3c4..3ae46843289 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -26,7 +26,7 @@ import java.nio.ByteBuffer;
 
 public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, Void, T> {
 
-    private static final int NEW_OLD_FLAG_SIZE = 1;
+    private static final int ENCODING_FLAG_SIZE = 1;
     private static final int IS_LATEST_FLAG_SIZE = 1;
 
     private Deserializer<T> inner;
@@ -50,64 +50,76 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>>, Wrapping
     @Override
     public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) {
         // The format we need to deserialize is:
-        // {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
-        // {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
-        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
-        // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
-        // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
-        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
+        // {BYTE_ARRAY oldValue}{BYTE encodingFlag=0}
+        // {BYTE_ARRAY newValue}{BYTE encodingFlag=1}
+        // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE encodingFlag=2}
+        // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=3}
+        // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE encodingFlag=4}
+        // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=5}
         final ByteBuffer buffer = ByteBuffer.wrap(data);
-        final byte newOldFlag = buffer.get(data.length - NEW_OLD_FLAG_SIZE);
+        final byte encodingFlag = buffer.get(data.length - ENCODING_FLAG_SIZE);
 
         final byte[] newData;
         final byte[] oldData;
         final boolean isLatest;
-        if (newOldFlag == (byte) 0) {
-            newData = null;
-            final int oldDataLength = data.length - NEW_OLD_FLAG_SIZE;
-            oldData = new byte[oldDataLength];
-            buffer.get(oldData);
-            isLatest = true;
-        } else if (newOldFlag == (byte) 1) {
-            oldData = null;
-            final int newDataLength = data.length - NEW_OLD_FLAG_SIZE;
-            newData = new byte[newDataLength];
-            buffer.get(newData);
-            isLatest = true;
-        } else if (newOldFlag == (byte) 2) {
-            final int newDataLength = Math.toIntExact(ByteUtils.readUnsignedInt(buffer));
-            newData = new byte[newDataLength];
-
-            final int oldDataLength = data.length - Integer.BYTES - newDataLength - NEW_OLD_FLAG_SIZE;
-            oldData = new byte[oldDataLength];
-
-            buffer.get(newData);
-            buffer.get(oldData);
-            isLatest = true;
-        } else if (newOldFlag == (byte) 3) {
-            newData = null;
-            final int oldDataLength = data.length - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
-            oldData = new byte[oldDataLength];
-            buffer.get(oldData);
-            isLatest = readIsLatestFlag(buffer);
-        } else if (newOldFlag == (byte) 4) {
-            oldData = null;
-            final int newDataLength = data.length - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
-            newData = new byte[newDataLength];
-            buffer.get(newData);
-            isLatest = readIsLatestFlag(buffer);
-        } else if (newOldFlag == (byte) 5) {
-            final int newDataLength = Math.toIntExact(ByteUtils.readUnsignedInt(buffer));
-            newData = new byte[newDataLength];
-
-            final int oldDataLength = data.length - Integer.BYTES - newDataLength - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
-            oldData = new byte[oldDataLength];
-
-            buffer.get(newData);
-            buffer.get(oldData);
-            isLatest = readIsLatestFlag(buffer);
-        } else {
-            throw new StreamsException("Encountered unknown byte value `" + newOldFlag + "` for oldNewFlag in ChangedDeserializer.");
+        switch (encodingFlag) {
+            case (byte) 0: {
+                newData = null;
+                final int oldDataLength = data.length - ENCODING_FLAG_SIZE;
+                oldData = new byte[oldDataLength];
+                buffer.get(oldData);
+                isLatest = true;
+                break;
+            }
+            case (byte) 1: {
+                oldData = null;
+                final int newDataLength = data.length - ENCODING_FLAG_SIZE;
+                newData = new byte[newDataLength];
+                buffer.get(newData);
+                isLatest = true;
+                break;
+            }
+            case (byte) 2: {
+                final int newDataLength = ByteUtils.readVarint(buffer);
+                newData = new byte[newDataLength];
+                buffer.get(newData);
+
+                final int oldDataLength = buffer.capacity() - buffer.position() - ENCODING_FLAG_SIZE;
+                oldData = new byte[oldDataLength];
+                buffer.get(oldData);
+                isLatest = true;
+                break;
+            }
+            case (byte) 3: {
+                newData = null;
+                final int oldDataLength = data.length - IS_LATEST_FLAG_SIZE - ENCODING_FLAG_SIZE;
+                oldData = new byte[oldDataLength];
+                buffer.get(oldData);
+                isLatest = readIsLatestFlag(buffer);
+                break;
+            }
+            case (byte) 4: {
+                oldData = null;
+                final int newDataLength = data.length - IS_LATEST_FLAG_SIZE - ENCODING_FLAG_SIZE;
+                newData = new byte[newDataLength];
+                buffer.get(newData);
+                isLatest = readIsLatestFlag(buffer);
+                break;
+            }
+            case (byte) 5: {
+                final int newDataLength = ByteUtils.readVarint(buffer);
+                newData = new byte[newDataLength];
+                buffer.get(newData);
+
+                final int oldDataLength = buffer.capacity() - buffer.position() - IS_LATEST_FLAG_SIZE - ENCODING_FLAG_SIZE;
+                oldData = new byte[oldDataLength];
+                buffer.get(oldData);
+
+                isLatest = readIsLatestFlag(buffer);
+                break;
+            }
+            default:
+                throw new StreamsException("Encountered unknown byte value `" + encodingFlag + "` for encodingFlag in ChangedDeserializer.");
         }
 
         return new Change<>(
@@ -117,7 +129,7 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>>, Wrapping
     }
 
     private boolean readIsLatestFlag(final ByteBuffer buffer) {
-        final byte isLatestFlag = buffer.get(buffer.capacity() - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE);
+        final byte isLatestFlag = buffer.get(buffer.capacity() - IS_LATEST_FLAG_SIZE - ENCODING_FLAG_SIZE);
         if (isLatestFlag == (byte) 1) {
             return true;
         } else if (isLatestFlag == (byte) 0) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index 15ba0bfcbd6..8a11575392f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -28,8 +28,8 @@ import java.util.Map;
 
 public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNullableSerializer<Change<T>, Void, T> {
 
-    private static final int NEW_OLD_FLAG_SIZE = 1;
-    private static final int UINT32_SIZE = 4;
+    private static final int ENCODING_FLAG_SIZE = 1;
+    private static final int MAX_VARINT_LENGTH = 5;
     private Serializer<T> inner;
     private boolean isUpgrade;
 
@@ -104,33 +104,40 @@ public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNull
         final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;
 
         // The serialization format is:
-        // {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
-        // {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
-        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
-        final ByteBuffer buf;
+        // {BYTE_ARRAY oldValue}{BYTE encodingFlag=0}
+        // {BYTE_ARRAY newValue}{BYTE encodingFlag=1}
+        // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE encodingFlag=2}
         if (newValueIsNotNull && oldValueIsNotNull) {
             if (isUpgrade) {
                 throw new StreamsException("Both old and new values are not null (" + data.oldValue
                         + " : " + data.newValue + ") in ChangeSerializer, which is not allowed unless upgrading.");
             } else {
-                final int capacity = UINT32_SIZE + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE;
-                buf = ByteBuffer.allocate(capacity);
-                ByteUtils.writeUnsignedInt(buf, newDataLength);
+                final int capacity = MAX_VARINT_LENGTH + newDataLength + oldDataLength + ENCODING_FLAG_SIZE;
+                final ByteBuffer buf = ByteBuffer.allocate(capacity);
+                ByteUtils.writeVarint(newDataLength, buf);
                 buf.put(newData).put(oldData).put((byte) 2);
+
+                final byte[] serialized = new byte[buf.position()];
+                buf.position(0);
+                buf.get(serialized);
+
+                return serialized;
             }
         } else if (newValueIsNotNull) {
-            final int capacity = newDataLength + NEW_OLD_FLAG_SIZE;
-            buf = ByteBuffer.allocate(capacity);
+            final int capacity = newDataLength + ENCODING_FLAG_SIZE;
+            final ByteBuffer buf = ByteBuffer.allocate(capacity);
             buf.put(newData).put((byte) 1);
+
+            return buf.array();
         } else if (oldValueIsNotNull) {
-            final int capacity = oldDataLength + NEW_OLD_FLAG_SIZE;
-            buf = ByteBuffer.allocate(capacity);
+            final int capacity = oldDataLength + ENCODING_FLAG_SIZE;
+            final ByteBuffer buf = ByteBuffer.allocate(capacity);
             buf.put(oldData).put((byte) 0);
+
+            return buf.array();
         } else {
             throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
         }
-
-        return buf.array();
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
index 475aa53ef0b..e0c66abe5ef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
@@ -43,9 +43,9 @@ public class ChangedSerdeTest {
     private static final ChangedDeserializer<String> CHANGED_STRING_DESERIALIZER =
             new ChangedDeserializer<>(Serdes.String().deserializer());
 
-    private static final int NEW_OLD_FLAG_SIZE = 1;
+    private static final int ENCODING_FLAG_SIZE = 1;
     private static final int IS_LATEST_FLAG_SIZE = 1;
-    private static final int UINT32_SIZE = 4;
+    private static final int MAX_VARINT_LENGTH = 5;
 
     final String nonNullNewValue = "hello";
     final String nonNullOldValue = "world";
@@ -141,29 +141,33 @@ public class ChangedSerdeTest {
         final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;
 
         // The serialization format is:
-        // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
-        // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
-        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
+        // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=3}
+        // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE encodingFlag=4}
+        // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=5}
         final ByteBuffer buf;
         final byte isLatest = data.isLatest ? (byte) 1 : (byte) 0;
         if (newValueIsNotNull && oldValueIsNotNull) {
-            final int capacity = UINT32_SIZE + newDataLength + oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
+            final int capacity = MAX_VARINT_LENGTH + newDataLength + oldDataLength + IS_LATEST_FLAG_SIZE + ENCODING_FLAG_SIZE;
             buf = ByteBuffer.allocate(capacity);
-            ByteUtils.writeUnsignedInt(buf, newDataLength);
+            ByteUtils.writeVarint(newDataLength, buf);
             buf.put(newData).put(oldData).put(isLatest).put((byte) 5);
         } else if (newValueIsNotNull) {
-            final int capacity = newDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
+            final int capacity = newDataLength + IS_LATEST_FLAG_SIZE + ENCODING_FLAG_SIZE;
             buf = ByteBuffer.allocate(capacity);
             buf.put(newData).put(isLatest).put((byte) 4);
         } else if (oldValueIsNotNull) {
-            final int capacity = oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
+            final int capacity = oldDataLength + IS_LATEST_FLAG_SIZE + ENCODING_FLAG_SIZE;
             buf = ByteBuffer.allocate(capacity);
             buf.put(oldData).put(isLatest).put((byte) 3);
         } else {
             throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
         }
 
-        return buf.array();
+        final byte[] serialized = new byte[buf.position()];
+        buf.position(0);
+        buf.get(serialized);
+
+        return serialized;
     }
 
     private static void checkRoundTripForReservedVersion(final Change<String> data) {