You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/02 21:01:59 UTC

kafka git commit: KAFKA-3121: Refactor KStream Aggregate to be Lambda-able.

Repository: kafka
Updated Branches:
  refs/heads/trunk 8189f9d58 -> 95174337c


KAFKA-3121: Refactor KStream Aggregate to be Lambda-able.

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Yasuhiro Matsuda <ya...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #839 from guozhangwang/K3121s2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/95174337
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/95174337
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/95174337

Branch: refs/heads/trunk
Commit: 95174337c2f6cda90e213e5c3a73fc89854f42a7
Parents: 8189f9d
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Feb 2 12:01:47 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Feb 2 12:01:47 2016 -0800

----------------------------------------------------------------------
 .../examples/pageview/PageViewTypedJob.java     |  3 +-
 .../examples/pageview/PageViewUnTypedJob.java   |  4 +--
 .../examples/wordcount/WordCountJob.java        |  4 +--
 .../kafka/streams/kstream/Aggregator.java       | 16 +--------
 .../org/apache/kafka/streams/kstream/Count.java | 36 --------------------
 .../kafka/streams/kstream/Initializer.java      | 23 +++++++++++++
 .../apache/kafka/streams/kstream/KStream.java   | 24 +++++++++----
 .../apache/kafka/streams/kstream/KTable.java    | 26 ++++++++++++--
 .../apache/kafka/streams/kstream/SumAsLong.java | 36 --------------------
 .../kstream/internals/KStreamAggregate.java     | 13 ++++---
 .../streams/kstream/internals/KStreamImpl.java  | 27 +++++++++++++--
 .../kstream/internals/KTableAggregate.java      | 17 +++++----
 .../streams/kstream/internals/KTableImpl.java   | 36 ++++++++++++++++++--
 .../kstream/internals/KStreamAggregateTest.java | 23 ++++++-------
 .../kstream/internals/KTableAggregateTest.java  | 23 ++++++++-----
 15 files changed, 175 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
index 7f11512..358cbe8 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.Count;
 import org.apache.kafka.streams.kstream.HoppingWindows;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KStream;
@@ -99,7 +98,7 @@ public class PageViewTypedJob {
                     return viewByRegion;
                 })
                 .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
-                .aggregateByKey(new Count<String, PageViewByRegion>(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
+                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
                         stringSerializer, longSerializer,
                         stringDeserializer, longDeserializer)
                 .toStream()

http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
index 013332e..2fdfa97 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
@@ -31,7 +31,7 @@ import org.apache.kafka.connect.json.JsonDeserializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Count;
+import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.HoppingWindows;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KStream;
@@ -78,7 +78,7 @@ public class PageViewUnTypedJob {
                             .put("region", region);
                 })
                 .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
-                .aggregateByKey(new Count<String, JsonNode>(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
+                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
                         stringSerializer, longSerializer,
                         stringDeserializer, longDeserializer)
                 .toStream()

http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
index c66e965..b922695 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
@@ -31,7 +31,7 @@ import org.apache.kafka.connect.json.JsonSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Count;
+import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -79,7 +79,7 @@ public class WordCountJob {
                         return new KeyValue<String, String>(value, value);
                     }
                 })
-                .aggregateByKey(new Count<>(), UnlimitedWindows.of("Counts").startOn(0L),
+                .countByKey(UnlimitedWindows.of("Counts").startOn(0L),
                         stringSerializer, longSerializer,
                         stringDeserializer, longDeserializer)
                 .toStream()

http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
index c601024..e3eb18f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -18,20 +18,6 @@
 package org.apache.kafka.streams.kstream;
 
 public interface Aggregator<K, V, T> {
-    /**
-     * Set the initial aggregate value
-     */
-    T initialValue(K aggKey);
 
-    /**
-     * When a new record with the aggregate key is added,
-     * updating the aggregate value for this key
-     */
-    T add(K aggKey, V value, T aggregate);
-
-    /**
-     * when an old record with the aggregate key is removed,
-     * updating the aggregate value for this key
-     */
-    T remove(K aggKey, V value, T aggregate);
+    T apply(K aggKey, V value, T aggregate);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
deleted file mode 100644
index 8780cc7..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public class Count<K, V> implements Aggregator<K, V, Long> {
-
-    @Override
-    public Long initialValue(K aggKey) {
-        return 0L;
-    }
-
-    @Override
-    public Long add(K aggKey, V value, Long aggregate) {
-        return aggregate + 1L;
-    }
-
-    @Override
-    public Long remove(K aggKey, V value, Long aggregate) {
-        return aggregate - 1L;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
new file mode 100644
index 0000000..0aeddc9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface Initializer<T> {
+
+    T apply();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 26f04f0..f6fa48d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -274,24 +274,36 @@ public interface KStream<K, V> {
      * @param windows the specification of the aggregation window
      */
     <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
-                                                             Windows<W> windows,
-                                                             Serializer<K> keySerializer,
-                                                             Serializer<V> aggValueSerializer,
-                                                             Deserializer<K> keyDeserializer,
-                                                             Deserializer<V> aggValueDeserializer);
+                                                          Windows<W> windows,
+                                                          Serializer<K> keySerializer,
+                                                          Serializer<V> aggValueSerializer,
+                                                          Deserializer<K> keyDeserializer,
+                                                          Deserializer<V> aggValueDeserializer);
 
     /**
      * Aggregate values of this stream by key on a window basis.
      *
+     * @param initializer the class of Initializer
      * @param aggregator the class of Aggregator
      * @param windows the specification of the aggregation window
      * @param <T>   the value type of the aggregated table
      */
-    <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Aggregator<K, V, T> aggregator,
+    <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
+                                                                Aggregator<K, V, T> aggregator,
                                                                 Windows<W> windows,
                                                                 Serializer<K> keySerializer,
                                                                 Serializer<T> aggValueSerializer,
                                                                 Deserializer<K> keyDeserializer,
                                                                 Deserializer<T> aggValueDeserializer);
 
+    /**
+     * Count number of messages of this stream by key on a window basis.
+     *
+     * @param windows the specification of the aggregation window
+     */
+    <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
+                                                            Serializer<K> keySerializer,
+                                                            Serializer<Long> aggValueSerializer,
+                                                            Deserializer<K> keyDeserializer,
+                                                            Deserializer<Long> aggValueDeserializer);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index feb28ab..5cd9d9c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -162,14 +162,18 @@ public interface KTable<K, V> {
     /**
      * Aggregate values of this table by the selected key.
      *
-     * @param aggregator the class of Aggregator
+     * @param initializer the class of Initializer
+     * @param add the class of Aggregator
+     * @param remove the class of Aggregator
      * @param selector the KeyValue mapper that select the aggregate key
      * @param name the name of the resulted table
      * @param <K1>   the key type of the aggregated table
      * @param <V1>   the value type of the aggregated table
      * @return the instance of KTable
      */
-    <K1, V1, T> KTable<K1, T> aggregate(Aggregator<K1, V1, T> aggregator,
+    <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
+                                        Aggregator<K1, V1, T> add,
+                                        Aggregator<K1, V1, T> remove,
                                         KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
                                         Serializer<K1> keySerializer,
                                         Serializer<V1> valueSerializer,
@@ -178,4 +182,22 @@ public interface KTable<K, V> {
                                         Deserializer<V1> valueDeserializer,
                                         Deserializer<T> aggValueDeserializer,
                                         String name);
+
+    /**
+     * Count number of records of this table by the selected key.
+     *
+     * @param selector the KeyValue mapper that select the aggregate key
+     * @param name the name of the resulted table
+     * @param <K1>   the key type of the aggregated table
+     * @param <V1>   the value type of the aggregated table
+     * @return the instance of KTable
+     */
+    <K1, V1> KTable<K1, Long> count(KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+                                    Serializer<K1> keySerializer,
+                                    Serializer<V1> valueSerializer,
+                                    Serializer<Long> aggValueSerializer,
+                                    Deserializer<K1> keyDeserializer,
+                                    Deserializer<V1> valueDeserializer,
+                                    Deserializer<Long> aggValueDeserializer,
+                                    String name);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java
deleted file mode 100644
index 1f8df04..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public class SumAsLong<K> implements Aggregator<K, Long, Long> {
-
-    @Override
-    public Long initialValue(K aggKey) {
-        return 0L;
-    }
-
-    @Override
-    public Long add(K aggKey, Long value, Long aggregate) {
-        return aggregate + value;
-    }
-
-    @Override
-    public Long remove(K aggKey, Long value, Long aggregate) {
-        return aggregate - value;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 49f3e71..b64277c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
@@ -35,13 +36,15 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces
 
     private final String storeName;
     private final Windows<W> windows;
+    private final Initializer<T> initializer;
     private final Aggregator<K, V, T> aggregator;
 
     private boolean sendOldValues = false;
 
-    public KStreamAggregate(Windows<W> windows, String storeName, Aggregator<K, V, T> aggregator) {
+    public KStreamAggregate(Windows<W> windows, String storeName, Initializer<T> initializer, Aggregator<K, V, T> aggregator) {
         this.windows = windows;
         this.storeName = storeName;
+        this.initializer = initializer;
         this.aggregator = aggregator;
     }
 
@@ -97,10 +100,10 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces
                     T oldAgg = entry.value;
 
                     if (oldAgg == null)
-                        oldAgg = aggregator.initialValue(key);
+                        oldAgg = initializer.apply();
 
                     // try to add the new new value (there will never be old value)
-                    T newAgg = aggregator.add(key, value, oldAgg);
+                    T newAgg = aggregator.apply(key, value, oldAgg);
 
                     // update the store with the new value
                     windowStore.put(key, newAgg, window.start());
@@ -119,8 +122,8 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces
 
             // create the new window for the rest of unmatched window that do not exist yet
             for (long windowStartMs : matchedWindows.keySet()) {
-                T oldAgg = aggregator.initialValue(key);
-                T newAgg = aggregator.add(key, value, oldAgg);
+                T oldAgg = initializer.apply();
+                T newAgg = aggregator.apply(key, value, oldAgg);
 
                 windowStore.put(key, newAgg, windowStartMs);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 73f7266..79a3115 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -421,7 +422,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Aggregator<K, V, T> aggregator,
+    public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
+                                                                       Aggregator<K, V, T> aggregator,
                                                                        Windows<W> windows,
                                                                        Serializer<K> keySerializer,
                                                                        Serializer<T> aggValueSerializer,
@@ -434,7 +436,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         String selectName = topology.newName(SELECT_NAME);
 
         ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
-        ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregator);
+        ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), initializer, aggregator);
 
         StateStoreSupplier aggregateStore = Stores.create(windows.name())
                 .withKeys(keySerializer, keyDeserializer)
@@ -451,4 +453,25 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         // return the KTable representation with the intermediate topic as the sources
         return new KTableImpl<>(topology, aggregateName, aggregateSupplier, sourceNodes);
     }
+
+    @Override
+    public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
+                                                                   Serializer<K> keySerializer,
+                                                                   Serializer<Long> aggValueSerializer,
+                                                                   Deserializer<K> keyDeserializer,
+                                                                   Deserializer<Long> aggValueDeserializer) {
+        return this.aggregateByKey(
+                new Initializer<Long>() {
+                    @Override
+                    public Long apply() {
+                        return 0L;
+                    }
+                },
+                new Aggregator<K, V, Long>() {
+                    @Override
+                    public Long apply(K aggKey, V value, Long aggregate) {
+                        return aggregate + 1L;
+                    }
+                }, windows, keySerializer, aggValueSerializer, keyDeserializer, aggValueDeserializer);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 1730a8c..6ce776a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -26,13 +27,17 @@ import org.apache.kafka.streams.state.KeyValueStore;
 public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> {
 
     private final String storeName;
-    private final Aggregator<K, V, T> aggregator;
+    private final Initializer<T> initializer;
+    private final Aggregator<K, V, T> add;
+    private final Aggregator<K, V, T> remove;
 
     private boolean sendOldValues = false;
 
-    public KTableAggregate(String storeName, Aggregator<K, V, T> aggregator) {
+    public KTableAggregate(String storeName, Initializer<T> initializer, Aggregator<K, V, T> add, Aggregator<K, V, T> remove) {
         this.storeName = storeName;
-        this.aggregator = aggregator;
+        this.initializer = initializer;
+        this.add = add;
+        this.remove = remove;
     }
 
     @Override
@@ -62,18 +67,18 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
             T oldAgg = store.get(key);
 
             if (oldAgg == null)
-                oldAgg = aggregator.initialValue(key);
+                oldAgg = initializer.apply();
 
             T newAgg = oldAgg;
 
             // first try to remove the old value
             if (value.oldValue != null) {
-                newAgg = aggregator.remove(key, value.oldValue, newAgg);
+                newAgg = remove.apply(key, value.oldValue, newAgg);
             }
 
             // then try to add the new new value
             if (value.newValue != null) {
-                newAgg = aggregator.add(key, value.newValue, newAgg);
+                newAgg = add.apply(key, value.newValue, newAgg);
             }
 
             // update the store with the new value

http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 4398e3f..9853737 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -246,7 +247,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public <K1, V1, T> KTable<K1, T> aggregate(Aggregator<K1, V1, T> aggregator,
+    public <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
+                                               Aggregator<K1, V1, T> add,
+                                               Aggregator<K1, V1, T> remove,
                                                KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
                                                Serializer<K1> keySerializer,
                                                Serializer<V1> valueSerializer,
@@ -268,7 +271,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
 
-        ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, aggregator);
+        ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, initializer, add, remove);
 
         StateStoreSupplier aggregateStore = Stores.create(name)
                 .withKeys(keySerializer, keyDeserializer)
@@ -296,6 +299,35 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
+    public <K1, V1> KTable<K1, Long> count(KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+                                           Serializer<K1> keySerializer,
+                                           Serializer<V1> valueSerializer,
+                                           Serializer<Long> aggValueSerializer,
+                                           Deserializer<K1> keyDeserializer,
+                                           Deserializer<V1> valueDeserializer,
+                                           Deserializer<Long> aggValueDeserializer,
+                                           String name) {
+        return this.aggregate(
+                new Initializer<Long>() {
+                    @Override
+                    public Long apply() {
+                        return 0L;
+                    }
+                },
+                new Aggregator<K1, V1, Long>() {
+                    @Override
+                    public Long apply(K1 aggKey, V1 value, Long aggregate) {
+                        return aggregate + 1L;
+                    }
+                }, new Aggregator<K1, V1, Long>() {
+                    @Override
+                    public Long apply(K1 aggKey, V1 value, Long aggregate) {
+                        return aggregate - 1L;
+                    }
+                }, selector, keySerializer, valueSerializer, aggValueSerializer, keyDeserializer, valueDeserializer, aggValueDeserializer, name);
+    }
+
+    @Override
     public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer,
                                           Reducer<V1> removeReducer,
                                           KeyValueMapper<K, V, KeyValue<K1, V1>> selector,

http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
index 8a81113..93c5df6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -43,21 +44,19 @@ public class KStreamAggregateTest {
     private final Serializer<String> strSerializer = new StringSerializer();
     private final Deserializer<String> strDeserializer = new StringDeserializer();
 
-    private class StringCanonizer implements Aggregator<String, String, String> {
+    private class StringAdd implements Aggregator<String, String, String> {
 
         @Override
-        public String initialValue(String aggKey) {
-            return "0";
-        }
-
-        @Override
-        public String add(String aggKey, String value, String aggregate) {
+        public String apply(String aggKey, String value, String aggregate) {
             return aggregate + "+" + value;
         }
+    }
+
+    private class StringInit implements Initializer<String> {
 
         @Override
-        public String remove(String aggKey, String value, String aggregate) {
-            return aggregate + "-" + value;
+        public String apply() {
+            return "0";
         }
     }
 
@@ -70,7 +69,7 @@ public class KStreamAggregateTest {
             String topic1 = "topic1";
 
             KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1);
-            KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringCanonizer(),
+            KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringInit(), new StringAdd(),
                     HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
                     strSerializer,
                     strSerializer,
@@ -149,7 +148,7 @@ public class KStreamAggregateTest {
             String topic2 = "topic2";
 
             KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1);
-            KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new StringCanonizer(),
+            KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new StringInit(), new StringAdd(),
                     HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
                     strSerializer,
                     strSerializer,
@@ -160,7 +159,7 @@ public class KStreamAggregateTest {
             table1.toStream().process(proc1);
 
             KStream<String, String> stream2 = builder.stream(strDeserializer, strDeserializer, topic2);
-            KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new StringCanonizer(),
+            KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new StringInit(), new StringAdd(),
                     HoppingWindows.of("topic2-Canonized").with(10L).every(5L),
                     strSerializer,
                     strSerializer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/95174337/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 439aa09..59711db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
@@ -40,21 +41,27 @@ public class KTableAggregateTest {
     private final Serializer<String> strSerializer = new StringSerializer();
     private final Deserializer<String> strDeserializer = new StringDeserializer();
 
-    private class StringCanonizer implements Aggregator<String, String, String> {
+    private class StringAdd implements Aggregator<String, String, String> {
 
         @Override
-        public String initialValue(String aggKey) {
-            return "0";
+        public String apply(String aggKey, String value, String aggregate) {
+            return aggregate + "+" + value;
         }
+    }
+
+    private class StringRemove implements Aggregator<String, String, String> {
 
         @Override
-        public String add(String aggKey, String value, String aggregate) {
-            return aggregate + "+" + value;
+        public String apply(String aggKey, String value, String aggregate) {
+            return aggregate + "-" + value;
         }
+    }
+
+    private class StringInit implements Initializer<String> {
 
         @Override
-        public String remove(String aggKey, String value, String aggregate) {
-            return aggregate + "-" + value;
+        public String apply() {
+            return "0";
         }
     }
 
@@ -67,7 +74,7 @@ public class KTableAggregateTest {
             String topic1 = "topic1";
 
             KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
-            KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringCanonizer(),
+            KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringInit(), new StringAdd(), new StringRemove(),
                     new NoOpKeyValueMapper<String, String>(),
                     strSerializer,
                     strSerializer,