Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/15 19:14:46 UTC

[GitHub] [kafka] fqaiser94 commented on a diff in pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution

fqaiser94 commented on code in PR #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r1070669075


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java:
##########
@@ -44,18 +42,30 @@ public void setIfUnset(final SerdeGetter getter) {
         }
     }
 
-    @Override
-    public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) {
-
-        final byte[] bytes = new byte[data.length - NEWFLAG_SIZE];
-
-        System.arraycopy(data, 0, bytes, 0, bytes.length);
-
-        if (ByteBuffer.wrap(data).get(data.length - NEWFLAG_SIZE) != 0) {
-            return new Change<>(inner.deserialize(topic, headers, bytes), null);
+    private byte[] getData(final ByteBuffer buffer) {
+        final boolean dataIsNull = buffer.get() == (byte) 1;
+        final byte[] data;
+        if (dataIsNull) {
+            data = null;
         } else {
-            return new Change<>(null, inner.deserialize(topic, headers, bytes));
+            final int dataLength = buffer.getInt();
+            data = new byte[dataLength];
+            buffer.get(data);
         }
+        return data;
+    }
+
+    @Override
+    public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) {
+        // The format we need to deserialize is:

Review Comment:
   > For backward compatibility, we cannot just change the format.
   
   Agreed, this was actually my first [concern](https://github.com/apache/kafka/pull/10747#discussion_r637657442) when I raised the initial PR XD
   
   > I guess, we can extend the format to add oldNewFlag = 2 
   
   Sounds good, implemented that in my latest changes. 
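
For readers following along, here is a minimal sketch of what extending the trailing flag with a third value could look like. The removed code in the hunk above shows the existing layout (payload bytes followed by a one-byte flag, where non-zero means "new value only"). The layout used here for flag `2` (a 4-byte length of the new value, then the new bytes, then the old bytes) is a hypothetical choice for illustration, and `ChangeFormatSketch`, `decode`, and `encodeBoth` are made-up names, not the code in this PR.

```java
import java.nio.ByteBuffer;

final class ChangeFormatSketch {
    static final int FLAG_SIZE = 1;

    // Decodes into {oldBytes, newBytes}; either entry may be null.
    static byte[][] decode(final byte[] data) {
        final byte flag = data[data.length - FLAG_SIZE];
        // View of everything except the trailing flag byte.
        final ByteBuffer buffer = ByteBuffer.wrap(data, 0, data.length - FLAG_SIZE);
        switch (flag) {
            case 0: { // existing format: old value only
                final byte[] oldBytes = new byte[buffer.remaining()];
                buffer.get(oldBytes);
                return new byte[][] {oldBytes, null};
            }
            case 1: { // existing format: new value only
                final byte[] newBytes = new byte[buffer.remaining()];
                buffer.get(newBytes);
                return new byte[][] {null, newBytes};
            }
            case 2: { // hypothetical extended format: both values in one record
                final int newLength = buffer.getInt();
                final byte[] newBytes = new byte[newLength];
                buffer.get(newBytes);
                final byte[] oldBytes = new byte[buffer.remaining()];
                buffer.get(oldBytes);
                return new byte[][] {oldBytes, newBytes};
            }
            default:
                throw new IllegalArgumentException("Unknown flag: " + flag);
        }
    }

    // Encodes both values using the hypothetical flag-2 layout described above.
    static byte[] encodeBoth(final byte[] oldBytes, final byte[] newBytes) {
        return ByteBuffer.allocate(Integer.BYTES + newBytes.length + oldBytes.length + FLAG_SIZE)
            .putInt(newBytes.length)
            .put(newBytes)
            .put(oldBytes)
            .put((byte) 2)
            .array();
    }
}
```

Note that the removed deserializer treats any non-zero flag as "new value only", so an instance still running that code would misread a flag-2 record. That is the backward-compatibility concern the rolling-upgrade discussion addresses.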
   
   > Thus, I think we need do a two round rolling bounce upgrade... In the first round, we still use only the old formats, and only update the version. In the second round, we change a config to enable the new format. I think we can re-use the existing upgrade_from config. 
   
   I will need to think about this a little more, but I _think_ this makes sense to me. 
   Would the code changes required to enable this look similar to what's already been done in `SubscriptionWrapperSerde`? For example: https://github.com/apache/kafka/blob/8d2e157b37902055bd5a8f4bd0f6ac29080910eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java#L68
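
As a rough illustration of the two-round rolling bounce described in the quote, the sketch below gates the format on the `upgrade.from` config key. It assumes that any non-null `upgrade.from` value means older instances may still be running (round one) and that removing it enables the new format (round two). `UpgradeGateSketch`, `useLegacyFormat`, and `flagsFor` are hypothetical names, and the "old value first" ordering in legacy mode is also an assumption. This is not how `SubscriptionWrapperSerde` or this PR actually implements it.

```java
import java.util.Map;

final class UpgradeGateSketch {
    // Kafka Streams config key for rolling upgrades.
    static final String UPGRADE_FROM = "upgrade.from";

    // Assumption: a non-null upgrade.from means we are still in round one.
    static boolean useLegacyFormat(final Map<String, ?> configs) {
        return configs.get(UPGRADE_FROM) != null;
    }

    // Returns the trailing flag of each record that would be sent for one change.
    static byte[] flagsFor(final Map<String, ?> configs, final boolean hasOld, final boolean hasNew) {
        if (hasOld && hasNew) {
            // Legacy: two records (old, then new). New format: one record with flag 2.
            return useLegacyFormat(configs) ? new byte[] {0, 1} : new byte[] {2};
        }
        if (hasNew) {
            return new byte[] {1};
        }
        if (hasOld) {
            return new byte[] {0};
        }
        return new byte[0];
    }
}
```

With this gating, the first bounce deploys code that can read flag `2` but never writes it, and the second bounce (with `upgrade.from` removed) starts writing it once every instance can read it.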


