Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/04/13 22:33:30 UTC

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13564: KAFKA-14834: [8/N] Propagate `isLatest` as part of `Change`

vcrfxia commented on code in PR #13564:
URL: https://github.com/apache/kafka/pull/13564#discussion_r1166087199


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java:
##########
@@ -42,12 +48,13 @@ public boolean equals(final Object o) {
             return false;
         }
         final Change<?> change = (Change<?>) o;
-        return Objects.equals(newValue, change.newValue) &&
-                Objects.equals(oldValue, change.oldValue);
+        return Objects.equals(newValue, change.newValue)
+            && Objects.equals(oldValue, change.oldValue)
+            && isLatest == change.isLatest;

Review Comment:
   I'm on the fence about updating the equals method here. I'd like to update it this way, but because serialization does not include the new boolean, a serialization round-trip could yield a `Change` that no longer equals() the original, which seems confusing.
   
   Maybe this doesn't matter if equals() isn't called anywhere (I couldn't find any usages). Curious to hear reviewers' opinions.
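
   A minimal sketch of the round-trip concern, under stated assumptions: `StringChange`, `LossyChangeCodec` and `ChangeEqualsDemo` are hypothetical stand-ins, not Kafka classes. The value type is fixed to `String`, and the "serializer" keeps only the two values and drops `isLatest`, on the assumption (taken from the comment above) that the real serializer ignores the flag. Decoding falls back to `isLatest = true`, mirroring the default of the two-arg constructor.

   ```java
   import java.util.Objects;

   // Hypothetical, simplified stand-in for the internal Change<T>, fixed to String values.
   final class StringChange {
       final String newValue;
       final String oldValue;
       final boolean isLatest;

       StringChange(final String newValue, final String oldValue, final boolean isLatest) {
           this.newValue = newValue;
           this.oldValue = oldValue;
           this.isLatest = isLatest;
       }

       @Override
       public boolean equals(final Object o) {
           if (this == o) {
               return true;
           }
           if (o == null || getClass() != o.getClass()) {
               return false;
           }
           final StringChange change = (StringChange) o;
           // Proposed behaviour: isLatest takes part in equality.
           return Objects.equals(newValue, change.newValue)
               && Objects.equals(oldValue, change.oldValue)
               && isLatest == change.isLatest;
       }

       @Override
       public int hashCode() {
           // Keep hashCode consistent with equals by including isLatest too.
           return Objects.hash(newValue, oldValue, isLatest);
       }
   }

   // Hypothetical codec that, like the serializer described above, ignores isLatest.
   final class LossyChangeCodec {
       static String[] serialize(final StringChange change) {
           // Only the two values are written; the flag is lost here.
           return new String[] {change.newValue, change.oldValue};
       }

       static StringChange deserialize(final String[] encoded) {
           // Nothing to read the flag from, so assume "latest" (the two-arg default).
           return new StringChange(encoded[0], encoded[1], true);
       }
   }

   final class ChangeEqualsDemo {
       public static void main(final String[] args) {
           final StringChange original = new StringChange("new", "old", false);
           final StringChange roundTripped =
               LossyChangeCodec.deserialize(LossyChangeCodec.serialize(original));
           // Prints false: the round trip flipped isLatest from false to true.
           System.out.println(original.equals(roundTripped));
       }
   }
   ```

   A `Change` that was not the latest comes back flagged as latest and no longer equals the original, while one that was already the latest survives the round trip unchanged, so the surprise only shows up for out-of-order records.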



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java:
##########
@@ -116,10 +118,13 @@ public void process(final Record<KIn, Change<VIn>> record) {
             }
 
             // update the store with the new value
-            store.put(record.key(), newAgg, newTimestamp);
-            tupleForwarder.maybeForward(
-                record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
-                    .withTimestamp(newTimestamp));
+            final long putReturnCode = store.put(record.key(), newAgg, newTimestamp);
+            // if not put to store, do not forward downstream either
+            if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {

Review Comment:
   Some processors, including this one, guarantee that their timestamps are non-decreasing per key, which means `putReturnCode` will always equal `PUT_RETURN_CODE_IS_LATEST`. The extra comparisons here are therefore technically unnecessary, but it seems nice to keep them for consistency with other processors (and in case this guarantee ever changes, though that's unlikely). Happy to remove the redundancy if that's preferable.
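
   A sketch of the forward-only-if-put pattern, assuming a toy store rather than Kafka's versioned store. `PutCodes`, `ToyVersionedStore` and `ToyAggregateProcessor` are hypothetical; the constant names follow the diff, but their numeric values are placeholders. In this sketch a put older than the history retention (measured against observed stream time) is rejected with `PUT_RETURN_CODE_NOT_PUT`, and an accepted put that is not the newest version returns the next-newer timestamp. Deriving `isLatest` by comparing against `PUT_RETURN_CODE_IS_LATEST` is likewise an assumption about how the flag feeds into `Change`.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   // Placeholder return codes: the names follow the diff, the values are assumptions.
   final class PutCodes {
       static final long PUT_RETURN_CODE_IS_LATEST = -1L;
       static final long PUT_RETURN_CODE_NOT_PUT = Long.MIN_VALUE;

       private PutCodes() {
       }
   }

   // Toy versioned store (hypothetical): keeps every version per key and rejects
   // writes older than the history retention, relative to observed stream time.
   final class ToyVersionedStore {
       private final Map<String, TreeMap<Long, Long>> versions = new HashMap<>();
       private final long historyRetentionMs;
       private long observedStreamTime = Long.MIN_VALUE;

       ToyVersionedStore(final long historyRetentionMs) {
           this.historyRetentionMs = historyRetentionMs;
       }

       long put(final String key, final long value, final long timestamp) {
           observedStreamTime = Math.max(observedStreamTime, timestamp);
           if (timestamp < observedStreamTime - historyRetentionMs) {
               return PutCodes.PUT_RETURN_CODE_NOT_PUT; // too old: dropped
           }
           final TreeMap<Long, Long> history = versions.computeIfAbsent(key, k -> new TreeMap<>());
           history.put(timestamp, value);
           final Long nextNewer = history.higherKey(timestamp);
           if (nextNewer == null) {
               return PutCodes.PUT_RETURN_CODE_IS_LATEST; // no newer version exists
           }
           return nextNewer; // accepted, but a newer version already exists
       }
   }

   // Hypothetical processor mirroring the pattern in the diff.
   final class ToyAggregateProcessor {
       private final ToyVersionedStore store;
       final List<String> forwarded = new ArrayList<>();

       ToyAggregateProcessor(final ToyVersionedStore store) {
           this.store = store;
       }

       void process(final String key, final long newAgg, final long timestamp) {
           final long putReturnCode = store.put(key, newAgg, timestamp);
           // if not put to store, do not forward downstream either
           if (putReturnCode != PutCodes.PUT_RETURN_CODE_NOT_PUT) {
               final boolean isLatest = putReturnCode == PutCodes.PUT_RETURN_CODE_IS_LATEST;
               forwarded.add(key + "=" + newAgg + (isLatest ? " latest" : " not-latest"));
           }
       }
   }
   ```

   With a retention of 100 ms, processing timestamps 10, 5, 500 and 50 for the same key forwards three records (latest, not latest, latest) and drops the last one. If the processor only ever sees non-decreasing timestamps, every put lands as the latest version, which is why the comparisons are redundant for processors like this one.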



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java:
##########
@@ -22,10 +22,16 @@
 
     public final T newValue;
     public final T oldValue;
+    public final boolean isLatest;
 
     public Change(final T newValue, final T oldValue) {
+        this(newValue, oldValue, true);

Review Comment:
   This two-arg constructor is still called from places such as the windowed aggregate processors and the caching state stores. Versioned tables do not come into play there, so every `Change` is considered the latest. If we'd rather pass `true` explicitly at these call sites and remove the two-arg constructor, I can do that cleanup in a follow-up PR.
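
   A minimal sketch of the constructor chain from the diff and of what the proposed cleanup would amount to at a call site. `ChangeSketch` and `ConstructorCleanupDemo` are hypothetical; the three-arg constructor's body is not shown in the diff and is assumed here to simply store its arguments.

   ```java
   // Hypothetical simplified copy of Change<T> showing only the constructor chain.
   final class ChangeSketch<T> {
       final T newValue;
       final T oldValue;
       final boolean isLatest;

       // Existing two-arg form: callers without versioned tables get isLatest = true.
       ChangeSketch(final T newValue, final T oldValue) {
           this(newValue, oldValue, true);
       }

       // New three-arg form, assumed to simply store its arguments.
       ChangeSketch(final T newValue, final T oldValue, final boolean isLatest) {
           this.newValue = newValue;
           this.oldValue = oldValue;
           this.isLatest = isLatest;
       }
   }

   final class ConstructorCleanupDemo {
       public static void main(final String[] args) {
           // Today: a windowed-aggregate-style call site relying on the default.
           final ChangeSketch<Long> implicit = new ChangeSketch<>(42L, null);
           // After the proposed cleanup: the same call site passes true explicitly.
           final ChangeSketch<Long> explicit = new ChangeSketch<>(42L, null, true);
           System.out.println(implicit.isLatest == explicit.isLatest); // true
       }
   }
   ```

   Because the two-arg constructor only delegates with `true`, the follow-up would be a mechanical, behaviour-preserving change; the trade-off is mainly whether the explicit `true` at each call site documents intent better than the implicit default.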



-- 
This is an automated message from the Apache Git Service.