You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/17 20:52:54 UTC

kafka git commit: KAFKA-3825: Allow users to specify different types of state stores in Streams DSL

Repository: kafka
Updated Branches:
  refs/heads/trunk 2daa10d77 -> eaf0e4af3


KAFKA-3825: Allow users to specify different types of state stores in Streams DSL

Author: Jeyhun Karimov <je...@gmail.com>

Reviewers: Damian Guy, Guozhang Wang

Closes #1588 from jeyhunkarimov/KAFKA-3825


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eaf0e4af
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eaf0e4af
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eaf0e4af

Branch: refs/heads/trunk
Commit: eaf0e4af341818b335d17400b0a02be87a7fff9b
Parents: 2daa10d
Author: Jeyhun Karimov <je...@gmail.com>
Authored: Thu Nov 17 12:52:50 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 17 12:52:50 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/KGroupedStream.java   |  88 +++++++++++
 .../kafka/streams/kstream/KGroupedTable.java    |  47 ++++++
 .../kstream/internals/AbstractStream.java       |  38 +++++
 .../kstream/internals/KGroupedStreamImpl.java   | 148 +++++++++++--------
 .../kstream/internals/KGroupedTableImpl.java    |  77 +++++-----
 .../kstream/internals/KStreamAggregate.java     |   5 +-
 .../kstream/internals/KStreamReduce.java        |   7 +-
 .../internals/KStreamWindowAggregate.java       |   9 +-
 .../kstream/internals/KStreamWindowReduce.java  |   7 +-
 .../kstream/internals/KTableAggregate.java      |   5 +-
 .../streams/kstream/internals/KTableReduce.java |   7 +-
 .../streams/kstream/internals/KTableSource.java |   7 +-
 .../kstream/internals/TupleForwarder.java       |  51 +++++++
 .../streams/processor/StateStoreSupplier.java   |  11 +-
 .../state/internals/AbstractStoreSupplier.java  |   3 +-
 .../InMemoryKeyValueStoreSupplier.java          |   4 +-
 .../InMemoryLRUCacheStoreSupplier.java          |  13 +-
 .../internals/RocksDBKeyValueStoreSupplier.java |  15 +-
 .../internals/RocksDBWindowStoreSupplier.java   |   6 +-
 .../internals/KGroupedStreamImplTest.java       |  25 +++-
 .../internals/KGroupedTableImplTest.java        |  19 ++-
 21 files changed, 442 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 16b55d9..f47c904 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -17,6 +17,9 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStore;
 
 /**
  * {@link KGroupedStream} is an abstraction of a <i>grouped record stream</i> of key-value pairs
@@ -49,6 +52,17 @@ public interface KGroupedStream<K, V> {
     KTable<K, V> reduce(Reducer<V> reducer,
                         final String storeName);
 
+    /**
+     * Combine values of this stream by the grouped key into a new instance of ever-updating
+     * {@link KTable}. The resulting {@link KTable} will be materialized in a state
+     * store provided by the {@link StateStoreSupplier}.
+     *
+     * @param reducer       the instance of {@link Reducer}
+     * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
+     */
+    KTable<K, V> reduce(final Reducer<V> reducer,
+                        final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
@@ -69,6 +83,23 @@ public interface KGroupedStream<K, V> {
                                                      final String storeName);
 
     /**
+     * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+     * The resulting {@link KTable} will be materialized in a state
+     * store provided by the {@link StateStoreSupplier}.
+     *
+     * @param reducer       the instance of {@link Reducer}
+     * @param windows       the specification of the aggregation {@link Windows}
+     * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values
+     *         that represent the latest (rolling) aggregate for each key within that window
+     */
+    <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
+                                                     Windows<W> windows,
+                                                     final StateStoreSupplier<WindowStore> storeSupplier);
+
+
+    /**
      * Aggregate values of this stream by key into a new instance of a {@link KTable}.
      * The resulting {@link KTable} will be materialized in a local state
      * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
@@ -90,6 +121,21 @@ public interface KGroupedStream<K, V> {
                                final String storeName);
 
     /**
+     * Aggregate values of this stream by key into a new instance of a {@link KTable}.
+     * The resulting {@link KTable} will be materialized in a state
+     * store provided by the {@link StateStoreSupplier}.
+     *
+     * @param initializer   the instance of {@link Initializer}
+     * @param aggregator    the instance of {@link Aggregator}
+     * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+     * @param <T>           the value type of the resulting {@link KTable}
+     * @return a {@link KTable} that represents the latest (rolling) aggregate for each key
+     */
+    <T> KTable<K, T> aggregate(Initializer<T> initializer,
+                               Aggregator<K, V, T> aggregator,
+                               final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+    /**
      * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
      * The resulting {@link KTable} will be materialized in a local state
      * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
@@ -113,6 +159,24 @@ public interface KGroupedStream<K, V> {
                                                            Serde<T> aggValueSerde,
                                                            final String storeName);
 
+    /**
+     * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+     * The resulting {@link KTable} will be materialized in a state
+     * store provided by the {@link StateStoreSupplier}.
+     *
+     * @param initializer   the instance of {@link Initializer}
+     * @param aggregator    the instance of {@link Aggregator}
+     * @param windows       the specification of the aggregation {@link Windows}
+     * @param <T>           the value type of the resulting {@link KTable}
+     * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values with type {@code T}
+     *         that represent the latest (rolling) aggregate for each key within that window
+     */
+    <W extends Window, T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer,
+                                                           Aggregator<K, V, T> aggregator,
+                                                           Windows<W> windows,
+                                                           final StateStoreSupplier<WindowStore> storeSupplier);
 
     /**
      * Count number of records of this stream by key into a new instance of a {@link KTable}.
@@ -127,6 +191,16 @@ public interface KGroupedStream<K, V> {
      */
     KTable<K, Long> count(final String storeName);
 
+    /**
+     * Count number of records of this stream by key into a new instance of a {@link KTable}.
+     * The resulting {@link KTable} will be materialized in a state
+     * store provided by the {@link StateStoreSupplier}.
+     *
+     * @param storeSupplier  user defined state store supplier {@link StateStoreSupplier}
+     *
+     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
+     */
+    KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
@@ -143,4 +217,18 @@ public interface KGroupedStream<K, V> {
      */
     <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows, final String storeName);
 
+    /**
+     * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+     * The resulting {@link KTable} will be materialized in a state
+     * store provided by the {@link StateStoreSupplier}.
+     *
+     * @param windows       the specification of the aggregation {@link Windows}
+     * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values
+     *         that represent the latest (rolling) count (i.e., number of records) for each key within that window
+     */
+    <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows,
+                                                       final StateStoreSupplier<WindowStore> storeSupplier);
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 3ba4f22..c587538 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
 
 /**
  * {@link KGroupedTable} is an abstraction of a <i>grouped changelog stream</i> from a primary-keyed table,
@@ -51,6 +53,21 @@ public interface KGroupedTable<K, V> {
                         String storeName);
 
     /**
+     * Combine updating values of this stream by the selected key into a new instance of {@link KTable}.
+     * The resulting {@link KTable} will be materialized in a state
+     * store provided by the {@link StateStoreSupplier}.
+     *
+     * @param adder         the instance of {@link Reducer} for addition
+     * @param subtractor    the instance of {@link Reducer} for subtraction
+     * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+     * @return a {@link KTable} with the same key and value types as this {@link KGroupedTable},
+     *         containing aggregated values for each key
+     */
+    KTable<K, V> reduce(Reducer<V> adder,
+                        Reducer<V> subtractor,
+                        final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+    /**
      * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}.
      * The resulting {@link KTable} will be materialized in a local state
      * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
@@ -95,6 +112,25 @@ public interface KGroupedTable<K, V> {
                                String storeName);
 
     /**
+     * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}
+     * using default serializers and deserializers.
+     * The resulting {@link KTable} will be materialized in a state
+     * store provided by the {@link StateStoreSupplier}.
+     *
+     * @param initializer   the instance of {@link Initializer}
+     * @param adder         the instance of {@link Aggregator} for addition
+     * @param subtractor    the instance of {@link Aggregator} for subtraction
+     * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+     * @param <T>           the value type of the aggregated {@link KTable}
+     * @return a {@link KTable} with the same key type and aggregated value type {@code T},
+     *         containing aggregated values for each key
+     */
+    <T> KTable<K, T> aggregate(Initializer<T> initializer,
+                               Aggregator<K, V, T> adder,
+                               Aggregator<K, V, T> subtractor,
+                               final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+    /**
      * Count number of records of this stream by the selected key into a new instance of {@link KTable}.
      * The resulting {@link KTable} will be materialized in a local state
      * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
@@ -107,4 +143,15 @@ public interface KGroupedTable<K, V> {
      */
     KTable<K, Long> count(String storeName);
 
+    /**
+     * Count number of records of this stream by the selected key into a new instance of {@link KTable}.
+     * The resulting {@link KTable} will be materialized in a state
+     * store provided by the {@link StateStoreSupplier}.
+     *
+     * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+     * @return a {@link KTable} with the same key type as this {@link KGroupedTable} and {@link Long} values,
+     *         containing the number of values for each key
+     */
+    KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 2f5b160..31a3dc6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -17,10 +17,18 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
 
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Set;
 
 public abstract class AbstractStream<K> {
@@ -64,4 +72,34 @@ public abstract class AbstractStream<K> {
         };
     }
 
+    @SuppressWarnings("unchecked")
+    public static <T, K>  StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde,
+                                                                  final Serde<T> aggValueSerde,
+                                                                  final String storeName) {
+        Objects.requireNonNull(storeName, "storeName can't be null");
+        return storeFactory(keySerde, aggValueSerde, storeName).build();
+    }
+
+    @SuppressWarnings("unchecked")
+    public static  <W extends Window, T, K> StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde,
+                                                                                     final Serde<T> aggValSerde,
+                                                                                     final Windows<W> windows,
+                                                                                     final String storeName) {
+        Objects.requireNonNull(storeName, "storeName can't be null");
+        return storeFactory(keySerde, aggValSerde, storeName)
+                .windowed(windows.size(), windows.maintainMs(), windows.segments, false)
+                .build();
+    }
+    @SuppressWarnings("unchecked")
+    public static  <T, K> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<K> keySerde,
+                                                                         final Serde<T> aggValueSerde,
+                                                                         final String storeName) {
+        return Stores.create(storeName)
+                .withKeys(keySerde)
+                .withValues(aggValueSerde)
+                .persistent()
+                .enableCaching();
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 9bc66e8..e50b6dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -16,17 +16,18 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStore;
 
 import java.util.Collections;
 import java.util.Objects;
@@ -56,27 +57,41 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
     @Override
     public KTable<K, V> reduce(final Reducer<V> reducer,
                                final String storeName) {
+        return reduce(reducer, keyValueStore(keySerde, valSerde, storeName));
+    }
+
+    @Override
+    public KTable<K, V> reduce(final Reducer<V> reducer,
+                               final StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(reducer, "reducer can't be null");
-        Objects.requireNonNull(storeName, "storeName can't be null");
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doAggregate(
-            new KStreamReduce<K, V>(storeName, reducer),
-            REDUCE_NAME,
-            keyValueStore(valSerde, storeName));
+                new KStreamReduce<K, V>(storeSupplier.name(), reducer),
+                REDUCE_NAME,
+                storeSupplier);
     }
 
 
     @SuppressWarnings("unchecked")
     @Override
-    public <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
-                                                            Windows<W> windows,
+    public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                                            final Windows<W> windows,
                                                             final String storeName) {
+        return reduce(reducer, windows, windowedStore(keySerde, valSerde, windows, storeName));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                                            final Windows<W> windows,
+                                                            final StateStoreSupplier<WindowStore> storeSupplier) {
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(windows, "windows can't be null");
-        Objects.requireNonNull(storeName, "storeName can't be null");
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return (KTable<Windowed<K>, V>) doAggregate(
-            new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
-            REDUCE_NAME,
-            windowedStore(valSerde, windows, storeName)
+                new KStreamWindowReduce<K, V, W>(windows, storeSupplier.name(), reducer),
+                REDUCE_NAME,
+                storeSupplier
         );
     }
 
@@ -85,13 +100,20 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
                                       final Aggregator<K, V, T> aggregator,
                                       final Serde<T> aggValueSerde,
                                       final String storeName) {
+        return aggregate(initializer, aggregator, keyValueStore(keySerde, aggValueSerde, storeName));
+    }
+
+    @Override
+    public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+                                      final Aggregator<K, V, T> aggregator,
+                                      final StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
-        Objects.requireNonNull(storeName, "storeName can't be null");
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doAggregate(
-            new KStreamAggregate<>(storeName, initializer, aggregator),
-            AGGREGATE_NAME,
-            keyValueStore(aggValueSerde, storeName));
+                new KStreamAggregate<>(storeSupplier.name(), initializer, aggregator),
+                AGGREGATE_NAME,
+                storeSupplier);
     }
 
     @SuppressWarnings("unchecked")
@@ -101,34 +123,33 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
                                                                   final Windows<W> windows,
                                                                   final Serde<T> aggValueSerde,
                                                                   final String storeName) {
+        return aggregate(initializer, aggregator, windows, windowedStore(keySerde, aggValueSerde, windows, storeName));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+                                                                  final Aggregator<K, V, T> aggregator,
+                                                                  final Windows<W> windows,
+                                                                  final StateStoreSupplier<WindowStore> storeSupplier) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(windows, "windows can't be null");
-        Objects.requireNonNull(storeName, "storeName can't be null");
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return (KTable<Windowed<K>, T>) doAggregate(
-            new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
-            AGGREGATE_NAME,
-            windowedStore(aggValueSerde, windows, storeName)
+                new KStreamWindowAggregate<>(windows, storeSupplier.name(), initializer, aggregator),
+                AGGREGATE_NAME,
+                storeSupplier
         );
     }
 
     @Override
     public KTable<K, Long> count(final String storeName) {
-        return aggregate(new Initializer<Long>() {
-            @Override
-            public Long apply() {
-                return 0L;
-            }
-        }, new Aggregator<K, V, Long>() {
-            @Override
-            public Long apply(K aggKey, V value, Long aggregate) {
-                return aggregate + 1;
-            }
-        }, Serdes.Long(), storeName);
+        return count(keyValueStore(keySerde, Serdes.Long(), storeName));
     }
 
     @Override
-    public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows, final String storeName) {
+    public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
         return aggregate(new Initializer<Long>() {
             @Override
             public Long apply() {
@@ -139,36 +160,39 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
             public Long apply(K aggKey, V value, Long aggregate) {
                 return aggregate + 1;
             }
-        }, windows, Serdes.Long(), storeName);
+        }, storeSupplier);
     }
 
-    private <T> StateStoreSupplier keyValueStore(final Serde<T> aggValueSerde, final String name) {
-        return storeFactory(aggValueSerde, name).build();
+    @Override
+    public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
+                                                              final String storeName) {
+        return count(windows, windowedStore(keySerde, Serdes.Long(), windows, storeName));
     }
 
-
-    private <W extends Window, T> StateStoreSupplier windowedStore(final Serde<T> aggValSerde,
-                                                                   final Windows<W> windows,
-                                                                   final String storeName) {
-        return storeFactory(aggValSerde, storeName)
-                .windowed(windows.size(), windows.maintainMs(), windows.segments, false)
-                .build();
-
+    @Override
+    public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
+                                                              final StateStoreSupplier<WindowStore> storeSupplier) {
+        return aggregate(
+                new Initializer<Long>() {
+                    @Override
+                    public Long apply() {
+                        return 0L;
+                    }
+                }, new Aggregator<K, V, Long>() {
+                    @Override
+                    public Long apply(K aggKey, V value, Long aggregate) {
+                        return aggregate + 1;
+                    }
+                },
+                windows,
+                storeSupplier);
     }
 
-    private <T> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<T> aggValueSerde,
-                                                                    final String storeName) {
-        return Stores.create(storeName)
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .enableCaching();
-    }
 
     private <T> KTable<K, T> doAggregate(
-        final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
-        final String functionName,
-        final StateStoreSupplier storeSupplier) {
+            final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
+            final String functionName,
+            final StateStoreSupplier storeSupplier) {
 
         final String aggFunctionName = topology.newName(functionName);
 
@@ -178,11 +202,11 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
         topology.addStateStore(storeSupplier, aggFunctionName);
 
         return new KTableImpl<>(topology,
-                                aggFunctionName,
-                                aggregateSupplier,
-                                sourceName.equals(this.name) ? sourceNodes
-                                                             : Collections.singleton(sourceName),
-                                storeSupplier.name());
+                aggFunctionName,
+                aggregateSupplier,
+                sourceName.equals(this.name) ? sourceNodes
+                        : Collections.singleton(sourceName),
+                storeSupplier.name());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 7aa2531..4ca69d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -21,22 +21,22 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.Initializer;
-import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Collections;
 import java.util.Objects;
 
 /**
  * The implementation class of {@link KGroupedTable}.
- * 
+ *
  * @param <K> the key type
  * @param <V> the value type
  */
@@ -65,33 +65,39 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
                                       Aggregator<K, V, T> subtractor,
                                       Serde<T> aggValueSerde,
                                       String storeName) {
-
-        Objects.requireNonNull(initializer, "initializer can't be null");
-        Objects.requireNonNull(adder, "adder can't be null");
-        Objects.requireNonNull(subtractor, "subtractor can't be null");
-        Objects.requireNonNull(storeName, "storeName can't be null");
-        ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(storeName, initializer, adder, subtractor);
-        return doAggregate(aggregateSupplier, aggValueSerde, AGGREGATE_NAME, storeName);
+        return aggregate(initializer, adder, subtractor, keyValueStore(keySerde, aggValueSerde, storeName));
     }
 
+
     @Override
     public <T> KTable<K, T> aggregate(Initializer<T> initializer,
-                            Aggregator<K, V, T> adder,
-                            Aggregator<K, V, T> subtractor,
-                            String storeName) {
-
+                                      Aggregator<K, V, T> adder,
+                                      Aggregator<K, V, T> subtractor,
+                                      String storeName) {
         return aggregate(initializer, adder, subtractor, null, storeName);
     }
 
+    @Override
+    public <T> KTable<K, T> aggregate(Initializer<T> initializer,
+                                      Aggregator<K, V, T> adder,
+                                      Aggregator<K, V, T> subtractor,
+                                      StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(adder, "adder can't be null");
+        Objects.requireNonNull(subtractor, "subtractor can't be null");
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(storeSupplier.name(), initializer, adder, subtractor);
+        return doAggregate(aggregateSupplier, AGGREGATE_NAME, storeSupplier);
+    }
+
     private <T> KTable<K, T> doAggregate(ProcessorSupplier<K, Change<V>> aggregateSupplier,
-                                         Serde<T> aggValueSerde,
                                          String functionName,
-                                         String storeName) {
+                                         StateStoreSupplier<KeyValueStore> storeSupplier) {
         String sinkName = topology.newName(KStreamImpl.SINK_NAME);
         String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
         String funcName = topology.newName(functionName);
 
-        String topic = storeName + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
+        String topic = storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
         Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
         Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
@@ -101,13 +107,6 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
         ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
 
-        StateStoreSupplier aggregateStore = Stores.create(storeName)
-            .withKeys(keySerde)
-            .withValues(aggValueSerde)
-            .persistent()
-            .enableCaching()
-            .build();
-
         // send the aggregate key-value pairs to the intermediate topic for partitioning
         topology.addInternalTopic(topic);
         topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);
@@ -117,25 +116,37 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
         // aggregate the values with the aggregator and local store
         topology.addProcessor(funcName, aggregateSupplier, sourceName);
-        topology.addStateStore(aggregateStore, funcName);
+        topology.addStateStore(storeSupplier, funcName);
 
         // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeName);
+        return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name());
     }
 
     @Override
     public KTable<K, V> reduce(Reducer<V> adder,
                                Reducer<V> subtractor,
                                String storeName) {
+        return reduce(adder, subtractor, keyValueStore(keySerde, valSerde, storeName));
+    }
+
+    @Override
+    public KTable<K, V> reduce(Reducer<V> adder,
+                               Reducer<V> subtractor,
+                               StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(adder, "adder can't be null");
         Objects.requireNonNull(subtractor, "subtractor can't be null");
-        Objects.requireNonNull(storeName, "storeName can't be null");
-        ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(storeName, adder, subtractor);
-        return doAggregate(aggregateSupplier, valSerde, REDUCE_NAME, storeName);
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(storeSupplier.name(), adder, subtractor);
+        return doAggregate(aggregateSupplier, REDUCE_NAME, storeSupplier);
     }
 
     @Override
     public KTable<K, Long> count(String storeName) {
+        return count(keyValueStore(keySerde, Serdes.Long(), storeName));
+    }
+
+    @Override
+    public KTable<K, Long> count(StateStoreSupplier<KeyValueStore> storeSupplier) {
         return this.aggregate(
                 new Initializer<Long>() {
                     @Override
@@ -154,7 +165,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
                         return aggregate - 1L;
                     }
                 },
-                Serdes.Long(), storeName);
+                storeSupplier);
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index d596d5e..5bbda1f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.CachedStateStore;
 
 public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, K, V, T> {
 
@@ -53,13 +52,14 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
     private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
 
         private KeyValueStore<K, T> store;
+        private TupleForwarder<K, T> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
-            ((CachedStateStore) store).setFlushListener(new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
+            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
         }
 
 
@@ -82,6 +82,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
 
             // update the store with the new value
             store.put(key, newAgg);
+            tupleForwarder.maybeForward(key, newAgg, oldAgg, sendOldValues);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 1408169..9af4368 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.CachedStateStore;
 
 public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, V> {
 
@@ -49,6 +48,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
     private class KStreamReduceProcessor extends AbstractProcessor<K, V> {
 
         private KeyValueStore<K, V> store;
+        private TupleForwarder<K, V> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
@@ -56,7 +56,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
             super.init(context);
 
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-            ((CachedStateStore) store).setFlushListener(new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
+            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
         }
 
 
@@ -77,10 +77,9 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
                     newAgg = reducer.apply(newAgg, value);
                 }
             }
-
             // update the store with the new value
             store.put(key, newAgg);
-
+            tupleForwarder.maybeForward(key, newAgg, oldAgg, sendOldValues);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 55b0916..d74a399 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -20,15 +20,14 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.state.internals.CachedStateStore;
 
 import java.util.Map;
 
@@ -61,6 +60,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
     private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> {
 
         private WindowStore<K, T> windowStore;
+        private TupleForwarder<Windowed<K>, T> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
@@ -68,7 +68,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
             super.init(context);
 
             windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
-            ((CachedStateStore) windowStore).setFlushListener(new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues));
+            tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues));
         }
 
         @Override
@@ -110,7 +110,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
 
                         // update the store with the new value
                         windowStore.put(key, newAgg, window.start());
-
+                        tupleForwarder.maybeForward(new Windowed<>(key, window), newAgg, oldAgg, sendOldValues);
                         matchedWindows.remove(entry.key);
                     }
                 }
@@ -121,6 +121,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
                 T oldAgg = initializer.apply();
                 T newAgg = aggregator.apply(key, value, oldAgg);
                 windowStore.put(key, newAgg, windowStartMs);
+                tupleForwarder.maybeForward(new Windowed<>(key, matchedWindows.get(windowStartMs)), newAgg, oldAgg, sendOldValues);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 0b93468..5ee02e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.state.internals.CachedStateStore;
 
 import java.util.Map;
 
@@ -58,13 +57,14 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
     private class KStreamWindowReduceProcessor extends AbstractProcessor<K, V> {
 
         private WindowStore<K, V> windowStore;
+        private TupleForwarder<Windowed<K>, V> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);
             windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
-            ((CachedStateStore) windowStore).setFlushListener(new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues));
+            tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues));
         }
 
         @Override
@@ -108,7 +108,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
 
                         // update the store with the new value
                         windowStore.put(key, newAgg, window.start());
-
+                        tupleForwarder.maybeForward(new Windowed<>(key, window), newAgg, oldAgg, sendOldValues);
                         matchedWindows.remove(entry.key);
                     }
                 }
@@ -117,6 +117,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
             // create the new window for the rest of unmatched window that do not exist yet
             for (long windowStartMs : matchedWindows.keySet()) {
                 windowStore.put(key, value, windowStartMs);
+                tupleForwarder.maybeForward(new Windowed<>(key, matchedWindows.get(windowStartMs)), value, null, false);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 2ef4709..fd04fb3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.CachedStateStore;
 
 public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> {
 
@@ -55,13 +54,14 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
     private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> {
 
         private KeyValueStore<K, T> store;
+        private TupleForwarder<K, T> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
-            ((CachedStateStore) store).setFlushListener(new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
+            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
         }
 
         /**
@@ -92,6 +92,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
 
             // update the store with the new value
             store.put(key, newAgg);
+            tupleForwarder.maybeForward(key, newAgg, oldAgg, sendOldValues);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 8c2e5f9..7b29d1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.CachedStateStore;
 
 public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
 
@@ -52,14 +51,14 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
     private class KTableReduceProcessor extends AbstractProcessor<K, Change<V>> {
 
         private KeyValueStore<K, V> store;
+        private TupleForwarder<K, V> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);
-
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-            ((CachedStateStore) store).setFlushListener(new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
+            tupleForwarder = new TupleForwarder<K, V>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
         }
 
         /**
@@ -90,7 +89,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
 
             // update the store with the new value
             store.put(key, newAgg);
-
+            tupleForwarder.maybeForward(key, newAgg, oldAgg, sendOldValues);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 20a80f4..7b777d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.CachedStateStore;
 
 public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
 
@@ -47,13 +46,14 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
     private class KTableSourceProcessor extends AbstractProcessor<K, V> {
 
         private KeyValueStore<K, V> store;
+        private TupleForwarder<K, V> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-            ((CachedStateStore) store).setFlushListener(new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
+            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
         }
 
         @Override
@@ -61,8 +61,9 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
             // the keys should never be null
             if (key == null)
                 throw new StreamsException("Record key for the source KTable from store name " + storeName + " should not be null.");
-
+            V oldValue = store.get(key);
             store.put(key, value);
+            tupleForwarder.maybeForward(key, value, oldValue, sendOldValues);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
new file mode 100644
index 0000000..02609d7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
+
+
+class TupleForwarder<K, V> {
+    private final boolean cached;
+    private final ProcessorContext context;
+    @SuppressWarnings("unchecked")
+    public TupleForwarder(final StateStore store,
+                          final ProcessorContext context,
+                          final ForwardingCacheFlushListener flushListener) {
+        this.cached = store instanceof CachedStateStore;
+        this.context = context;
+        if (this.cached) {
+            ((CachedStateStore) store).setFlushListener(flushListener);
+        }
+    }
+
+    public void maybeForward(final K key,
+                             final V newValue,
+                             final V oldValue,
+                             final boolean sendOldValues) {
+        if (!cached) {
+            if (sendOldValues) {
+                context.forward(key, new Change<>(newValue, oldValue));
+            } else {
+                context.forward(key, new Change<>(newValue, null));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
index d3b0a1b..e4051ca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
@@ -21,8 +21,10 @@ import java.util.Map;
 
 /**
  * A state store supplier which can create one or more {@link StateStore} instances.
+ *
+ * @param <T> State store type
  */
-public interface StateStoreSupplier {
+public interface StateStoreSupplier<T extends StateStore> {
 
     /**
      * Return the name of this state store supplier.
@@ -34,14 +36,15 @@ public interface StateStoreSupplier {
     /**
      * Return a new {@link StateStore} instance.
      *
-     * @return  a new {@link StateStore} instance
+     * @return a new {@link StateStore} instance of type T
      */
-    StateStore get();
+    T get();
 
     /**
      * Returns a Map containing any log configs that will be used when creating the changelog for the {@link StateStore}
-     *
+     * <p>
      * Note: any unrecognized configs will be ignored by the Kafka brokers.
+     *
      * @return Map containing any log configs to be used when creating the changelog for the {@link StateStore}
      * If {@code loggingEnabled} returns false, this function will always return an empty map
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
index 39a33a0..3ad44ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
@@ -18,12 +18,13 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 import java.util.Map;
 
 
-public abstract class AbstractStoreSupplier<K, V> implements StateStoreSupplier {
+public abstract class AbstractStoreSupplier<K, V, T extends StateStore> implements StateStoreSupplier<T> {
     protected final String name;
     protected final Serde<K> keySerde;
     protected final Serde<V> valueSerde;

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index c05ebb2..d09630d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -45,7 +45,7 @@ import java.util.TreeMap;
  *
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
-public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> {
+public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
 
 
     public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {
@@ -56,7 +56,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K
         super(name, keySerde, valueSerde, time, logged, logConfig);
     }
 
-    public StateStore get() {
+    public KeyValueStore get() {
         MemoryStore<K, V> store = new MemoryStore<>(name, keySerde, valueSerde);
 
         return new MeteredKeyValueStore<>(logged ? store.enableLogging() : store, "in-memory-state", time);

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index 45bcca3..c2b56fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -1,12 +1,12 @@
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,11 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Map;
 
@@ -29,7 +30,7 @@ import java.util.Map;
  * @param <V> The value type
  *
  */
-public class InMemoryLRUCacheStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> {
+public class InMemoryLRUCacheStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
 
     private final int capacity;
 
@@ -42,7 +43,7 @@ public class InMemoryLRUCacheStoreSupplier<K, V> extends AbstractStoreSupplier<K
         this.capacity = capacity;
     }
 
-    public StateStore get() {
+    public KeyValueStore get() {
         MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<>(name, capacity, keySerde, valueSerde);
         return new MeteredKeyValueStore<>(logged ? cache.enableLogging() : cache, "in-memory-lru-state", time);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index 68a4429..164b352 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Map;
 
@@ -30,11 +30,10 @@ import java.util.Map;
  *
  * @param <K> the type of keys
  * @param <V> the type of values
- *
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
 
-public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V>  {
+public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
 
     private final boolean enableCaching;
 
@@ -47,7 +46,7 @@ public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K,
         this.enableCaching = enableCaching;
     }
 
-    public StateStore get() {
+    public KeyValueStore get() {
         if (!enableCaching) {
             RocksDBStore<K, V> store = new RocksDBStore<>(name, keySerde, valueSerde);
             return new MeteredKeyValueStore<>(logged ? store.enableLogging() : store, "rocksdb-state", time);
@@ -55,9 +54,9 @@ public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K,
 
         final RocksDBStore<Bytes, byte[]> store = new RocksDBStore<>(name, Serdes.Bytes(), Serdes.ByteArray());
         return new CachingKeyValueStore<>(new MeteredKeyValueStore<>(logged ? store.enableLogging() : store,
-                                                                     "rocksdb-state",
-                                                                     time),
-                                          keySerde,
-                                          valueSerde);
+                "rocksdb-state",
+                time),
+                keySerde,
+                valueSerde);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index eb16bba..49d8882 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.WindowStore;
 
 import java.util.Map;
 
@@ -34,7 +34,7 @@ import java.util.Map;
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
 
-public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> {
+public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, WindowStore> {
 
     private final long retentionPeriod;
     private final boolean retainDuplicates;
@@ -59,7 +59,7 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V
         return name;
     }
 
-    public StateStore get() {
+    public WindowStore get() {
         if (!enableCaching) {
             final RocksDBWindowStore<K, V> rocksDbStore = new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde);
             return new MeteredWindowStore<>(logged ? rocksDbStore.enableLogging() : rocksDbStore, "rocksdb-window", time);

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index a95d1fb..292f229 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockReducer;
@@ -45,7 +46,14 @@ public class KGroupedStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullStoreNameOnReduce() throws Exception {
-        groupedStream.reduce(MockReducer.STRING_ADDER, null);
+        String storeName = null;
+        groupedStream.reduce(MockReducer.STRING_ADDER, storeName);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullStoreSupplierOnReduce() throws Exception {
+        StateStoreSupplier storeSupplier = null;
+        groupedStream.reduce(MockReducer.STRING_ADDER, storeSupplier);
     }
 
     @Test(expected = NullPointerException.class)
@@ -60,7 +68,8 @@ public class KGroupedStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullStoreNameWithWindowedReduce() throws Exception {
-        groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), null);
+        String storeName = null;
+        groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), storeName);
     }
 
     @Test(expected = NullPointerException.class)
@@ -75,7 +84,8 @@ public class KGroupedStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullStoreNameOnAggregate() throws Exception {
-        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, Serdes.String(), null);
+        String storeName = null;
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, Serdes.String(), storeName);
     }
 
     @Test(expected = NullPointerException.class)
@@ -95,6 +105,13 @@ public class KGroupedStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullStoreNameOnWindowedAggregate() throws Exception {
-        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, TimeWindows.of(10), Serdes.String(), null);
+        String storeName = null;
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, TimeWindows.of(10), Serdes.String(), storeName);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() throws Exception {
+        StateStoreSupplier storeSupplier = null;
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, TimeWindows.of(10), storeSupplier);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 9b3a90b..85e2073 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
@@ -50,7 +51,8 @@ public class KGroupedTableImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullStoreNameOnAggregate() throws Exception {
-        groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, null);
+        String storeName = null;
+        groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, storeName);
     }
 
     @Test(expected = NullPointerException.class)
@@ -80,7 +82,14 @@ public class KGroupedTableImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullStoreNameOnReduce() throws Exception {
-        groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, null);
+        String storeName = null;
+        groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, storeName);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullStoreSupplierOnReduce() throws Exception {
+        StateStoreSupplier storeName = null;
+        groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, storeName);
     }
 
     @Test