You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/05/11 10:20:37 UTC
[kafka] branch 2.0 updated: MINOR: Align KTableAgg and KTableReduce (#6712)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new f81bad2 MINOR: Align KTableAgg and KTableReduce (#6712)
f81bad2 is described below
commit f81bad2526bbf85e190491bfa073f70099d92d35
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Sat May 11 11:54:58 2019 +0200
MINOR: Align KTableAgg and KTableReduce (#6712)
Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Jeff Kim <ki...@gmail.com>, Guozhang Wang <gu...@confluent.io>
---
.../streams/kstream/internals/KStreamReduce.java | 1 -
.../streams/kstream/internals/KTableAggregate.java | 27 ++++++++++++++--------
.../streams/kstream/internals/KTableReduce.java | 23 ++++++++++--------
3 files changed, 31 insertions(+), 20 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 9f404ea..8d078d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -59,7 +59,6 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
-
store = (KeyValueStore<K, V>) context.getStateStore(storeName);
tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index b60f9ab..b00a09c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -75,22 +75,29 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
throw new StreamsException("Record key for KTable aggregate operator with state " + storeName + " should not be null.");
}
- T oldAgg = store.get(key);
-
- if (oldAgg == null) {
- oldAgg = initializer.apply();
- }
-
- T newAgg = oldAgg;
+ final T oldAgg = store.get(key);
+ final T intermediateAgg;
// first try to remove the old value
- if (value.oldValue != null) {
- newAgg = remove.apply(key, value.oldValue, newAgg);
+ if (value.oldValue != null && oldAgg != null) {
+ intermediateAgg = remove.apply(key, value.oldValue, oldAgg);
+ } else {
+ intermediateAgg = oldAgg;
}
// then try to add the new value
+ final T newAgg;
if (value.newValue != null) {
- newAgg = add.apply(key, value.newValue, newAgg);
+ final T initializedAgg;
+ if (intermediateAgg == null) {
+ initializedAgg = initializer.apply();
+ } else {
+ initializedAgg = intermediateAgg;
+ }
+
+ newAgg = add.apply(key, value.newValue, initializedAgg);
+ } else {
+ newAgg = intermediateAgg;
}
// update the store with the new value
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 069b360..bee8987 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -71,20 +71,25 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
}
final V oldAgg = store.get(key);
- V newAgg = oldAgg;
+ final V intermediateAgg;
- // first try to add the new value
+ // first try to remove the old value
+ if (value.oldValue != null && oldAgg != null) {
+ intermediateAgg = removeReducer.apply(oldAgg, value.oldValue);
+ } else {
+ intermediateAgg = oldAgg;
+ }
+
+ // then try to add the new value
+ final V newAgg;
if (value.newValue != null) {
- if (newAgg == null) {
+ if (intermediateAgg == null) {
newAgg = value.newValue;
} else {
- newAgg = addReducer.apply(newAgg, value.newValue);
+ newAgg = addReducer.apply(intermediateAgg, value.newValue);
}
- }
-
- // then try to remove the old value
- if (value.oldValue != null) {
- newAgg = removeReducer.apply(newAgg, value.oldValue);
+ } else {
+ newAgg = intermediateAgg;
}
// update the store with the new value