Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/24 02:49:10 UTC

[GitHub] [kafka] fqaiser94 opened a new pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution

fqaiser94 opened a new pull request #10747:
URL: https://github.com/apache/kafka/pull/10747


   # Problem
   For context, this issue was initially raised in the following [thread](https://lists.apache.org/thread.html/rc3c1d07375e25341ab0467e4f8526a88f9fcc825a766d15922a9ed7d%40%3Cusers.kafka.apache.org%3E) on the Kafka users mailing list. During `KTable.groupBy`, we write into a repartition topic. Since the grouping key can change, we need to send separate events for the oldValue and the newValue to downstream nodes (where the oldValue will be “subtracted” from the aggregate for the old key and the newValue will be “added” to the aggregate for the new key). 
   
   However, sending the oldValue and the newValue as separate events is not strictly necessary when the grouping key does not change, and doing so poses two challenges for users: 
   1. Firstly, the resulting KTable (i.e. the result of `KTable.groupBy(???).aggregate(???)`) can briefly be in an “inconsistent” state where the oldValue has been “subtracted” from the aggregate for the key but the newValue has not yet been “added” to it, because each event (oldValue, newValue) is processed separately. 
   2. Secondly, if users fail to configure their producers correctly to avoid re-ordering during `send()`, it’s possible the newValue may be sent (and therefore processed by the aggregator) before the oldValue. If the user’s `adder` and `subtractor` functions are non-commutative, this would put the aggregate in a permanently “inconsistent” state (see the sketch below). 
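
   To make the second point concrete, below is a minimal, hypothetical “keep the latest value” aggregate whose adder and subtractor do not commute (the `orders` table, key/value types, and serdes are illustrative, not from this PR):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

// Assume `orders` is a KTable<String, String>. The adder overwrites the
// aggregate and the subtractor clears it, so for an update V1 -> V2 under an
// unchanged grouping key, subtract-then-add yields V2 as expected, while a
// re-ordered add-then-subtract incorrectly leaves the aggregate null.
KTable<String, String> latest = orders
    .groupBy(KeyValue::pair, Grouped.with(Serdes.String(), Serdes.String()))
    .aggregate(
        () -> null,                        // initializer
        (key, newValue, agg) -> newValue,  // adder: overwrite with the new value
        (key, oldValue, agg) -> null,      // subtractor: clear the aggregate
        Materialized.with(Serdes.String(), Serdes.String()));
```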
   
   Whilst there are ways to get around this issue by dropping down to the Processor API level, it would be nicer if this were handled by Kafka Streams more seamlessly. 
   
   # Proposed solution
   If the grouping key has not changed, the oldValue and newValue events are guaranteed to be processed by the same processor. As such, we should be able to send them as a single `Change<T>` event. The subtractor and adder functions can then be executed (in that order) and the KTable can be updated in a single “atomic” operation. In this way, we remove any possibility of ending up in an “inconsistent” state. Also, note that sending the oldValue and newValue in the same event ensures that they can’t be re-ordered relative to each other (irrespective of how a user has configured their producer). 
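
   In pseudocode, a simplified sketch of the repartition-side logic (null checks on the individual keys/values omitted; the actual change lives in `KTableRepartitionMap`, quoted in the review comments below):

```java
// Sketch: when the selected grouping key is unchanged, forward a single
// Change carrying both values instead of two separate events.
if (oldPair != null && newPair != null && oldPair.key.equals(newPair.key)) {
    // one "atomic" event: downstream runs the subtractor and then the adder
    // on the same record, updating the store only once
    context().forward(newPair.key, new Change<>(newPair.value, oldPair.value));
} else {
    // keys differ: fall back to two events, forwarding the old value first,
    // to be consistent with reduce and aggregate
    if (oldPair != null) {
        context().forward(oldPair.key, new Change<>(null, oldPair.value));
    }
    if (newPair != null) {
        context().forward(newPair.key, new Change<>(newPair.value, null));
    }
}
```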
   
   This PR is an implementation of this idea, but I have some concerns which I’m not sure how to handle: 
   1. Matthias previously mentioned that this fix depends on a correct implementation of the `.equals()` method for the key type. Not sure what we can do about this other than perhaps adding a doc somewhere to state this assumption for users? 
   2. With the change to the `ChangedSerializer` and `ChangedDeserializer` classes, I don’t think you will be able to just upgrade from a previous version of Kafka Streams easily. Not sure how these types of “breaking” changes are typically handled. Is it simply a matter of noting this in the relevant upgrade doc? Or do we want to write more code to handle this? 
   
   # Why is the linked ticket KAFKA-12446?
   I’ve chosen KAFKA-12446 as the ticket number because it’s highly related, but to be clear, this PR is doing much more than what the ticket is actually proposing. I can create a separate ticket for this but first wanted to see if there is any appetite for the stronger guarantees I’m proposing. If not, I’m happy to cut this PR down to just what is being asked for in the ticket (which is basically to publicly document the existing behaviour). Please feel free to let me know if I’m going about this the wrong way. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] fqaiser94 commented on a change in pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution

Posted by GitBox <gi...@apache.org>.
fqaiser94 commented on a change in pull request #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r637657442



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
##########
@@ -45,34 +45,30 @@ public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serial
     }
 
     /**
-     * @throws StreamsException if both old and new values of data are null, or if
-     * both values are not null
+     * @throws StreamsException if both old and new values of data are null.
      */
     @Override
     public byte[] serialize(final String topic, final Headers headers, final Change<T> data) {
-        final byte[] serializedKey;
+        final boolean oldValueIsNull = data.oldValue == null;
+        final boolean newValueIsNull = data.newValue == null;
 
-        // only one of the old / new values would be not null
-        if (data.newValue != null) {
-            if (data.oldValue != null) {
-                throw new StreamsException("Both old and new values are not null (" + data.oldValue
-                    + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
-            }
-
-            serializedKey = inner.serialize(topic, headers, data.newValue);
+        // both old and new values cannot be null
+        if (oldValueIsNull && newValueIsNull) {
+            throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
         } else {
-            if (data.oldValue == null) {
-                throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
-            }
-
-            serializedKey = inner.serialize(topic, headers, data.oldValue);
+            final byte[] newData = newValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.newValue);
+            final byte[] oldData = oldValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.oldValue);
+
+            final int newDataLength = newData.length;
+            final int capacity = NEW_DATA_LENGTH_BYTES_SIZE + newDataLength + oldData.length;
+
+            return ByteBuffer
+                    .allocate(capacity)
+                    .putInt(newDataLength)
+                    .put(newData)
+                    .put(oldData)
+                    .array();

Review comment:
       With the change to the `ChangedSerializer` and `ChangedDeserializer` classes, I don’t think users will be able to just upgrade from a previous version of Kafka Streams easily. Any "inflight" messages written by older library versions will fail to deserialize correctly after the upgrade. 
   
   Not sure how these types of “breaking” changes are typically handled. 
   
   1. Is it simply a matter of noting this in the relevant upgrade doc, i.e. users need to do an application-reset? 
   2. Or do we want to write more code to handle upgrade scenarios? 
   3. Or find a more backwards-compatible way of writing this serde? (A rough sketch of one option follows below.)
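
   As one purely illustrative shape for option 3 (not part of this PR), the serializer could prefix the payload with a version byte so that future format changes are detectable on read. Note this alone does not solve reading pre-upgrade bytes, which carry no version byte; it would likely need to be paired with a two-phase rolling upgrade along the lines of the existing `upgrade.from` mechanism:

```java
private static final byte FORMAT_VERSION = 1;

@Override
public byte[] serialize(final String topic, final Headers headers, final Change<T> data) {
    if (data.oldValue == null && data.newValue == null) {
        throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
    }
    final byte[] newData = data.newValue == null ? new byte[0] : inner.serialize(topic, headers, data.newValue);
    final byte[] oldData = data.oldValue == null ? new byte[0] : inner.serialize(topic, headers, data.oldValue);
    return ByteBuffer
            .allocate(Byte.BYTES + Integer.BYTES + newData.length + oldData.length)
            .put(FORMAT_VERSION)     // version byte, for forward evolution of the format
            .putInt(newData.length)  // length of the serialized new value
            .put(newData)            // serialized new value (possibly empty)
            .put(oldData)            // serialized old value (possibly empty)
            .array();
}
```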







[GitHub] [kafka] fqaiser94 commented on a change in pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution

Posted by GitBox <gi...@apache.org>.
fqaiser94 commented on a change in pull request #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r637657657



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
##########
@@ -90,14 +90,19 @@ public void process(final K key, final Change<V> change) {
 
             // if the selected repartition key or value is null, skip
             // forward oldPair first, to be consistent with reduce and aggregate
-            if (oldPair != null && oldPair.key != null && oldPair.value != null) {
-                context().forward(oldPair.key, new Change<>(null, oldPair.value));
+            final boolean oldPairNotNull = oldPair != null && oldPair.key != null && oldPair.value != null;
+            final boolean newPairNotNull = newPair != null && newPair.key != null && newPair.value != null;
+            if (oldPairNotNull && newPairNotNull && oldPair.key == newPair.key) {

Review comment:
       As noted by Matthias on the mailing list thread, this fix depends on a correct implementation of the `.equals()` method for the key type (see the sketch below). Would we need to document this assumption somewhere for users?
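
   For example, a hypothetical grouping-key type (not from this PR) that satisfies the assumption; with a missing or incorrect `equals()`/`hashCode()`, logically-equal old and new keys would never compare equal, so the combined-event path would never be taken:

```java
import java.util.Objects;

// Hypothetical composite grouping key with a value-based equals/hashCode.
public final class OrderKey {
    private final String customerId;
    private final String region;

    public OrderKey(final String customerId, final String region) {
        this.customerId = customerId;
        this.region = region;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof OrderKey)) {
            return false;
        }
        final OrderKey that = (OrderKey) o;
        return customerId.equals(that.customerId) && region.equals(that.region);
    }

    @Override
    public int hashCode() {
        return Objects.hash(customerId, region);
    }
}
```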



