You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "wcarlson5 (via GitHub)" <gi...@apache.org> on 2023/04/18 16:55:39 UTC

[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13533: KAFKA-12446: update change encoding to use varint

wcarlson5 commented on code in PR #13533:
URL: https://github.com/apache/kafka/pull/13533#discussion_r1170320445


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##########
@@ -104,33 +104,40 @@ public byte[] serialize(final String topic, final Headers headers, final Change<
         final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;
 
         // The serialization format is:
-        // {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
-        // {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
-        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
-        final ByteBuffer buf;
+        // {BYTE_ARRAY oldValue}{BYTE encodingFlag=0}
+        // {BYTE_ARRAY newValue}{BYTE encodingFlag=1}
+        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE encodingFlag=2}

Review Comment:
   UINT32 or VARINT?



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java:
##########
@@ -141,15 +141,15 @@ private static byte[] serializeVersions3Through5(final String topic, final Chang
         final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;
 
         // The serialization format is:
-        // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
-        // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
-        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
+        // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=3}
+        // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE encodingFlag=4}
+        // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=5}
         final ByteBuffer buf;
         final byte isLatest = data.isLatest ? (byte) 1 : (byte) 0;
         if (newValueIsNotNull && oldValueIsNotNull) {
-            final int capacity = UINT32_SIZE + newDataLength + oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
+            final int capacity = MAX_VARINT_LENGTH + newDataLength + oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;

Review Comment:
   You changed the `NEW_OLD_FLAG_SIZE` elsewhere to `ENCODING_FLAG_SIZE` can we change that here too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org