Posted to jira@kafka.apache.org by "fqaiser94 (via GitHub)" <gi...@apache.org> on 2023/04/15 22:57:22 UTC

[GitHub] [kafka] fqaiser94 commented on a diff in pull request #13533: KAFKA-12446: update change encoding to use varint

fqaiser94 commented on code in PR #13533:
URL: https://github.com/apache/kafka/pull/13533#discussion_r1167560778


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java:
##########
@@ -52,10 +52,10 @@ public Change<T> deserialize(final String topic, final Headers headers, final by
         // The format we need to deserialize is:
         // {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
         // {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
-        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
+        // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
         // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
         // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
-        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
+        // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}

Review Comment:
   nit: the last byte is now less of a `newOldFlag` and more like a `version`? 
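   
   For context, here is a minimal standalone sketch of how a version-5 payload under this layout could be decoded (a hypothetical illustration, not the actual `ChangedDeserializer`; `readVarint` is a simplified version of the zig-zag decoding in `ByteUtils.readVarint`):
   
   ```
   import java.nio.ByteBuffer;
   
   public class Version5DecodeSketch {
   
       public static void decode(final byte[] data) {
           final byte flag = data[data.length - 1];  // trailing byte selects the layout
           if (flag != 5) {
               throw new IllegalArgumentException("not a version-5 payload");
           }
           final ByteBuffer buf = ByteBuffer.wrap(data);
           final byte[] newValue = new byte[readVarint(buf)];
           buf.get(newValue);
           // everything left, apart from the isLatest and flag bytes, is the old value
           final byte[] oldValue = new byte[buf.remaining() - 2];
           buf.get(oldValue);
           final boolean isLatest = buf.get() != 0;
           System.out.printf("new=%d bytes, old=%d bytes, isLatest=%b%n",
                   newValue.length, oldValue.length, isLatest);
       }
   
       // simplified zig-zag varint read
       static int readVarint(final ByteBuffer buf) {
           int value = 0;
           int shift = 0;
           byte b;
           do {
               b = buf.get();
               value |= (b & 0x7f) << shift;
               shift += 7;
           } while ((b & 0x80) != 0);
           return (value >>> 1) ^ -(value & 1);
       }
   }
   ```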



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##########
@@ -113,9 +113,9 @@ public byte[] serialize(final String topic, final Headers headers, final Change<
                 throw new StreamsException("Both old and new values are not null (" + data.oldValue
                         + " : " + data.newValue + ") in ChangeSerializer, which is not allowed unless upgrading.");
             } else {
-                final int capacity = UINT32_SIZE + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE;
+                final int capacity = MAX_VARINT_LENGTH + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE;
                 buf = ByteBuffer.allocate(capacity);
-                ByteUtils.writeUnsignedInt(buf, newDataLength);
+                ByteUtils.writeVarint(newDataLength, buf);

Review Comment:
   nit: I think it would be simpler to just use `buf.putInt(newDataLength)`, which always writes out 4 bytes. This is the approach taken in other similar places in the codebase, such as [`CombinedKeySchema.toBytes`](https://github.com/apache/kafka/blob/9c12e462106343fbc6af5873074d48f98687af39/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java#L74), [`ProcessorRecordContext.serialize`](https://github.com/apache/kafka/blob/9c12e462106343fbc6af5873074d48f98687af39/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java#L104), and [`ProcessorMetadata.serialize`](https://github.com/apache/kafka/blob/9c12e462106343fbc6af5873074d48f98687af39/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorMetadata.java#L78).
   
   While it's nice that `ByteUtils.writeVarint` can potentially write out fewer bytes, the variable-length encoding makes the code more complex: you now have to allocate a buffer sized for the maximum possible length and later copy the written bytes into a smaller array. IMO the increase in code complexity is not worth the memory optimization.
   
   Either way, functionally speaking, both approaches work, so I will approve the PR regardless.
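   
   To make the trade-off concrete, here is a minimal standalone sketch of both approaches (a hypothetical illustration; `writeVarint` below is a simplified stand-in for `ByteUtils.writeVarint`, and the trailing `(byte) 2` matches the new/old flag in the format above):
   
   ```
   import java.nio.ByteBuffer;
   
   public class LengthPrefixSketch {
   
       // fixed-width prefix: always 4 bytes, so the buffer can be sized exactly
       static byte[] fixedWidth(final byte[] newData, final byte[] oldData) {
           final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + newData.length + oldData.length + 1);
           buf.putInt(newData.length);
           buf.put(newData).put(oldData).put((byte) 2);
           return buf.array();  // buffer is exactly full, no trailing copy needed
       }
   
       // varint prefix: 1-5 bytes, so allocate for the worst case and trim afterwards
       static byte[] varintPrefix(final byte[] newData, final byte[] oldData) {
           final int maxVarintLength = 5;
           final ByteBuffer buf = ByteBuffer.allocate(maxVarintLength + newData.length + oldData.length + 1);
           writeVarint(newData.length, buf);
           buf.put(newData).put(oldData).put((byte) 2);
           final byte[] out = new byte[buf.position()];  // copy only what was written
           buf.rewind();
           buf.get(out);
           return out;
       }
   
       // simplified zig-zag varint write
       static void writeVarint(final int value, final ByteBuffer buf) {
           int v = (value << 1) ^ (value >> 31);
           while ((v & 0xffffff80) != 0) {
               buf.put((byte) ((v & 0x7f) | 0x80));
               v >>>= 7;
           }
           buf.put((byte) v);
       }
   }
   ```
   
   The fixed-width version needs no trailing copy, which is the simplicity argument here.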
   



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##########
@@ -130,7 +130,11 @@ public byte[] serialize(final String topic, final Headers headers, final Change<
             throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
         }
 
-        return buf.array();
+        final byte[] serialized = new byte[buf.position()];
+        buf.position(0);
+        buf.get(serialized);
+
+        return serialized;

Review Comment:
   then you don't need this change, nor any comment to explain why we need to resize to a smaller array.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##########
@@ -130,7 +130,11 @@ public byte[] serialize(final String topic, final Headers headers, final Change<
             throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
         }
 
-        return buf.array();
+        final byte[] serialized = new byte[buf.position()];
+        buf.position(0);
+        buf.get(serialized);

Review Comment:
   nit: this resize-to-a-smaller-array operation is only necessary when we use `writeVarint`. I would add a comment here to explain that.
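   
   For instance, the comment might read something like this (a sketch against the PR's lines above, not the author's wording):
   
   ```
   // buf was allocated for the worst-case varint length, so writeVarint may
   // leave it partially unfilled; copy only the bytes actually written.
   final byte[] serialized = new byte[buf.position()];
   buf.position(0);
   buf.get(serialized);
   ```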



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##########
@@ -113,9 +113,9 @@ public byte[] serialize(final String topic, final Headers headers, final Change<
                 throw new StreamsException("Both old and new values are not null (" + data.oldValue
                         + " : " + data.newValue + ") in ChangeSerializer, which is not allowed unless upgrading.");
             } else {
-                final int capacity = UINT32_SIZE + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE;
+                final int capacity = MAX_VARINT_LENGTH + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE;
                 buf = ByteBuffer.allocate(capacity);
-                ByteUtils.writeUnsignedInt(buf, newDataLength);
+                ByteUtils.writeVarint(newDataLength, buf);
                 buf.put(newData).put(oldData).put((byte) 2);

Review Comment:
   Or better yet, I would maybe rewrite it like this:
   ```
   // write into a buffer sized for the worst-case varint length
   final int maxCapacity = MAX_VARINT_LENGTH + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE;
   final ByteBuffer maxCapacityBuffer = ByteBuffer.allocate(maxCapacity);
   ByteUtils.writeVarint(newDataLength, maxCapacityBuffer);
   maxCapacityBuffer.put(newData).put(oldData).put((byte) 2);
   
   // position() is the number of bytes actually written; copy just that prefix
   final int actualCapacity = maxCapacityBuffer.position();
   buf = ByteBuffer.allocate(actualCapacity).put(maxCapacityBuffer.array(), 0, actualCapacity);
   ```
   
   (note: we obviously don't need any of this if we use the `buf.putInt` method). 


