Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/29 00:05:54 UTC

[GitHub] [kafka] mjsax commented on a diff in pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution

mjsax commented on code in PR #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r1058656046


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##########
@@ -45,34 +45,30 @@ public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serial
     }
 
     /**
-     * @throws StreamsException if both old and new values of data are null, or if
-     * both values are not null
+     * @throws StreamsException if both old and new values of data are null.
      */
     @Override
     public byte[] serialize(final String topic, final Headers headers, final Change<T> data) {
-        final byte[] serializedKey;
+        final boolean oldValueIsNull = data.oldValue == null;
+        final boolean newValueIsNull = data.newValue == null;
 
-        // only one of the old / new values would be not null
-        if (data.newValue != null) {
-            if (data.oldValue != null) {
-                throw new StreamsException("Both old and new values are not null (" + data.oldValue
-                    + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
-            }
-
-            serializedKey = inner.serialize(topic, headers, data.newValue);
+        // both old and new values cannot be null
+        if (oldValueIsNull && newValueIsNull) {
+            throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
         } else {
-            if (data.oldValue == null) {
-                throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
-            }
-
-            serializedKey = inner.serialize(topic, headers, data.oldValue);
+            final byte[] newData = newValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.newValue);
+            final byte[] oldData = oldValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.oldValue);

Review Comment:
   ```suggestion
               final byte[] oldData = oldValueIsNull ? null : inner.serialize(topic, headers, data.oldValue);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##########
@@ -45,34 +45,30 @@ public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serial
     }
 
     /**
-     * @throws StreamsException if both old and new values of data are null, or if
-     * both values are not null
+     * @throws StreamsException if both old and new values of data are null.
      */
     @Override
     public byte[] serialize(final String topic, final Headers headers, final Change<T> data) {
-        final byte[] serializedKey;
+        final boolean oldValueIsNull = data.oldValue == null;
+        final boolean newValueIsNull = data.newValue == null;
 
-        // only one of the old / new values would be not null
-        if (data.newValue != null) {
-            if (data.oldValue != null) {
-                throw new StreamsException("Both old and new values are not null (" + data.oldValue
-                    + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
-            }
-
-            serializedKey = inner.serialize(topic, headers, data.newValue);
+        // both old and new values cannot be null
+        if (oldValueIsNull && newValueIsNull) {
+            throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
         } else {
-            if (data.oldValue == null) {
-                throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
-            }
-
-            serializedKey = inner.serialize(topic, headers, data.oldValue);
+            final byte[] newData = newValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.newValue);

Review Comment:
   ```suggestion
               final byte[] newData = newValueIsNull ? null : inner.serialize(topic, headers, data.newValue);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java:
##########
@@ -90,14 +90,19 @@ public void process(final K key, final Change<V> change) {
 
             // if the selected repartition key or value is null, skip
             // forward oldPair first, to be consistent with reduce and aggregate
-            if (oldPair != null && oldPair.key != null && oldPair.value != null) {
-                context().forward(oldPair.key, new Change<>(null, oldPair.value));
+            final boolean oldPairNotNull = oldPair != null && oldPair.key != null && oldPair.value != null;
+            final boolean newPairNotNull = newPair != null && newPair.key != null && newPair.value != null;
+            if (oldPairNotNull && newPairNotNull && oldPair.key == newPair.key) {

Review Comment:
   `oldPair.key == newPair.key` does not use `equals()`; it does an object reference comparison. We need to use `oldPair.key.equals(newPair.key)` (and test that `oldPair.key != null` to avoid an NPE).
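   
   To illustrate the difference, here is a minimal standalone sketch. The class `KeyComparisonSketch` and its method names are hypothetical and not part of Kafka; it only shows how `==` and a null-safe `equals()` check behave for two distinct key objects with the same content.
   
   ```java
   import java.util.Objects;
   
   // Hypothetical demo class, not part of the Kafka code base.
   final class KeyComparisonSketch {
   
       // Reference comparison: true only if both variables point to the same object.
       static <K> boolean sameReference(final K oldKey, final K newKey) {
           return oldKey == newKey;
       }
   
       // Value comparison: delegates to equals() and tolerates null on either side.
       static <K> boolean sameValue(final K oldKey, final K newKey) {
           return Objects.equals(oldKey, newKey);
       }
   
       public static void main(final String[] args) {
           // Two distinct String objects with identical contents.
           final String oldKey = new String("user-42");
           final String newKey = new String("user-42");
   
           System.out.println(sameReference(oldKey, newKey)); // false
           System.out.println(sameValue(oldKey, newKey));     // true
       }
   }
   ```
   
   Note that `Objects.equals(null, null)` returns `true`, so a caller that must skip null keys still needs an explicit null check.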



##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -1050,8 +1050,8 @@ <h4 class="anchor-heading"><a id="streams_concepts_globalktable" class="anchor-l
                                         more than once for a key as a result of having received input tombstone records for that key (see below).</li>
                                     <li>When the first non-<code class="docutils literal"><span class="pre">null</span></code> value is received for a key (e.g.,  INSERT), then only the adder is called.</li>
                                     <li>When subsequent non-<code class="docutils literal"><span class="pre">null</span></code> values are received for a key (e.g.,  UPDATE), then (1) the subtractor is
-                                        called with the old value as stored in the table and (2) the adder is called with the new value of the
-                                        input record that was just received.  The order of execution for the subtractor and adder is not defined.</li>
+                                        first called with the old value as stored in the table and then (2) the adder is called with the new value of the
+                                        input record that was just received.</li>

Review Comment:
   Well, for the regular case in which the key changes, the order is still not defined...
   
   We would need to be more precise and say:
   > If and only if the extracted grouping key of the old and new value is the same, the subtractor will be called before the adder. The detection of this case depends on the correct implementation of the <code>equals()</code> method of the extracted key type.
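   
   As an illustration of what "correct implementation of `equals()`" could mean for a custom grouping key, here is a minimal sketch. The class `GroupingKeySketch` and its fields are made up for this example; any real key type would have its own fields.
   
   ```java
   import java.util.Objects;
   
   // Hypothetical grouping key type, used only to illustrate equals()/hashCode().
   final class GroupingKeySketch {
       private final String region;
       private final int bucket;
   
       GroupingKeySketch(final String region, final int bucket) {
           this.region = region;
           this.bucket = bucket;
       }
   
       // Two keys are equal when all of their fields are equal.
       @Override
       public boolean equals(final Object other) {
           if (this == other) {
               return true;
           }
           if (!(other instanceof GroupingKeySketch)) {
               return false;
           }
           final GroupingKeySketch that = (GroupingKeySketch) other;
           return bucket == that.bucket && Objects.equals(region, that.region);
       }
   
       // hashCode() must be consistent with equals().
       @Override
       public int hashCode() {
           return Objects.hash(region, bucket);
       }
   }
   ```
   
   Without such an override, the inherited `Object#equals()` falls back to reference identity, so two keys extracted from the old and new value would never be detected as the same key.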



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java:
##########
@@ -45,16 +46,17 @@ public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final De
 
     @Override
     public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) {
+        final ByteBuffer buffer = Serdes.ByteBuffer().deserializer().deserialize(topic, data);
+        final int newDataLength = buffer.getInt();
+        final int oldDataLength = data.length - newDataLength - NEW_DATA_LENGTH_BYTES_SIZE;
+        final byte[] newData = new byte[newDataLength];
+        final byte[] oldData = new byte[oldDataLength];
+        buffer.get(newData);
+        buffer.get(oldData);
 
-        final byte[] bytes = new byte[data.length - NEWFLAG_SIZE];
-
-        System.arraycopy(data, 0, bytes, 0, bytes.length);
-
-        if (ByteBuffer.wrap(data).get(data.length - NEWFLAG_SIZE) != 0) {
-            return new Change<>(inner.deserialize(topic, headers, bytes), null);
-        } else {
-            return new Change<>(null, inner.deserialize(topic, headers, bytes));
-        }
+        return new Change<>(
+                newDataLength > 0 ? inner.deserialize(topic, headers, newData) : null,

Review Comment:
   An empty `ByteBuffer` and `null` are not the same, and we should distinguish between the two. Cf. my other comment on the serializer.
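   
   One possible way to keep the two cases apart is to length-prefix each value and reserve a negative length for `null`. The sketch below is only an assumption about how this could look; the class `NullableChangeCodecSketch`, the `-1` marker, and the byte layout are hypothetical and not the format used in this PR.
   
   ```java
   import java.nio.ByteBuffer;
   
   // Hypothetical codec, not the format used by ChangedSerializer/ChangedDeserializer.
   final class NullableChangeCodecSketch {
   
       // Assumed marker: a length of -1 means "value was null".
       private static final int NULL_MARKER = -1;
   
       // Layout: [newLength][newBytes][oldLength][oldBytes]
       static byte[] encode(final byte[] newData, final byte[] oldData) {
           final int newSize = newData == null ? 0 : newData.length;
           final int oldSize = oldData == null ? 0 : oldData.length;
           final ByteBuffer buffer = ByteBuffer.allocate(2 * Integer.BYTES + newSize + oldSize);
           putNullable(buffer, newData);
           putNullable(buffer, oldData);
           return buffer.array();
       }
   
       // Returns a two-element array: index 0 is the new data, index 1 the old data.
       static byte[][] decode(final byte[] data) {
           final ByteBuffer buffer = ByteBuffer.wrap(data);
           final byte[] newData = getNullable(buffer);
           final byte[] oldData = getNullable(buffer);
           return new byte[][] {newData, oldData};
       }
   
       private static void putNullable(final ByteBuffer buffer, final byte[] bytes) {
           if (bytes == null) {
               buffer.putInt(NULL_MARKER);
           } else {
               buffer.putInt(bytes.length);
               buffer.put(bytes);
           }
       }
   
       private static byte[] getNullable(final ByteBuffer buffer) {
           final int length = buffer.getInt();
           if (length == NULL_MARKER) {
               return null;
           }
           final byte[] bytes = new byte[length];
           buffer.get(bytes);
           return bytes;
       }
   }
   ```
   
   With this layout, `encode(new byte[0], null)` decodes back to an empty array for the new value and `null` for the old value, so the distinction survives a round trip.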



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java:
##########
@@ -18,13 +18,14 @@
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
 
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
 public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, Void, T> {
 
-    private static final int NEWFLAG_SIZE = 1;
+    private static final int NEW_DATA_LENGTH_BYTES_SIZE = Integer.BYTES;

Review Comment:
   Why do we rename this? (I am just wondering what this is actually...)


