You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/02 21:01:59 UTC
kafka git commit: KAFKA-3121: Refactor KStream Aggregate to be Lambda-able.
Repository: kafka
Updated Branches:
refs/heads/trunk 8189f9d58 -> 95174337c
KAFKA-3121: Refactor KStream Aggregate to be Lambda-able.
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Yasuhiro Matsuda <ya...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #839 from guozhangwang/K3121s2
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/95174337
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/95174337
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/95174337
Branch: refs/heads/trunk
Commit: 95174337c2f6cda90e213e5c3a73fc89854f42a7
Parents: 8189f9d
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Feb 2 12:01:47 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Feb 2 12:01:47 2016 -0800
----------------------------------------------------------------------
.../examples/pageview/PageViewTypedJob.java | 3 +-
.../examples/pageview/PageViewUnTypedJob.java | 4 +--
.../examples/wordcount/WordCountJob.java | 4 +--
.../kafka/streams/kstream/Aggregator.java | 16 +--------
.../org/apache/kafka/streams/kstream/Count.java | 36 --------------------
.../kafka/streams/kstream/Initializer.java | 23 +++++++++++++
.../apache/kafka/streams/kstream/KStream.java | 24 +++++++++----
.../apache/kafka/streams/kstream/KTable.java | 26 ++++++++++++--
.../apache/kafka/streams/kstream/SumAsLong.java | 36 --------------------
.../kstream/internals/KStreamAggregate.java | 13 ++++---
.../streams/kstream/internals/KStreamImpl.java | 27 +++++++++++++--
.../kstream/internals/KTableAggregate.java | 17 +++++----
.../streams/kstream/internals/KTableImpl.java | 36 ++++++++++++++++++--
.../kstream/internals/KStreamAggregateTest.java | 23 ++++++-------
.../kstream/internals/KTableAggregateTest.java | 23 ++++++++-----
15 files changed, 175 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
index 7f11512..358cbe8 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.Count;
import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
@@ -99,7 +98,7 @@ public class PageViewTypedJob {
return viewByRegion;
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
- .aggregateByKey(new Count<String, PageViewByRegion>(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
+ .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
stringSerializer, longSerializer,
stringDeserializer, longDeserializer)
.toStream()
http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
index 013332e..2fdfa97 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
@@ -31,7 +31,7 @@ import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Count;
+import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
@@ -78,7 +78,7 @@ public class PageViewUnTypedJob {
.put("region", region);
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
- .aggregateByKey(new Count<String, JsonNode>(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
+ .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
stringSerializer, longSerializer,
stringDeserializer, longDeserializer)
.toStream()
http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
index c66e965..b922695 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
@@ -31,7 +31,7 @@ import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Count;
+import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -79,7 +79,7 @@ public class WordCountJob {
return new KeyValue<String, String>(value, value);
}
})
- .aggregateByKey(new Count<>(), UnlimitedWindows.of("Counts").startOn(0L),
+ .countByKey(UnlimitedWindows.of("Counts").startOn(0L),
stringSerializer, longSerializer,
stringDeserializer, longDeserializer)
.toStream()
http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
index c601024..e3eb18f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -18,20 +18,6 @@
package org.apache.kafka.streams.kstream;
public interface Aggregator<K, V, T> {
- /**
- * Set the initial aggregate value
- */
- T initialValue(K aggKey);
- /**
- * When a new record with the aggregate key is added,
- * updating the aggregate value for this key
- */
- T add(K aggKey, V value, T aggregate);
-
- /**
- * when an old record with the aggregate key is removed,
- * updating the aggregate value for this key
- */
- T remove(K aggKey, V value, T aggregate);
+ T apply(K aggKey, V value, T aggregate);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
deleted file mode 100644
index 8780cc7..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public class Count<K, V> implements Aggregator<K, V, Long> {
-
- @Override
- public Long initialValue(K aggKey) {
- return 0L;
- }
-
- @Override
- public Long add(K aggKey, V value, Long aggregate) {
- return aggregate + 1L;
- }
-
- @Override
- public Long remove(K aggKey, V value, Long aggregate) {
- return aggregate - 1L;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
new file mode 100644
index 0000000..0aeddc9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface Initializer<T> {
+
+ T apply();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 26f04f0..f6fa48d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -274,24 +274,36 @@ public interface KStream<K, V> {
* @param windows the specification of the aggregation window
*/
<W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
- Windows<W> windows,
- Serializer<K> keySerializer,
- Serializer<V> aggValueSerializer,
- Deserializer<K> keyDeserializer,
- Deserializer<V> aggValueDeserializer);
+ Windows<W> windows,
+ Serializer<K> keySerializer,
+ Serializer<V> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> aggValueDeserializer);
/**
* Aggregate values of this stream by key on a window basis.
*
+ * @param initializer the class of Initializer
* @param aggregator the class of Aggregator
* @param windows the specification of the aggregation window
* @param <T> the value type of the aggregated table
*/
- <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Aggregator<K, V, T> aggregator,
+ <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
+ Aggregator<K, V, T> aggregator,
Windows<W> windows,
Serializer<K> keySerializer,
Serializer<T> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<T> aggValueDeserializer);
+ /**
+ * Count number of messages of this stream by key on a window basis.
+ *
+ * @param windows the specification of the aggregation window
+ */
+ <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
+ Serializer<K> keySerializer,
+ Serializer<Long> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<Long> aggValueDeserializer);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index feb28ab..5cd9d9c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -162,14 +162,18 @@ public interface KTable<K, V> {
/**
* Aggregate values of this table by the selected key.
*
- * @param aggregator the class of Aggregator
+ * @param initializer the class of Initializer
+ * @param add the class of Aggregator
+ * @param remove the class of Aggregator
* @param selector the KeyValue mapper that select the aggregate key
* @param name the name of the resulted table
* @param <K1> the key type of the aggregated table
* @param <V1> the value type of the aggregated table
* @return the instance of KTable
*/
- <K1, V1, T> KTable<K1, T> aggregate(Aggregator<K1, V1, T> aggregator,
+ <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
+ Aggregator<K1, V1, T> add,
+ Aggregator<K1, V1, T> remove,
KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
Serializer<K1> keySerializer,
Serializer<V1> valueSerializer,
@@ -178,4 +182,22 @@ public interface KTable<K, V> {
Deserializer<V1> valueDeserializer,
Deserializer<T> aggValueDeserializer,
String name);
+
+ /**
+ * Count number of records of this table by the selected key.
+ *
+ * @param selector the KeyValue mapper that select the aggregate key
+ * @param name the name of the resulted table
+ * @param <K1> the key type of the aggregated table
+ * @param <V1> the value type of the aggregated table
+ * @return the instance of KTable
+ */
+ <K1, V1> KTable<K1, Long> count(KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+ Serializer<K1> keySerializer,
+ Serializer<V1> valueSerializer,
+ Serializer<Long> aggValueSerializer,
+ Deserializer<K1> keyDeserializer,
+ Deserializer<V1> valueDeserializer,
+ Deserializer<Long> aggValueDeserializer,
+ String name);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java
deleted file mode 100644
index 1f8df04..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public class SumAsLong<K> implements Aggregator<K, Long, Long> {
-
- @Override
- public Long initialValue(K aggKey) {
- return 0L;
- }
-
- @Override
- public Long add(K aggKey, Long value, Long aggregate) {
- return aggregate + value;
- }
-
- @Override
- public Long remove(K aggKey, Long value, Long aggregate) {
- return aggregate - value;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 49f3e71..b64277c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
@@ -35,13 +36,15 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces
private final String storeName;
private final Windows<W> windows;
+ private final Initializer<T> initializer;
private final Aggregator<K, V, T> aggregator;
private boolean sendOldValues = false;
- public KStreamAggregate(Windows<W> windows, String storeName, Aggregator<K, V, T> aggregator) {
+ public KStreamAggregate(Windows<W> windows, String storeName, Initializer<T> initializer, Aggregator<K, V, T> aggregator) {
this.windows = windows;
this.storeName = storeName;
+ this.initializer = initializer;
this.aggregator = aggregator;
}
@@ -97,10 +100,10 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces
T oldAgg = entry.value;
if (oldAgg == null)
- oldAgg = aggregator.initialValue(key);
+ oldAgg = initializer.apply();
// try to add the new new value (there will never be old value)
- T newAgg = aggregator.add(key, value, oldAgg);
+ T newAgg = aggregator.apply(key, value, oldAgg);
// update the store with the new value
windowStore.put(key, newAgg, window.start());
@@ -119,8 +122,8 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces
// create the new window for the rest of unmatched window that do not exist yet
for (long windowStartMs : matchedWindows.keySet()) {
- T oldAgg = aggregator.initialValue(key);
- T newAgg = aggregator.add(key, value, oldAgg);
+ T oldAgg = initializer.apply();
+ T newAgg = aggregator.apply(key, value, oldAgg);
windowStore.put(key, newAgg, windowStartMs);
http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 73f7266..79a3115 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
@@ -421,7 +422,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
@Override
- public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Aggregator<K, V, T> aggregator,
+ public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
+ Aggregator<K, V, T> aggregator,
Windows<W> windows,
Serializer<K> keySerializer,
Serializer<T> aggValueSerializer,
@@ -434,7 +436,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
String selectName = topology.newName(SELECT_NAME);
ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
- ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregator);
+ ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), initializer, aggregator);
StateStoreSupplier aggregateStore = Stores.create(windows.name())
.withKeys(keySerializer, keyDeserializer)
@@ -451,4 +453,25 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
// return the KTable representation with the intermediate topic as the sources
return new KTableImpl<>(topology, aggregateName, aggregateSupplier, sourceNodes);
}
+
+ @Override
+ public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
+ Serializer<K> keySerializer,
+ Serializer<Long> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<Long> aggValueDeserializer) {
+ return this.aggregateByKey(
+ new Initializer<Long>() {
+ @Override
+ public Long apply() {
+ return 0L;
+ }
+ },
+ new Aggregator<K, V, Long>() {
+ @Override
+ public Long apply(K aggKey, V value, Long aggregate) {
+ return aggregate + 1L;
+ }
+ }, windows, keySerializer, aggValueSerializer, keyDeserializer, aggValueDeserializer);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 1730a8c..6ce776a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -26,13 +27,17 @@ import org.apache.kafka.streams.state.KeyValueStore;
public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> {
private final String storeName;
- private final Aggregator<K, V, T> aggregator;
+ private final Initializer<T> initializer;
+ private final Aggregator<K, V, T> add;
+ private final Aggregator<K, V, T> remove;
private boolean sendOldValues = false;
- public KTableAggregate(String storeName, Aggregator<K, V, T> aggregator) {
+ public KTableAggregate(String storeName, Initializer<T> initializer, Aggregator<K, V, T> add, Aggregator<K, V, T> remove) {
this.storeName = storeName;
- this.aggregator = aggregator;
+ this.initializer = initializer;
+ this.add = add;
+ this.remove = remove;
}
@Override
@@ -62,18 +67,18 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
T oldAgg = store.get(key);
if (oldAgg == null)
- oldAgg = aggregator.initialValue(key);
+ oldAgg = initializer.apply();
T newAgg = oldAgg;
// first try to remove the old value
if (value.oldValue != null) {
- newAgg = aggregator.remove(key, value.oldValue, newAgg);
+ newAgg = remove.apply(key, value.oldValue, newAgg);
}
// then try to add the new new value
if (value.newValue != null) {
- newAgg = aggregator.add(key, value.newValue, newAgg);
+ newAgg = add.apply(key, value.newValue, newAgg);
}
// update the store with the new value
http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 4398e3f..9853737 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
@@ -246,7 +247,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
@Override
- public <K1, V1, T> KTable<K1, T> aggregate(Aggregator<K1, V1, T> aggregator,
+ public <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
+ Aggregator<K1, V1, T> add,
+ Aggregator<K1, V1, T> remove,
KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
Serializer<K1> keySerializer,
Serializer<V1> valueSerializer,
@@ -268,7 +271,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
- ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, aggregator);
+ ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, initializer, add, remove);
StateStoreSupplier aggregateStore = Stores.create(name)
.withKeys(keySerializer, keyDeserializer)
@@ -296,6 +299,35 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
@Override
+ public <K1, V1> KTable<K1, Long> count(KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+ Serializer<K1> keySerializer,
+ Serializer<V1> valueSerializer,
+ Serializer<Long> aggValueSerializer,
+ Deserializer<K1> keyDeserializer,
+ Deserializer<V1> valueDeserializer,
+ Deserializer<Long> aggValueDeserializer,
+ String name) {
+ return this.aggregate(
+ new Initializer<Long>() {
+ @Override
+ public Long apply() {
+ return 0L;
+ }
+ },
+ new Aggregator<K1, V1, Long>() {
+ @Override
+ public Long apply(K1 aggKey, V1 value, Long aggregate) {
+ return aggregate + 1L;
+ }
+ }, new Aggregator<K1, V1, Long>() {
+ @Override
+ public Long apply(K1 aggKey, V1 value, Long aggregate) {
+ return aggregate - 1L;
+ }
+ }, selector, keySerializer, valueSerializer, aggValueSerializer, keyDeserializer, valueDeserializer, aggValueDeserializer, name);
+ }
+
+ @Override
public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer,
Reducer<V1> removeReducer,
KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
index 8a81113..93c5df6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
@@ -43,21 +44,19 @@ public class KStreamAggregateTest {
private final Serializer<String> strSerializer = new StringSerializer();
private final Deserializer<String> strDeserializer = new StringDeserializer();
- private class StringCanonizer implements Aggregator<String, String, String> {
+ private class StringAdd implements Aggregator<String, String, String> {
@Override
- public String initialValue(String aggKey) {
- return "0";
- }
-
- @Override
- public String add(String aggKey, String value, String aggregate) {
+ public String apply(String aggKey, String value, String aggregate) {
return aggregate + "+" + value;
}
+ }
+
+ private class StringInit implements Initializer<String> {
@Override
- public String remove(String aggKey, String value, String aggregate) {
- return aggregate + "-" + value;
+ public String apply() {
+ return "0";
}
}
@@ -70,7 +69,7 @@ public class KStreamAggregateTest {
String topic1 = "topic1";
KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1);
- KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringCanonizer(),
+ KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringInit(), new StringAdd(),
HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
strSerializer,
strSerializer,
@@ -149,7 +148,7 @@ public class KStreamAggregateTest {
String topic2 = "topic2";
KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1);
- KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new StringCanonizer(),
+ KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new StringInit(), new StringAdd(),
HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
strSerializer,
strSerializer,
@@ -160,7 +159,7 @@ public class KStreamAggregateTest {
table1.toStream().process(proc1);
KStream<String, String> stream2 = builder.stream(strDeserializer, strDeserializer, topic2);
- KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new StringCanonizer(),
+ KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new StringInit(), new StringAdd(),
HoppingWindows.of("topic2-Canonized").with(10L).every(5L),
strSerializer,
strSerializer,
http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 439aa09..59711db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.KStreamTestDriver;
@@ -40,21 +41,27 @@ public class KTableAggregateTest {
private final Serializer<String> strSerializer = new StringSerializer();
private final Deserializer<String> strDeserializer = new StringDeserializer();
- private class StringCanonizer implements Aggregator<String, String, String> {
+ private class StringAdd implements Aggregator<String, String, String> {
@Override
- public String initialValue(String aggKey) {
- return "0";
+ public String apply(String aggKey, String value, String aggregate) {
+ return aggregate + "+" + value;
}
+ }
+
+ private class StringRemove implements Aggregator<String, String, String> {
@Override
- public String add(String aggKey, String value, String aggregate) {
- return aggregate + "+" + value;
+ public String apply(String aggKey, String value, String aggregate) {
+ return aggregate + "-" + value;
}
+ }
+
+ private class StringInit implements Initializer<String> {
@Override
- public String remove(String aggKey, String value, String aggregate) {
- return aggregate + "-" + value;
+ public String apply() {
+ return "0";
}
}
@@ -67,7 +74,7 @@ public class KTableAggregateTest {
String topic1 = "topic1";
KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
- KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringCanonizer(),
+ KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringInit(), new StringAdd(), new StringRemove(),
new NoOpKeyValueMapper<String, String>(),
strSerializer,
strSerializer,