Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/04 23:21:18 UTC

[GitHub] [kafka] mjsax commented on a diff in pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution

mjsax commented on code in PR #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r1061951302


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java:
##########
@@ -24,8 +24,6 @@
 
 public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, Void, T> {
 
-    private static final int NEWFLAG_SIZE = 1;

Review Comment:
   Ah, re-reading the code, I remember now: we encode only the new or the old value in the bytes and append a one-byte flag at the end to indicate whether it is the new or the old value. So I guess we should keep this variable?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java:
##########
@@ -44,18 +42,30 @@ public void setIfUnset(final SerdeGetter getter) {
         }
     }
 
-    @Override
-    public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) {
-
-        final byte[] bytes = new byte[data.length - NEWFLAG_SIZE];
-
-        System.arraycopy(data, 0, bytes, 0, bytes.length);
-
-        if (ByteBuffer.wrap(data).get(data.length - NEWFLAG_SIZE) != 0) {
-            return new Change<>(inner.deserialize(topic, headers, bytes), null);
+    private byte[] getData(final ByteBuffer buffer) {
+        final boolean dataIsNull = buffer.get() == (byte) 1;
+        final byte[] data;
+        if (dataIsNull) {
+            data = null;
         } else {
-            return new Change<>(null, inner.deserialize(topic, headers, bytes));
+            final int dataLength = buffer.getInt();
+            data = new byte[dataLength];
+            buffer.get(data);
         }
+        return data;
+    }
+
+    @Override
+    public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) {
+        // The format we need to deserialize is:

Review Comment:
   For backward compatibility, we cannot just change the format. We need to keep the old format `{BYTE_ARRAY oldOrNewValue}{BYTE oldNewFlag}`, with `oldNewFlag == 0` if the encoded bytes represent the old value (and thus the new value is `null`) and `oldNewFlag == 1` if the encoded bytes represent the new value (and thus the old value is `null`).
   
   I guess we can extend the format by adding `oldNewFlag == 2`, meaning that both the old and the new value are serialized in the byte array using the format `{INT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE oldNewFlag=2}`?
   
   (Note: we don't need to store the old value's length, as we can compute it as `data.length - Integer.BYTES - newDataLength - 1`, i.e., whatever remains after the 4-byte length prefix, the new value, and the trailing flag.)
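
   To make the three cases concrete, here is a minimal Java sketch of the extended layout. It works on already-serialized value bytes; the class `ChangeFormatSketch`, the `RawChange` holder, and the `encode`/`decode` helpers are hypothetical names, not existing Kafka Streams code, and the sketch assumes at least one of the two values is non-null. Note that the removed `deserialize` above treats any non-zero trailing byte as "new value", so an old reader would silently misinterpret a record carrying flag `2` instead of rejecting it.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public final class ChangeFormatSketch {
    // Trailing flag values: 0 and 1 are the existing format, 2 is the proposed extension.
    static final byte OLD_VALUE_ONLY = 0;
    static final byte NEW_VALUE_ONLY = 1;
    static final byte BOTH_VALUES = 2;

    /** Holder for already-serialized old/new values; null means "absent". */
    static final class RawChange {
        final byte[] newValue;
        final byte[] oldValue;

        RawChange(final byte[] newValue, final byte[] oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    static byte[] encode(final byte[] newValue, final byte[] oldValue) {
        if (newValue == null && oldValue == null) {
            throw new IllegalArgumentException("sketch assumes at least one value is non-null");
        }
        if (newValue != null && oldValue != null) {
            // Proposed format: {INT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE 2}
            return ByteBuffer.allocate(Integer.BYTES + newValue.length + oldValue.length + 1)
                .putInt(newValue.length)
                .put(newValue)
                .put(oldValue)
                .put(BOTH_VALUES)
                .array();
        }
        // Existing format: {BYTE_ARRAY oldOrNewValue}{BYTE oldNewFlag}
        final byte[] value = newValue != null ? newValue : oldValue;
        final byte[] out = Arrays.copyOf(value, value.length + 1);
        out[value.length] = newValue != null ? NEW_VALUE_ONLY : OLD_VALUE_ONLY;
        return out;
    }

    static RawChange decode(final byte[] data) {
        final byte flag = data[data.length - 1];
        if (flag == BOTH_VALUES) {
            final ByteBuffer buffer = ByteBuffer.wrap(data);
            final int newDataLength = buffer.getInt();
            final byte[] newValue = new byte[newDataLength];
            buffer.get(newValue);
            // The old value's length is not stored: it is whatever remains before the flag.
            final byte[] oldValue = new byte[data.length - Integer.BYTES - newDataLength - 1];
            buffer.get(oldValue);
            return new RawChange(newValue, oldValue);
        }
        // Existing format: strip the trailing flag byte.
        final byte[] value = Arrays.copyOf(data, data.length - 1);
        return flag == NEW_VALUE_ONLY ? new RawChange(value, null) : new RawChange(null, value);
    }
}
```

   Because the decoder inspects the trailing flag first, the two existing one-value layouts keep exactly their current byte representation, and only records carrying both values pay for the extra 4-byte length prefix.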
   
   I.e., we now need to handle three cases: the two old/existing ones plus the new one.
   
   This also raises one more question about backward compatibility: if we upgrade a single KS instance and it starts writing the new format, there might still be an old instance that does not understand the new format. Thus, I think we need to do a two-round rolling bounce upgrade. In the first round, we still use only the old formats and only update the version. In the second round, we change a config to enable the new format. I think we can re-use the existing `upgrade.from` config. However, for this kind of change, we would actually need a KIP... Are you familiar with the KIP process?
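
   Why a single rolling bounce is unsafe can be sketched as a small model. This is a hypothetical simulation, not Kafka code: each instance runs either the old release or the new one, and the new release keeps writing the old formats for as long as the upgrade config (in Kafka Streams, `StreamsConfig.UPGRADE_FROM_CONFIG`, key `upgrade.from`) is still set. A state counts as safe when no instance writes flag `2` while some instance cannot read it.

```java
import java.util.ArrayList;
import java.util.List;

public final class RollingBounceSketch {
    /** Hypothetical model of one Kafka Streams instance during an upgrade. */
    static final class Instance {
        final boolean newCode;        // runs a release that can read flag == 2
        final boolean upgradeFromSet; // upgrade.from is still set to the old version

        Instance(final boolean newCode, final boolean upgradeFromSet) {
            this.newCode = newCode;
            this.upgradeFromSet = upgradeFromSet;
        }

        boolean canReadNewFormat() {
            return newCode;
        }

        boolean writesNewFormat() {
            // New code only switches to flag == 2 once upgrade.from is removed.
            return newCode && !upgradeFromSet;
        }
    }

    /** Safe if nobody writes the new format while some instance cannot read it. */
    static boolean isSafe(final List<Instance> group) {
        final boolean anyWriter = group.stream().anyMatch(Instance::writesNewFormat);
        final boolean allReaders = group.stream().allMatch(Instance::canReadNewFormat);
        return !anyWriter || allReaders;
    }

    /** Bounces instances one at a time; returns true if every intermediate state is safe. */
    static boolean simulate(final int instances, final boolean twoRounds) {
        final List<Instance> group = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            group.add(new Instance(false, false)); // everyone starts on the old release
        }
        boolean safe = true;
        // Round 1: deploy the new code; with two rounds, upgrade.from stays set.
        for (int i = 0; i < instances; i++) {
            group.set(i, new Instance(true, twoRounds));
            safe &= isSafe(group);
        }
        if (twoRounds) {
            // Round 2: remove upgrade.from so instances start writing the new format.
            for (int i = 0; i < instances; i++) {
                group.set(i, new Instance(true, false));
                safe &= isSafe(group);
            }
        }
        return safe;
    }

    public static void main(final String[] args) {
        System.out.println("single round safe: " + simulate(3, false));
        System.out.println("two rounds safe: " + simulate(3, true));
    }
}
```

   With three instances, the single-round bounce is reported as unsafe (the first upgraded instance writes flag `2` while the other two are still on the old release), whereas the two-round bounce stays safe at every step because the new format is only enabled once every instance can read it.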



-- 
This is an automated message from the Apache Git Service.