Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/24 03:00:03 UTC

[GitHub] [kafka] fqaiser94 commented on a change in pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution

fqaiser94 commented on a change in pull request #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r637657442



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
##########
@@ -45,34 +45,30 @@ public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serial
     }
 
     /**
-     * @throws StreamsException if both old and new values of data are null, or if
-     * both values are not null
+     * @throws StreamsException if both old and new values of data are null.
      */
     @Override
     public byte[] serialize(final String topic, final Headers headers, final Change<T> data) {
-        final byte[] serializedKey;
+        final boolean oldValueIsNull = data.oldValue == null;
+        final boolean newValueIsNull = data.newValue == null;
 
-        // only one of the old / new values would be not null
-        if (data.newValue != null) {
-            if (data.oldValue != null) {
-                throw new StreamsException("Both old and new values are not null (" + data.oldValue
-                    + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
-            }
-
-            serializedKey = inner.serialize(topic, headers, data.newValue);
+        // both old and new values cannot be null
+        if (oldValueIsNull && newValueIsNull) {
+            throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
         } else {
-            if (data.oldValue == null) {
-                throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
-            }
-
-            serializedKey = inner.serialize(topic, headers, data.oldValue);
+            final byte[] newData = newValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.newValue);
+            final byte[] oldData = oldValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.oldValue);
+
+            final int newDataLength = newData.length;
+            final int capacity = NEW_DATA_LENGTH_BYTES_SIZE + newDataLength + oldData.length;
+
+            return ByteBuffer
+                    .allocate(capacity)
+                    .putInt(newDataLength)
+                    .put(newData)
+                    .put(oldData)
+                    .array();

Review comment:
       Because the change to the `ChangedSerializer` and `ChangedDeserializer` classes alters the serialized format, I don’t think users will be able to simply upgrade from a previous version of Kafka Streams. Not sure how these types of “breaking” changes are typically handled. Is it simply a matter of noting this in the relevant upgrade doc? Or do we want to write more code to handle upgrade scenarios?
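
To make the compatibility concern concrete, here is a minimal stand-alone sketch of the layout the new `serialize` appears to produce, going only by the hunk above: a length prefix for the new value, then the new value's bytes, then the old value's bytes. The class `ChangeWireFormatSketch`, its `encode`/`decode` methods, and the 4-byte value of `NEW_DATA_LENGTH_BYTES_SIZE` are assumptions for illustration, not the actual Kafka Streams classes; the matching deserializer is not shown in this comment, so `decode` is only a guess at its inverse.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch, not the real ChangedSerializer / ChangedDeserializer.
final class ChangeWireFormatSketch {
    // Assumed to be 4 bytes, since the hunk writes the length with putInt.
    static final int NEW_DATA_LENGTH_BYTES_SIZE = Integer.BYTES;

    // Layout: [4-byte length of newData][newData bytes][oldData bytes].
    // A null argument stands for "value absent" and is written as zero bytes.
    static byte[] encode(final byte[] newData, final byte[] oldData) {
        if (newData == null && oldData == null) {
            throw new IllegalArgumentException("Both old and new values are null");
        }
        final byte[] newBytes = newData == null ? new byte[0] : newData;
        final byte[] oldBytes = oldData == null ? new byte[0] : oldData;
        return ByteBuffer
                .allocate(NEW_DATA_LENGTH_BYTES_SIZE + newBytes.length + oldBytes.length)
                .putInt(newBytes.length)
                .put(newBytes)
                .put(oldBytes)
                .array();
    }

    // Assumed inverse of encode: returns {newData, oldData}, with null for an empty part.
    static byte[][] decode(final byte[] payload) {
        final ByteBuffer buffer = ByteBuffer.wrap(payload);
        final byte[] newBytes = new byte[buffer.getInt()];
        buffer.get(newBytes);
        final byte[] oldBytes = new byte[buffer.remaining()];
        buffer.get(oldBytes);
        return new byte[][] {
            newBytes.length == 0 ? null : newBytes,
            oldBytes.length == 0 ? null : oldBytes
        };
    }

    public static void main(final String[] args) {
        final byte[] payload = encode(
                "new".getBytes(StandardCharsets.UTF_8),
                "old".getBytes(StandardCharsets.UTF_8));
        final byte[][] parts = decode(payload);
        System.out.println(new String(parts[0], StandardCharsets.UTF_8));
        System.out.println(new String(parts[1], StandardCharsets.UTF_8));
    }
}
```

A reader expecting any other layout would interpret the first four bytes as something other than a length, which is why records already sitting in a repartition topic during a rolling upgrade are the worry. The sketch also shows a side effect of this layout: a value whose serialized form is zero bytes cannot be told apart from an absent value.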

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
##########
@@ -90,14 +90,19 @@ public void process(final K key, final Change<V> change) {
 
             // if the selected repartition key or value is null, skip
             // forward oldPair first, to be consistent with reduce and aggregate
-            if (oldPair != null && oldPair.key != null && oldPair.value != null) {
-                context().forward(oldPair.key, new Change<>(null, oldPair.value));
+            final boolean oldPairNotNull = oldPair != null && oldPair.key != null && oldPair.value != null;
+            final boolean newPairNotNull = newPair != null && newPair.key != null && newPair.value != null;
+            if (oldPairNotNull && newPairNotNull && oldPair.key == newPair.key) {

Review comment:
       This fix depends on a correct implementation of the `.equals()` method for the key type.
   
Not sure if we would want to add a doc somewhere to state this assumption for users?
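
To illustrate the assumption, here is a small sketch with a hypothetical key type (`UserKey`) and a hypothetical helper (`sameKey`); neither comes from the PR. Note that the hunk as quoted compares keys with `==`, which for object types in Java checks reference identity rather than calling `.equals()`, so the sketch uses `Objects.equals` to show the value-based comparison the comment describes.

```java
import java.util.Objects;

// Hypothetical sketch, not code from the PR.
final class KeyEqualitySketch {
    // Example key type with value-based equals/hashCode.
    static final class UserKey {
        final String id;

        UserKey(final String id) {
            this.id = id;
        }

        @Override
        public boolean equals(final Object other) {
            if (this == other) {
                return true;
            }
            if (!(other instanceof UserKey)) {
                return false;
            }
            return Objects.equals(id, ((UserKey) other).id);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id);
        }
    }

    // Null-safe, equals-based comparison of the old and new repartition keys.
    static boolean sameKey(final Object oldKey, final Object newKey) {
        return Objects.equals(oldKey, newKey);
    }

    public static void main(final String[] args) {
        final UserKey first = new UserKey("42");
        final UserKey second = new UserKey("42");
        System.out.println("same reference: " + (first == second));
        System.out.println("equal by value: " + sameKey(first, second));
    }
}
```

If a key type does not override `equals()`, `Objects.equals` falls back to reference identity, and two logically identical keys would be treated as different.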



