You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/05/11 10:06:33 UTC

[kafka] branch 2.2 updated: MINOR: Align KTableAgg and KTableReduce (#6712)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 4ed5efe  MINOR: Align KTableAgg and KTableReduce (#6712)
4ed5efe is described below

commit 4ed5efe7ba69f6dbed7286eb19b4551fbe3fdda2
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Sat May 11 11:54:58 2019 +0200

    MINOR: Align KTableAgg and KTableReduce (#6712)
    
    Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Jeff Kim <ki...@gmail.com>, Guozhang Wang <gu...@confluent.io>
---
 .../streams/kstream/internals/KStreamReduce.java   |  1 -
 .../streams/kstream/internals/KTableAggregate.java | 27 ++++++++++++++--------
 .../streams/kstream/internals/KTableReduce.java    | 23 ++++++++++--------
 3 files changed, 31 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 09e4fab..cd67283 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -59,7 +59,6 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
-
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index b04a729..8e3f22f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -75,22 +75,29 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
                 throw new StreamsException("Record key for KTable aggregate operator with state " + storeName + " should not be null.");
             }
 
-            T oldAgg = store.get(key);
-
-            if (oldAgg == null) {
-                oldAgg = initializer.apply();
-            }
-
-            T newAgg = oldAgg;
+            final T oldAgg = store.get(key);
+            final T intermediateAgg;
 
             // first try to remove the old value
-            if (value.oldValue != null) {
-                newAgg = remove.apply(key, value.oldValue, newAgg);
+            if (value.oldValue != null && oldAgg != null) {
+                intermediateAgg = remove.apply(key, value.oldValue, oldAgg);
+            } else {
+                intermediateAgg = oldAgg;
             }
 
             // then try to add the new value
+            final T newAgg;
             if (value.newValue != null) {
-                newAgg = add.apply(key, value.newValue, newAgg);
+                final T initializedAgg;
+                if (intermediateAgg == null) {
+                    initializedAgg = initializer.apply();
+                } else {
+                    initializedAgg = intermediateAgg;
+                }
+
+                newAgg = add.apply(key, value.newValue, initializedAgg);
+            } else {
+                newAgg = intermediateAgg;
             }
 
             // update the store with the new value
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 38c5a11..70db6443 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -71,20 +71,25 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
             }
 
             final V oldAgg = store.get(key);
-            V newAgg = oldAgg;
+            final V intermediateAgg;
 
-            // first try to add the new value
+            // first try to remove the old value
+            if (value.oldValue != null && oldAgg != null) {
+                intermediateAgg = removeReducer.apply(oldAgg, value.oldValue);
+            } else {
+                intermediateAgg = oldAgg;
+            }
+
+            // then try to add the new value
+            final V newAgg;
             if (value.newValue != null) {
-                if (newAgg == null) {
+                if (intermediateAgg == null) {
                     newAgg = value.newValue;
                 } else {
-                    newAgg = addReducer.apply(newAgg, value.newValue);
+                    newAgg = addReducer.apply(intermediateAgg, value.newValue);
                 }
-            }
-
-            // then try to remove the old value
-            if (value.oldValue != null) {
-                newAgg = removeReducer.apply(newAgg, value.oldValue);
+            } else {
+                newAgg = intermediateAgg;
             }
 
             // update the store with the new value