You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/21 01:10:51 UTC

[5/5] kafka git commit: KAFKA-3121: Remove aggregatorSupplier and add Reduce functions

KAFKA-3121: Remove aggregatorSupplier and add Reduce functions

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Yasuhiro Matsuda

Closes #795 from guozhangwang/K3121s1


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/959cf09e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/959cf09e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/959cf09e

Branch: refs/heads/trunk
Commit: 959cf09e8653f4b8255f49c6f4c258ed1a5ec38e
Parents: e4ef8e6
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Jan 20 16:10:43 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jan 20 16:10:43 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/examples/KTableJob.java       | 111 ---
 .../kafka/streams/kstream/Aggregator.java       |   7 +-
 .../streams/kstream/AggregatorSupplier.java     |  23 -
 .../org/apache/kafka/streams/kstream/Count.java |  36 +
 .../apache/kafka/streams/kstream/KStream.java   |  38 +-
 .../apache/kafka/streams/kstream/KTable.java    |  59 +-
 .../apache/kafka/streams/kstream/Reducer.java   |  23 +
 .../apache/kafka/streams/kstream/SumAsLong.java |  36 +
 .../kstream/internals/CountSupplier.java        |  52 --
 .../kstream/internals/KStreamAggregate.java     |   4 +-
 .../streams/kstream/internals/KStreamImpl.java  | 122 ++--
 .../kstream/internals/KStreamReduce.java        | 167 +++++
 .../kstream/internals/KTableAggregate.java      |   4 +-
 .../streams/kstream/internals/KTableImpl.java   | 128 ++--
 .../streams/kstream/internals/KTableReduce.java | 120 ++++
 .../kstream/internals/KTableStoreSupplier.java  |   4 +-
 .../kstream/internals/LongSumSupplier.java      |  52 --
 .../streams/kstream/internals/TopKSupplier.java | 106 ---
 .../internals/ProcessorStateManager.java        |   2 +-
 .../state/InMemoryKeyValueStoreSupplier.java    | 155 -----
 .../state/InMemoryLRUCacheStoreSupplier.java    | 195 ------
 .../streams/state/MeteredKeyValueStore.java     | 250 -------
 .../kafka/streams/state/MeteredWindowStore.java | 206 ------
 .../kafka/streams/state/OffsetCheckpoint.java   | 162 -----
 .../state/RocksDBKeyValueStoreSupplier.java     |  52 --
 .../kafka/streams/state/RocksDBStore.java       | 265 --------
 .../kafka/streams/state/RocksDBWindowStore.java | 289 --------
 .../state/RocksDBWindowStoreSupplier.java       |  58 --
 .../kafka/streams/state/StoreChangeLogger.java  |  91 ---
 .../org/apache/kafka/streams/state/Stores.java  |   3 +
 .../InMemoryKeyValueStoreSupplier.java          | 159 +++++
 .../InMemoryLRUCacheStoreSupplier.java          | 199 ++++++
 .../state/internals/MeteredKeyValueStore.java   | 254 +++++++
 .../state/internals/MeteredWindowStore.java     | 209 ++++++
 .../state/internals/OffsetCheckpoint.java       | 162 +++++
 .../internals/RocksDBKeyValueStoreSupplier.java |  53 ++
 .../streams/state/internals/RocksDBStore.java   | 269 ++++++++
 .../state/internals/RocksDBWindowStore.java     | 295 ++++++++
 .../internals/RocksDBWindowStoreSupplier.java   |  59 ++
 .../state/internals/StoreChangeLogger.java      |  92 +++
 .../kstream/internals/KStreamAggregateTest.java |  36 +-
 .../kstream/internals/KTableAggregateTest.java  |  36 +-
 .../internals/ProcessorStateManagerTest.java    |   2 +-
 .../processor/internals/StandbyTaskTest.java    |   2 +-
 .../state/AbstractKeyValueStoreTest.java        | 191 ------
 .../state/InMemoryKeyValueStoreTest.java        |  48 --
 .../state/InMemoryLRUCacheStoreTest.java        | 156 -----
 .../streams/state/KeyValueStoreTestDriver.java  |   4 +-
 .../streams/state/RocksDBKeyValueStoreTest.java |  50 --
 .../streams/state/RocksDBWindowStoreTest.java   | 671 ------------------
 .../internals/AbstractKeyValueStoreTest.java    | 195 ++++++
 .../internals/InMemoryKeyValueStoreTest.java    |  50 ++
 .../internals/InMemoryLRUCacheStoreTest.java    | 159 +++++
 .../internals/RocksDBKeyValueStoreTest.java     |  52 ++
 .../state/internals/RocksDBWindowStoreTest.java | 676 +++++++++++++++++++
 55 files changed, 3468 insertions(+), 3431 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java
deleted file mode 100644
index 45ff58e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.kstream.HoppingWindows;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.KafkaStreaming;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Windowed;
-
-import java.util.Properties;
-
-public class KTableJob {
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamingConfig.JOB_ID_CONFIG, "example-ktable");
-        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
-        StreamingConfig config = new StreamingConfig(props);
-
-        Serializer<String> stringSerializer = new StringSerializer();
-        Deserializer<String> stringDeserializer = new StringDeserializer();
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        // stream aggregate
-        KStream<String, Long> stream1 = builder.stream("topic1");
-
-        @SuppressWarnings("unchecked")
-        KTable<Windowed<String>, Long> wtable1 = stream1.sumByKey(new KeyValueToLongMapper<String, Long>() {
-            @Override
-            public long apply(String key, Long value) {
-                return value;
-            }
-        }, HoppingWindows.of("window1").with(500L).every(500L).emit(1000L).until(1000L * 60 * 60 * 24 /* one day */), stringSerializer, stringDeserializer);
-
-        // table aggregation
-        KTable<String, String> table1 = builder.table("topic2");
-
-        KTable<String, Long> table2 = table1.sum(new KeyValueMapper<String, String, String>() {
-            @Override
-            public String apply(String key, String value) {
-                return value;
-            }
-        }, new KeyValueToLongMapper<String, String>() {
-            @Override
-            public long apply(String key, String value) {
-                return Long.parseLong(value);
-            }
-        }, stringSerializer, stringDeserializer, "table2");
-
-        // stream-table join
-        KStream<String, Long> stream2 = stream1.leftJoin(table2, new ValueJoiner<Long, Long, Long>() {
-            @Override
-            public Long apply(Long value1, Long value2) {
-                if (value2 == null)
-                    return 0L;
-                else
-                    return value1 * value2;
-            }
-        });
-
-        // table-table join
-        KTable<String, String> table3 = table1.outerJoin(table2, new ValueJoiner<String, Long, String>() {
-            @Override
-            public String apply(String value1, Long value2) {
-                if (value2 == null)
-                    return value1 + "-null";
-                else if (value1 == null)
-                    return "null-" + value2;
-                else
-                    return value1 + "-" + value2;
-            }
-        });
-
-        wtable1.to("topic3");
-
-        KafkaStreaming kstream = new KafkaStreaming(builder, config);
-        kstream.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
index d715fbd..c601024 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -21,7 +21,7 @@ public interface Aggregator<K, V, T> {
     /**
      * Set the initial aggregate value
      */
-    T initialValue();
+    T initialValue(K aggKey);
 
     /**
      * When a new record with the aggregate key is added,
@@ -34,9 +34,4 @@ public interface Aggregator<K, V, T> {
      * updating the aggregate value for this key
      */
     T remove(K aggKey, V value, T aggregate);
-
-    /**
-     * Merge two aggregate values
-     */
-    T merge(T aggr1, T aggr2);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java
deleted file mode 100644
index 6ed9125..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface AggregatorSupplier<K, V, T> {
-
-    Aggregator<K, V, T> get();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
new file mode 100644
index 0000000..3c1ed46
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public class Count<K> implements Aggregator<K, Long, Long> {
+
+    @Override
+    public Long initialValue(K aggKey) {
+        return 0L;
+    }
+
+    @Override
+    public Long add(K aggKey, Long value, Long aggregate) {
+        return aggregate + 1L;
+    }
+
+    @Override
+    public Long remove(K aggKey, Long value, Long aggregate) {
+        return aggregate - 1L;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 36741a8..dfed661 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 
+
 /**
  * KStream is an abstraction of a stream of key-value pairs.
  *
@@ -268,35 +269,28 @@ public interface KStream<K, V> {
     /**
      * Aggregate values of this stream by key on a window basis.
      *
-     * @param aggregatorSupplier the class of aggregatorSupplier
+     * @param reducer the Reducer instance used to combine two values into one
+     * @param windows the specification of the aggregation window
+     */
+    <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
+                                                             Windows<W> windows,
+                                                             Serializer<K> keySerializer,
+                                                             Serializer<V> aggValueSerializer,
+                                                             Deserializer<K> keyDeserializer,
+                                                             Deserializer<V> aggValueDeserializer);
+
+    /**
+     * Aggregate values of this stream by key on a window basis.
+     *
+     * @param aggregator the Aggregator instance used to compute the aggregate
      * @param windows the specification of the aggregation window
      * @param <T>   the value type of the aggregated table
      */
-    <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(AggregatorSupplier<K, V, T> aggregatorSupplier,
+    <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Aggregator<K, V, T> aggregator,
                                                                 Windows<W> windows,
                                                                 Serializer<K> keySerializer,
                                                                 Serializer<T> aggValueSerializer,
                                                                 Deserializer<K> keyDeserializer,
                                                                 Deserializer<T> aggValueDeserializer);
 
-    /**
-     * Sum extracted long integer values of this stream by key on a window basis.
-     *
-     * @param valueSelector the class of KeyValueToLongMapper to extract the long integer from value
-     * @param windows the specification of the aggregation window
-     */
-    <W extends Window> KTable<Windowed<K>, Long> sumByKey(KeyValueToLongMapper<K, V> valueSelector,
-                                                          Windows<W> windows,
-                                                          Serializer<K> keySerializer,
-                                                          Deserializer<K> keyDeserializer);
-
-    /**
-     * Count number of records of this stream by key on a window basis.
-     *
-     * @param windows the specification of the aggregation window
-     */
-    <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
-                                                            Serializer<K> keySerializer,
-                                                            Deserializer<K> keyDeserializer);
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 93eceec..87298d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -139,49 +139,42 @@ public interface KTable<K, V> {
     <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
 
     /**
-     * Aggregate values of this table by the selected key.
+     * Reduce values of this table by the selected key.
      *
-     * @param aggregatorSupplier the class of AggregatorSupplier
+     * @param addReducer the Reducer instance used when adding a value
+     * @param removeReducer the Reducer instance used when removing a value
      * @param selector the KeyValue mapper that select the aggregate key
      * @param name the name of the resulted table
      * @param <K1>   the key type of the aggregated table
      * @param <V1>   the value type of the aggregated table
      * @return the instance of KTable
      */
-    <K1, V1, T> KTable<K1, T> aggregate(AggregatorSupplier<K1, V1, T> aggregatorSupplier,
-                                          KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                          Serializer<K1> keySerializer,
-                                          Serializer<V1> valueSerializer,
-                                          Serializer<T> aggValueSerializer,
-                                          Deserializer<K1> keyDeserializer,
-                                          Deserializer<V1> valueDeserializer,
-                                          Deserializer<T> aggValueDeserializer,
-                                          String name);
-
-    /**
-     * Sum extracted long integer values of this table by the selected aggregation key
-     *
-     * @param keySelector the class of KeyValueMapper to select the aggregation key
-     * @param valueSelector the class of KeyValueToLongMapper to extract the long integer from value
-     * @param name the name of the resulted table
-     */
-    <K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector,
-                              KeyValueToLongMapper<K, V> valueSelector,
-                              Serializer<K1> keySerializer,
-                              Deserializer<K1> keyDeserializer,
-                              String name);
+    <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer,
+                                   Reducer<V1> removeReducer,
+                                   KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+                                   Serializer<K1> keySerializer,
+                                   Serializer<V1> valueSerializer,
+                                   Deserializer<K1> keyDeserializer,
+                                   Deserializer<V1> valueDeserializer,
+                                   String name);
 
     /**
-     * Count number of records of this table by the selected aggregation key
+     * Aggregate values of this table by the selected key.
      *
-     * @param keySelector the class of KeyValueMapper to select the aggregation key
+     * @param aggregator the Aggregator instance used to compute the aggregate
+     * @param selector the KeyValue mapper that selects the aggregate key
      * @param name the name of the resulted table
+     * @param <K1>   the key type of the aggregated table
+     * @param <V1>   the value type of the aggregated table
+     * @return the instance of KTable
      */
-    <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector,
-                                Serializer<K1> keySerializer,
-                                Serializer<V> valueSerializer,
-                                Deserializer<K1> keyDeserializer,
-                                Deserializer<V> valueDeserializer,
-                                String name);
-
+    <K1, V1, T> KTable<K1, T> aggregate(Aggregator<K1, V1, T> aggregator,
+                                        KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+                                        Serializer<K1> keySerializer,
+                                        Serializer<V1> valueSerializer,
+                                        Serializer<T> aggValueSerializer,
+                                        Deserializer<K1> keyDeserializer,
+                                        Deserializer<V1> valueDeserializer,
+                                        Deserializer<T> aggValueDeserializer,
+                                        String name);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
new file mode 100644
index 0000000..418f442
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface Reducer<V> {
+
+    V apply(V value1, V value2);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java
new file mode 100644
index 0000000..1f8df04
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public class SumAsLong<K> implements Aggregator<K, Long, Long> {
+
+    @Override
+    public Long initialValue(K aggKey) {
+        return 0L;
+    }
+
+    @Override
+    public Long add(K aggKey, Long value, Long aggregate) {
+        return aggregate + value;
+    }
+
+    @Override
+    public Long remove(K aggKey, Long value, Long aggregate) {
+        return aggregate - value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java
deleted file mode 100644
index b7dc5aa..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
-
-public class CountSupplier<K, V> implements AggregatorSupplier<K, V, Long> {
-
-    private class Count implements Aggregator<K, V, Long> {
-
-        @Override
-        public Long initialValue() {
-            return 0L;
-        }
-
-        @Override
-        public Long add(K aggKey, V value, Long aggregate) {
-            return aggregate + 1;
-        }
-
-        @Override
-        public Long remove(K aggKey, V value, Long aggregate) {
-            return aggregate - 1;
-        }
-
-        @Override
-        public Long merge(Long aggr1, Long aggr2) {
-            return aggr1 + aggr2;
-        }
-    }
-
-    @Override
-    public Aggregator<K, V, Long> get() {
-        return new Count();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 5745a03..91bfa9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -97,7 +97,7 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces
                     T oldAgg = entry.value;
 
                     if (oldAgg == null)
-                        oldAgg = aggregator.initialValue();
+                        oldAgg = aggregator.initialValue(key);
 
                     // try to add the new new value (there will never be old value)
                     T newAgg = aggregator.add(key, value, oldAgg);
@@ -119,7 +119,7 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces
 
             // create the new window for the rest of unmatched window that do not exist yet
             for (long windowStartMs : matchedWindows.keySet()) {
-                T oldAgg = aggregator.initialValue();
+                T oldAgg = aggregator.initialValue(key);
                 T newAgg = aggregator.add(key, value, oldAgg);
 
                 windowStore.put(key, newAgg, windowStartMs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 691910b..ce89220 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -18,15 +18,13 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
+import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
+import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
@@ -39,7 +37,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.streams.state.Serdes;
 
 import java.lang.reflect.Array;
@@ -48,47 +46,50 @@ import java.util.Set;
 
 public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {
 
-    private static final String FILTER_NAME = "KSTREAM-FILTER-";
+    private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
 
-    private static final String MAP_NAME = "KSTREAM-MAP-";
+    private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
 
-    private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
+    private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
+
+    private static final String FILTER_NAME = "KSTREAM-FILTER-";
 
     private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
 
     private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";
 
-    private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
+    public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
 
-    private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
+    public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
 
-    private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
+    public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
 
-    private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
+    private static final String MAP_NAME = "KSTREAM-MAP-";
 
-    private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
+    private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
 
-    private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
+    public static final String MERGE_NAME = "KSTREAM-MERGE-";
 
-    private static final String SELECT_NAME = "KSTREAM-SELECT-";
+    public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
 
-    private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
+    public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
 
-    public static final String SINK_NAME = "KSTREAM-SINK-";
+    private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
 
-    public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
+    private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
 
-    public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
+    private static final String SELECT_NAME = "KSTREAM-SELECT-";
 
-    public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
+    public static final String SINK_NAME = "KSTREAM-SINK-";
 
-    public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
+    public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
 
-    public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
+    private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
 
-    public static final String MERGE_NAME = "KSTREAM-MERGE-";
+    private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
+
+    private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
 
-    public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
 
     public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
         super(topology, name, sourceNodes);
@@ -394,7 +395,41 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(AggregatorSupplier<K, V, T> aggregatorSupplier,
+    public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
+                                                                 Windows<W> windows,
+                                                                 Serializer<K> keySerializer,
+                                                                 Serializer<V> aggValueSerializer,
+                                                                 Deserializer<K> keyDeserializer,
+                                                                 Deserializer<V> aggValueDeserializer) {
+
+        // TODO: this agg window operator is only used for casting K to Windowed<K> for
+        // KTableProcessorSupplier, which is a bit awkward and should be removed in the future
+        String reduceName = topology.newName(REDUCE_NAME);
+        String selectName = topology.newName(SELECT_NAME);
+
+        ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
+        ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamReduce<>(windows, windows.name(), reducer);
+
+        RocksDBWindowStoreSupplier<K, V> aggregateStore =
+                new RocksDBWindowStoreSupplier<>(
+                        windows.name(),
+                        windows.maintainMs(),
+                        windows.segments,
+                        false,
+                        new Serdes<>("", keySerializer, keyDeserializer, aggValueSerializer, aggValueDeserializer),
+                        null);
+
+        // reduce the values with the reducer and local store
+        topology.addProcessor(selectName, aggWindowSupplier, this.name);
+        topology.addProcessor(reduceName, aggregateSupplier, selectName);
+        topology.addStateStore(aggregateStore, reduceName);
+
+        // return the KTable representation, reusing this stream's source nodes as the sources
+        return new KTableImpl<>(topology, reduceName, aggregateSupplier, sourceNodes);
+    }
+
+    @Override
+    public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Aggregator<K, V, T> aggregator,
                                                                        Windows<W> windows,
                                                                        Serializer<K> keySerializer,
                                                                        Serializer<T> aggValueSerializer,
@@ -407,7 +442,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         String selectName = topology.newName(SELECT_NAME);
 
         ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
-        ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregatorSupplier.get());
+        ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregator);
 
         RocksDBWindowStoreSupplier<K, T> aggregateStore =
                 new RocksDBWindowStoreSupplier<>(
@@ -426,39 +461,4 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         // return the KTable representation with the intermediate topic as the sources
         return new KTableImpl<>(topology, aggregateName, aggregateSupplier, sourceNodes);
     }
-
-    @Override
-    public <W extends Window> KTable<Windowed<K>, Long> sumByKey(final KeyValueToLongMapper<K, V> valueSelector,
-                                                                 Windows<W> windows,
-                                                                 Serializer<K> keySerializer,
-                                                                 Deserializer<K> keyDeserializer) {
-
-        KStream<K, Long> selected = this.map(new KeyValueMapper<K, V, KeyValue<K, Long>>() {
-            @Override
-            public KeyValue<K, Long> apply(K key, V value) {
-                return new KeyValue<>(key, valueSelector.apply(key, value));
-            }
-        });
-
-        return selected.<Long, W>aggregateByKey(new LongSumSupplier<K>(),
-                windows,
-                keySerializer,
-                new LongSerializer(),
-                keyDeserializer,
-                new LongDeserializer());
-    }
-
-
-    @Override
-    public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
-                                                                   Serializer<K> keySerializer,
-                                                                   Deserializer<K> keyDeserializer) {
-
-        return this.<Long, W>aggregateByKey(new CountSupplier<K, V>(),
-                windows,
-                keySerializer,
-                new LongSerializer(),
-                keyDeserializer,
-                new LongDeserializer());
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
new file mode 100644
index 0000000..7d6eb27
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public class KStreamReduce<K, V, W extends Window> implements KTableProcessorSupplier<Windowed<K>, V, V> {
+
+    private final String storeName;
+    private final Windows<W> windows;
+    private final Reducer<V> reducer;
+
+    private boolean sendOldValues = false;
+
+    public KStreamReduce(Windows<W> windows, String storeName, Reducer<V> reducer) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.reducer = reducer;
+    }
+
+    @Override
+    public Processor<Windowed<K>, Change<V>> get() {
+        return new KStreamAggregateProcessor();
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamAggregateProcessor extends AbstractProcessor<Windowed<K>, Change<V>> {
+
+        private WindowStore<K, V> windowStore;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+
+            windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+        }
+
+        @Override
+        public void process(Windowed<K> windowedKey, Change<V> change) {
+            // first get the matching windows
+            long timestamp = windowedKey.window().start();
+            K key = windowedKey.value();
+            V value = change.newValue;
+
+            Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
+
+            long timeFrom = Long.MAX_VALUE;
+            long timeTo = Long.MIN_VALUE;
+
+            // use range query on window store for efficient reads
+            for (long windowStartMs : matchedWindows.keySet()) {
+                timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
+                timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
+            }
+
+            WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo);
+
+            // for each matching window, try to update the corresponding key and send to the downstream
+            while (iter.hasNext()) {
+                KeyValue<Long, V> entry = iter.next();
+                W window = matchedWindows.get(entry.key);
+
+                if (window != null) {
+
+                    V oldAgg = entry.value;
+                    V newAgg = oldAgg;
+
+                    // try to add the new value (there will never be an old value)
+                    if (newAgg == null) {
+                        newAgg = value;
+                    } else {
+                        newAgg = reducer.apply(newAgg, value);
+                    }
+
+                    // update the store with the new value
+                    windowStore.put(key, newAgg, window.start());
+
+                    // forward the aggregated change pair
+                    if (sendOldValues)
+                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+                    else
+                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+
+                    matchedWindows.remove(entry.key);
+                }
+            }
+
+            iter.close();
+
+            // create new windows for the remaining unmatched windows that do not exist yet
+            for (long windowStartMs : matchedWindows.keySet()) {
+                windowStore.put(key, value, windowStartMs);
+
+                // send the new aggregate pair (there will be no old value)
+                context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(value, null));
+            }
+        }
+    }
+
+    @Override
+    public KTableValueGetterSupplier<Windowed<K>, V> view() {
+
+        return new KTableValueGetterSupplier<Windowed<K>, V>() {
+
+            public KTableValueGetter<Windowed<K>, V> get() {
+                return new KStreamAggregateValueGetter();
+            }
+
+        };
+    }
+
+    private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, V> {
+
+        private WindowStore<K, V> windowStore;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public V get(Windowed<K> windowedKey) {
+            K key = windowedKey.value();
+            W window = (W) windowedKey.window();
+
+            // this iterator should only contain one element
+            Iterator<KeyValue<Long, V>> iter = windowStore.fetch(key, window.start(), window.start());
+
+            return iter.next().value;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index a5948f8..1730a8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -30,7 +30,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
 
     private boolean sendOldValues = false;
 
-    KTableAggregate(String storeName, Aggregator<K, V, T> aggregator) {
+    public KTableAggregate(String storeName, Aggregator<K, V, T> aggregator) {
         this.storeName = storeName;
         this.aggregator = aggregator;
     }
@@ -62,7 +62,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
             T oldAgg = store.get(key);
 
             if (oldAgg == null)
-                oldAgg = aggregator.initialValue();
+                oldAgg = aggregator.initialValue(key);
 
             T newAgg = oldAgg;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 9888dff..8ee557c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -18,17 +18,15 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
+import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -48,31 +46,34 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     private static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
 
-    private static final String FILTER_NAME = "KTABLE-FILTER-";
+    private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
 
-    private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
+    private static final String FILTER_NAME = "KTABLE-FILTER-";
 
-    private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
+    public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
 
-    private static final String SELECT_NAME = "KTABLE-SELECT-";
+    public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
 
-    private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
+    public static final String LEFTTHIS_NAME = "KTABLE-LEFTTHIS-";
 
-    public static final String SOURCE_NAME = "KTABLE-SOURCE-";
+    public static final String LEFTOTHER_NAME = "KTABLE-LEFTOTHER-";
 
-    public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
+    private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
 
-    public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
+    public static final String MERGE_NAME = "KTABLE-MERGE-";
 
     public static final String OUTERTHIS_NAME = "KTABLE-OUTERTHIS-";
 
     public static final String OUTEROTHER_NAME = "KTABLE-OUTEROTHER-";
 
-    public static final String LEFTTHIS_NAME = "KTABLE-LEFTTHIS-";
+    private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
-    public static final String LEFTOTHER_NAME = "KTABLE-LEFTOTHER-";
+    private static final String SELECT_NAME = "KTABLE-SELECT-";
+
+    public static final String SOURCE_NAME = "KTABLE-SOURCE-";
+
+    private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
 
-    public static final String MERGE_NAME = "KTABLE-MERGE-";
 
     public final ProcessorSupplier<K, ?> processorSupplier;
 
@@ -245,15 +246,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
-                                                 KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                                 Serializer<K1> keySerializer,
-                                                 Serializer<V1> valueSerializer,
-                                                 Serializer<V2> aggValueSerializer,
-                                                 Deserializer<K1> keyDeserializer,
-                                                 Deserializer<V1> valueDeserializer,
-                                                 Deserializer<V2> aggValueDeserializer,
-                                                 String name) {
+    public <K1, V1, T> KTable<K1, T> aggregate(Aggregator<K1, V1, T> aggregator,
+                                               KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+                                               Serializer<K1> keySerializer,
+                                               Serializer<V1> valueSerializer,
+                                               Serializer<T> aggValueSerializer,
+                                               Deserializer<K1> keyDeserializer,
+                                               Deserializer<V1> valueDeserializer,
+                                               Deserializer<T> aggValueDeserializer,
+                                               String name) {
 
         String selectName = topology.newName(SELECT_NAME);
         String sinkName = topology.newName(KStreamImpl.SINK_NAME);
@@ -267,7 +268,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
 
-        ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, aggregatorSupplier.get());
+        ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, aggregator);
 
         StateStoreSupplier aggregateStore = Stores.create(name)
                 .withKeys(keySerializer, keyDeserializer)
@@ -295,55 +296,52 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public <K1> KTable<K1, Long> sum(final KeyValueMapper<K, V, K1> keySelector,
-                                     final KeyValueToLongMapper<K, V> valueSelector,
-                                     Serializer<K1> keySerializer,
-                                     Deserializer<K1> keyDeserializer,
-                                     String name) {
+    public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer,
+                                          Reducer<V1> removeReducer,
+                                          KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+                                          Serializer<K1> keySerializer,
+                                          Serializer<V1> valueSerializer,
+                                          Deserializer<K1> keyDeserializer,
+                                          Deserializer<V1> valueDeserializer,
+                                          String name) {
 
-        Serializer<Long> longSerializer = new LongSerializer();
-        Deserializer<Long> longDeserializer = new LongDeserializer();
+        String selectName = topology.newName(SELECT_NAME);
+        String sinkName = topology.newName(KStreamImpl.SINK_NAME);
+        String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
+        String reduceName = topology.newName(REDUCE_NAME);
 
-        KeyValueMapper<K, V, KeyValue<K1, Long>> mapper = new KeyValueMapper<K, V, KeyValue<K1, Long>>() {
-            @Override
-            public KeyValue<K1, Long> apply(K key, V value) {
-                K1 aggKey = keySelector.apply(key, value);
-                Long aggValue = valueSelector.apply(key, value);
+        String topic = name + REPARTITION_TOPIC_SUFFIX;
 
-                return new KeyValue<>(aggKey, aggValue);
-            }
-        };
+        ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
+        ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
 
-        return this.<K1, Long, Long>aggregate(new LongSumSupplier<K1>(), mapper,
-                keySerializer, longSerializer, longSerializer,
-                keyDeserializer, longDeserializer, longDeserializer,
-                name);
-    }
+        KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
 
-    @Override
-    public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> keySelector,
-                                       Serializer<K1> keySerializer,
-                                       Serializer<V> valueSerializer,
-                                       Deserializer<K1> keyDeserializer,
-                                       Deserializer<V> valueDeserializer,
-                                       String name) {
+        ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableReduce<>(name, addReducer, removeReducer);
 
-        Serializer<Long> longSerializer = new LongSerializer();
-        Deserializer<Long> longDeserializer = new LongDeserializer();
+        StateStoreSupplier aggregateStore = Stores.create(name)
+                .withKeys(keySerializer, keyDeserializer)
+                .withValues(valueSerializer, valueDeserializer)
+                .localDatabase()
+                .build();
 
-        KeyValueMapper<K, V, KeyValue<K1, V>> mapper = new KeyValueMapper<K, V, KeyValue<K1, V>>() {
-            @Override
-            public KeyValue<K1, V> apply(K key, V value) {
-                K1 aggKey = keySelector.apply(key, value);
+        // select the aggregate key and values (old and new), it would require parent to send old values
+        topology.addProcessor(selectName, selectSupplier, this.name);
+        this.enableSendingOldValues();
 
-                return new KeyValue<>(aggKey, value);
-            }
-        };
+        // send the aggregate key-value pairs to the intermediate topic for partitioning
+        topology.addInternalTopic(topic);
+        topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, selectName);
 
-        return this.<K1, V, Long>aggregate(new CountSupplier<K1, V>(), mapper,
-                keySerializer, valueSerializer, longSerializer,
-                keyDeserializer, valueDeserializer, longDeserializer,
-                name);
+        // read the intermediate topic
+        topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
+
+        // reduce the values with the add/remove reducers and local store
+        topology.addProcessor(reduceName, aggregateSupplier, sourceName);
+        topology.addStateStore(aggregateStore, reduceName);
+
+        // return the KTable representation with the intermediate topic as the sources
+        return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName));
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
new file mode 100644
index 0000000..0d1b55a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
+
+    private final String storeName;
+    private final Reducer<V> addReducer;
+    private final Reducer<V> removeReducer;
+
+    private boolean sendOldValues = false;
+
+    public KTableReduce(String storeName, Reducer<V> addReducer, Reducer<V> removeReducer) {
+        this.storeName = storeName;
+        this.addReducer = addReducer;
+        this.removeReducer = removeReducer;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    @Override
+    public Processor<K, Change<V>> get() {
+        return new KTableAggregateProcessor();
+    }
+
+    private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> {
+
+        private KeyValueStore<K, V> store;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+
+            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+        }
+
+        @Override
+        public void process(K key, Change<V> value) {
+            V oldAgg = store.get(key);
+            V newAgg = oldAgg;
+
+            // first try to add the new value
+            if (value.newValue != null) {
+                if (newAgg == null) {
+                    newAgg = value.newValue;
+                } else {
+                    newAgg = addReducer.apply(newAgg, value.newValue);
+                }
+            }
+
+            // then try to remove the old value
+            if (value.oldValue != null) {
+                newAgg = removeReducer.apply(newAgg, value.oldValue);
+            }
+
+            // update the store with the new value
+            store.put(key, newAgg);
+
+            // send the old / new pair
+            if (sendOldValues)
+                context().forward(key, new Change<>(newAgg, oldAgg));
+            else
+                context().forward(key, new Change<>(newAgg, null));
+        }
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K, V> view() {
+
+        return new KTableValueGetterSupplier<K, V>() {
+
+            public KTableValueGetter<K, V> get() {
+                return new KTableAggregateValueGetter();
+            }
+
+        };
+    }
+
+    private class KTableAggregateValueGetter implements KTableValueGetter<K, V> {
+
+        private KeyValueStore<K, V> store;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+        }
+
+        @Override
+        public V get(K key) {
+            return store.get(key);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
index d07fc5d..c993512 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
@@ -22,8 +22,8 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.MeteredKeyValueStore;
-import org.apache.kafka.streams.state.RocksDBStore;
+import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
 import org.apache.kafka.streams.state.Serdes;
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java
deleted file mode 100644
index b66590e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
-
-public class LongSumSupplier<K> implements AggregatorSupplier<K, Long, Long> {
-
-    private class LongSum implements Aggregator<K, Long, Long> {
-
-        @Override
-        public Long initialValue() {
-            return 0L;
-        }
-
-        @Override
-        public Long add(K aggKey, Long value, Long aggregate) {
-            return aggregate + value;
-        }
-
-        @Override
-        public Long remove(K aggKey, Long value, Long aggregate) {
-            return aggregate - value;
-        }
-
-        @Override
-        public Long merge(Long aggr1, Long aggr2) {
-            return aggr1 + aggr2;
-        }
-    }
-
-    @Override
-    public Aggregator<K, Long, Long> get() {
-        return new LongSum();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java
deleted file mode 100644
index 00f4b55..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
-import org.apache.kafka.streams.kstream.Aggregator;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-
-/**
- * NOTE: This is just a demo aggregate supplier that can be implemented by users to add their own built-in aggregates.
- * It is highly in-efficient and is not supposed to be merged in.
- */
-public class TopKSupplier<K, V extends Comparable<V>> implements AggregatorSupplier<K, V, Collection<V>> {
-
-    private final int k;
-
-    public TopKSupplier(int k) {
-        this.k = k;
-    }
-
-    private class TopK implements Aggregator<K, V, Collection<V>> {
-
-        private final Map<K, PriorityQueue<V>> sorted = new HashMap<>();
-
-        @Override
-        public Collection<V> initialValue() {
-            return Collections.<V>emptySet();
-        }
-
-        @Override
-        public Collection<V> add(K aggKey, V value, Collection<V> aggregate) {
-            PriorityQueue<V> queue = sorted.get(aggKey);
-            if (queue == null) {
-                queue = new PriorityQueue<>();
-                sorted.put(aggKey, queue);
-            }
-
-            queue.add(value);
-
-            PriorityQueue<V> copy = new PriorityQueue<>(queue);
-
-            Set<V> ret = new HashSet<>();
-            for (int i = 1; i <= k; i++)
-                ret.add(copy.poll());
-
-            return ret;
-        }
-
-        @Override
-        public Collection<V> remove(K aggKey, V value, Collection<V> aggregate) {
-            PriorityQueue<V> queue = sorted.get(aggKey);
-
-            if (queue == null)
-                throw new IllegalStateException("This should not happen.");
-
-            queue.remove(value);
-
-            PriorityQueue<V> copy = new PriorityQueue<>(queue);
-
-            Set<V> ret = new HashSet<>();
-            for (int i = 1; i <= k; i++)
-                ret.add(copy.poll());
-
-            return ret;
-        }
-
-        @Override
-        public Collection<V> merge(Collection<V> aggr1, Collection<V> aggr2) {
-            PriorityQueue<V> copy = new PriorityQueue<>(aggr1);
-            copy.addAll(aggr2);
-
-            Set<V> ret = new HashSet<>();
-            for (int i = 1; i <= k; i++)
-                ret.add(copy.poll());
-
-            return ret;
-        }
-    }
-
-    @Override
-    public Aggregator<K, V, Collection<V>> get() {
-        return new TopK();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3cac3f1..547bb15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.state.OffsetCheckpoint;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
deleted file mode 100644
index d1f845c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-/**
- * An in-memory key-value store based on a TreeMap.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
- */
-public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
-
-    private final String name;
-    private final Serdes serdes;
-    private final Time time;
-
-    protected InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
-        this.name = name;
-        this.serdes = serdes;
-        this.time = time;
-    }
-
-    public String name() {
-        return name;
-    }
-
-    public StateStore get() {
-        return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time);
-    }
-
-    private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
-
-        private final String name;
-        private final NavigableMap<K, V> map;
-
-        public MemoryStore(String name) {
-            super();
-            this.name = name;
-            this.map = new TreeMap<>();
-        }
-
-        @Override
-        public String name() {
-            return this.name;
-        }
-
-        @Override
-        public void init(ProcessorContext context) {
-            // do-nothing since it is in-memory
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-
-        @Override
-        public V get(K key) {
-            return this.map.get(key);
-        }
-
-        @Override
-        public void put(K key, V value) {
-            this.map.put(key, value);
-        }
-
-        @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
-        }
-
-        @Override
-        public V delete(K key) {
-            return this.map.remove(key);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(K from, K to) {
-            return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
-        }
-
-        @Override
-        public void flush() {
-            // do-nothing since it is in-memory
-        }
-
-        @Override
-        public void close() {
-            // do-nothing
-        }
-
-        private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
-            private final Iterator<Map.Entry<K, V>> iter;
-
-            public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
-                this.iter = iter;
-            }
-
-            @Override
-            public boolean hasNext() {
-                return iter.hasNext();
-            }
-
-            @Override
-            public Entry<K, V> next() {
-                Map.Entry<K, V> entry = iter.next();
-                return new Entry<>(entry.getKey(), entry.getValue());
-            }
-
-            @Override
-            public void remove() {
-                iter.remove();
-            }
-
-            @Override
-            public void close() {
-            }
-
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
deleted file mode 100644
index a346534..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
-/**
- * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- */
-public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
-
-    private final String name;
-    private final int capacity;
-    private final Serdes serdes;
-    private final Time time;
-
-    protected InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) {
-        this.name = name;
-        this.capacity = capacity;
-        this.serdes = serdes;
-        this.time = time;
-    }
-
-    public String name() {
-        return name;
-    }
-
-    public StateStore get() {
-        MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
-        final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time);
-        cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
-            @Override
-            public void apply(K key, V value) {
-                store.removed(key);
-            }
-        });
-        return store;
-    }
-
-    private static interface EldestEntryRemovalListener<K, V> {
-        public void apply(K key, V value);
-    }
-
-    protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
-
-        private final String name;
-        private final Map<K, V> map;
-        private final NavigableSet<K> keys;
-        private EldestEntryRemovalListener<K, V> listener;
-
-        public MemoryLRUCache(String name, final int maxCacheSize) {
-            this.name = name;
-            this.keys = new TreeSet<>();
-            // leave room for one extra entry to handle adding an entry before the oldest can be removed
-            this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
-                private static final long serialVersionUID = 1L;
-
-                @Override
-                protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
-                    if (size() > maxCacheSize) {
-                        K key = eldest.getKey();
-                        keys.remove(key);
-                        if (listener != null) listener.apply(key, eldest.getValue());
-                        return true;
-                    }
-                    return false;
-                }
-            };
-        }
-
-        protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
-            this.listener = listener;
-        }
-
-        @Override
-        public String name() {
-            return this.name;
-        }
-
-        @Override
-        public void init(ProcessorContext context) {
-            // do-nothing since it is in-memory
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-
-        @Override
-        public V get(K key) {
-            return this.map.get(key);
-        }
-
-        @Override
-        public void put(K key, V value) {
-            this.map.put(key, value);
-            this.keys.add(key);
-        }
-
-        @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
-        }
-
-        @Override
-        public V delete(K key) {
-            V value = this.map.remove(key);
-            this.keys.remove(key);
-            return value;
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(K from, K to) {
-            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
-        }
-
-        @Override
-        public void flush() {
-            // do-nothing since it is in-memory
-        }
-
-        @Override
-        public void close() {
-            // do-nothing
-        }
-
-        private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
-            private final Iterator<K> keys;
-            private final Map<K, V> entries;
-            private K lastKey;
-
-            public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
-                this.keys = keys;
-                this.entries = entries;
-            }
-
-            @Override
-            public boolean hasNext() {
-                return keys.hasNext();
-            }
-
-            @Override
-            public Entry<K, V> next() {
-                lastKey = keys.next();
-                return new Entry<>(lastKey, entries.get(lastKey));
-            }
-
-            @Override
-            public void remove() {
-                keys.remove();
-                entries.remove(lastKey);
-            }
-
-            @Override
-            public void close() {
-                // do nothing
-            }
-        }
-    }
-}