You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/03 23:15:59 UTC

[2/5] kafka git commit: KAFKA-5045: Clarify on KTable APIs for queryable stores

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index cc6a126..6ed3e84 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.Aggregator;
@@ -47,6 +46,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     private final Serde<K> keySerde;
     private final Serde<V> valSerde;
     private final boolean repartitionRequired;
+    private boolean isQueryable = true;
 
     KGroupedStreamImpl(final KStreamBuilder topology,
                        final String name,
@@ -58,12 +58,26 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         this.keySerde = keySerde;
         this.valSerde = valSerde;
         this.repartitionRequired = repartitionRequired;
+        this.isQueryable = true;
+    }
+
+    private void determineIsQueryable(final String queryableStoreName) {
+        if (queryableStoreName == null) {
+            isQueryable = false;
+        } // no need for else {} since isQueryable is true by default
     }
 
     @Override
     public KTable<K, V> reduce(final Reducer<V> reducer,
-                               final String storeName) {
-        return reduce(reducer, keyValueStore(keySerde, valSerde, storeName));
+                               final String queryableStoreName) {
+        determineIsQueryable(queryableStoreName);
+        return reduce(reducer, keyValueStore(keySerde, valSerde, getOrCreateName(queryableStoreName, REDUCE_NAME)));
+    }
+
+    @Override
+    public KTable<K, V> reduce(final Reducer<V> reducer) {
+        determineIsQueryable(null);
+        return reduce(reducer, (String) null);
     }
 
     @Override
@@ -82,8 +96,16 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     @Override
     public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                             final Windows<W> windows,
-                                                            final String storeName) {
-        return reduce(reducer, windows, windowedStore(keySerde, valSerde, windows, storeName));
+                                                            final String queryableStoreName) {
+        determineIsQueryable(queryableStoreName);
+        return reduce(reducer, windows, windowedStore(keySerde, valSerde, windows, getOrCreateName(queryableStoreName, REDUCE_NAME)));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                                            final Windows<W> windows) {
+        return reduce(reducer, windows, (String) null);
     }
 
     @SuppressWarnings("unchecked")
@@ -105,8 +127,16 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> aggregator,
                                       final Serde<T> aggValueSerde,
-                                      final String storeName) {
-        return aggregate(initializer, aggregator, keyValueStore(keySerde, aggValueSerde, storeName));
+                                      final String queryableStoreName) {
+        determineIsQueryable(queryableStoreName);
+        return aggregate(initializer, aggregator, keyValueStore(keySerde, aggValueSerde, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
+    }
+
+    @Override
+    public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+                                      final Aggregator<? super K, ? super V, T> aggregator,
+                                      final Serde<T> aggValueSerde) {
+        return aggregate(initializer, aggregator, aggValueSerde, null);
     }
 
     @Override
@@ -128,8 +158,18 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                                                   final Aggregator<? super K, ? super V, T> aggregator,
                                                                   final Windows<W> windows,
                                                                   final Serde<T> aggValueSerde,
-                                                                  final String storeName) {
-        return aggregate(initializer, aggregator, windows, windowedStore(keySerde, aggValueSerde, windows, storeName));
+                                                                  final String queryableStoreName) {
+        determineIsQueryable(queryableStoreName);
+        return aggregate(initializer, aggregator, windows, windowedStore(keySerde, aggValueSerde, windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+                                                                  final Aggregator<? super K, ? super V, T> aggregator,
+                                                                  final Windows<W> windows,
+                                                                  final Serde<T> aggValueSerde) {
+        return aggregate(initializer, aggregator, windows, aggValueSerde, null);
     }
 
     @SuppressWarnings("unchecked")
@@ -150,8 +190,14 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     }
 
     @Override
-    public KTable<K, Long> count(final String storeName) {
-        return count(keyValueStore(keySerde, Serdes.Long(), storeName));
+    public KTable<K, Long> count(final String queryableStoreName) {
+        determineIsQueryable(queryableStoreName);
+        return count(keyValueStore(keySerde, Serdes.Long(), getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
+    }
+
+    @Override
+    public KTable<K, Long> count() {
+        return count((String) null);
     }
 
     @Override
@@ -171,8 +217,14 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
 
     @Override
     public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
-                                                              final String storeName) {
-        return count(windows, windowedStore(keySerde, Serdes.Long(), windows, storeName));
+                                                              final String queryableStoreName) {
+        determineIsQueryable(queryableStoreName);
+        return count(windows, windowedStore(keySerde, Serdes.Long(), windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
+    }
+
+    @Override
+    public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows) {
+        return count(windows, (String) null);
     }
 
     @Override
@@ -201,15 +253,14 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                                 final Merger<? super K, T> sessionMerger,
                                                 final SessionWindows sessionWindows,
                                                 final Serde<T> aggValueSerde,
-                                                final String storeName) {
-        Objects.requireNonNull(storeName, "storeName can't be null");
-        Topic.validate(storeName);
+                                                final String queryableStoreName) {
+        determineIsQueryable(queryableStoreName);
         return aggregate(initializer,
                          aggregator,
                          sessionMerger,
                          sessionWindows,
                          aggValueSerde,
-                         storeFactory(keySerde, aggValueSerde, storeName)
+                         storeFactory(keySerde, aggValueSerde, getOrCreateName(queryableStoreName, AGGREGATE_NAME))
                           .sessionWindowed(sessionWindows.maintainMs()).build());
 
 
@@ -221,6 +272,16 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                                 final Aggregator<? super K, ? super V, T> aggregator,
                                                 final Merger<? super K, T> sessionMerger,
                                                 final SessionWindows sessionWindows,
+                                                final Serde<T> aggValueSerde) {
+        return aggregate(initializer, aggregator, sessionMerger, sessionWindows, aggValueSerde, (String) null);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+                                                final Aggregator<? super K, ? super V, T> aggregator,
+                                                final Merger<? super K, T> sessionMerger,
+                                                final SessionWindows sessionWindows,
                                                 final Serde<T> aggValueSerde,
                                                 final StateStoreSupplier<SessionStore> storeSupplier) {
         Objects.requireNonNull(initializer, "initializer can't be null");
@@ -237,14 +298,17 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     }
 
     @SuppressWarnings("unchecked")
-    public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String storeName) {
-        Objects.requireNonNull(storeName, "storeName can't be null");
-        Topic.validate(storeName);
+    public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName) {
+        determineIsQueryable(queryableStoreName);
         return count(sessionWindows,
-                     storeFactory(keySerde, Serdes.Long(), storeName)
+                     storeFactory(keySerde, Serdes.Long(), getOrCreateName(queryableStoreName, AGGREGATE_NAME))
                              .sessionWindowed(sessionWindows.maintainMs()).build());
     }
 
+    public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows) {
+        return count(sessionWindows, (String) null);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
@@ -278,15 +342,22 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     @Override
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                          final SessionWindows sessionWindows,
-                                         final String storeName) {
+                                         final String queryableStoreName) {
+        determineIsQueryable(queryableStoreName);
 
-        Objects.requireNonNull(storeName, "storeName can't be null");
-        Topic.validate(storeName);
         return reduce(reducer, sessionWindows,
-                      storeFactory(keySerde, valSerde, storeName)
+                      storeFactory(keySerde, valSerde, getOrCreateName(queryableStoreName, AGGREGATE_NAME))
                               .sessionWindowed(sessionWindows.maintainMs()).build());
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                         final SessionWindows sessionWindows) {
+
+        return reduce(reducer, sessionWindows, (String) null);
+    }
+
     @Override
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                          final SessionWindows sessionWindows,
@@ -339,16 +410,17 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                 aggregateSupplier,
                 sourceName.equals(this.name) ? sourceNodes
                         : Collections.singleton(sourceName),
-                storeSupplier.name());
+                storeSupplier.name(),
+                isQueryable);
     }
 
     /**
      * @return the new sourceName if repartitioned. Otherwise the name of this stream
      */
-    private String repartitionIfRequired(final String storeName) {
+    private String repartitionIfRequired(final String queryableStoreName) {
         if (!repartitionRequired) {
             return this.name;
         }
-        return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, storeName);
+        return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, queryableStoreName);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 08a4c5d..7e62727 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -48,41 +47,64 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
     protected final Serde<? extends K> keySerde;
     protected final Serde<? extends V> valSerde;
+    private boolean isQueryable = true;
 
-    public KGroupedTableImpl(KStreamBuilder topology,
-                             String name,
-                             String sourceName,
-                             Serde<? extends K> keySerde,
-                             Serde<? extends V> valSerde) {
+    public KGroupedTableImpl(final KStreamBuilder topology,
+                             final String name,
+                             final String sourceName,
+                             final Serde<? extends K> keySerde,
+                             final Serde<? extends V> valSerde) {
         super(topology, name, Collections.singleton(sourceName));
         this.keySerde = keySerde;
         this.valSerde = valSerde;
+        this.isQueryable = true;
+    }
+
+    private void determineIsQueryable(final String queryableStoreName) {
+        if (queryableStoreName == null) {
+            isQueryable = false;
+        } // no need for else {} since isQueryable is true by default
+    }
+
+    @Override
+    public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+                                      final Aggregator<? super K, ? super V, T> adder,
+                                      final Aggregator<? super K, ? super V, T> subtractor,
+                                      final Serde<T> aggValueSerde,
+                                      final String queryableStoreName) {
+        determineIsQueryable(queryableStoreName);
+        return aggregate(initializer, adder, subtractor, keyValueStore(keySerde, aggValueSerde, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
+    }
+
+    @Override
+    public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+                                      final Aggregator<? super K, ? super V, T> adder,
+                                      final Aggregator<? super K, ? super V, T> subtractor,
+                                      final Serde<T> aggValueSerde) {
+        return aggregate(initializer, adder, subtractor, aggValueSerde, (String) null);
     }
 
     @Override
-    public <T> KTable<K, T> aggregate(Initializer<T> initializer,
-                                      Aggregator<? super K, ? super V, T> adder,
-                                      Aggregator<? super K, ? super V, T> subtractor,
-                                      Serde<T> aggValueSerde,
-                                      String storeName) {
-        return aggregate(initializer, adder, subtractor, keyValueStore(keySerde, aggValueSerde, storeName));
+    public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+                                      final Aggregator<? super K, ? super V, T> adder,
+                                      final Aggregator<? super K, ? super V, T> subtractor,
+                                      final String queryableStoreName) {
+        determineIsQueryable(queryableStoreName);
+        return aggregate(initializer, adder, subtractor, null, getOrCreateName(queryableStoreName, AGGREGATE_NAME));
     }
 
     @Override
-    public <T> KTable<K, T> aggregate(Initializer<T> initializer,
-                                      Aggregator<? super K, ? super V, T> adder,
-                                      Aggregator<? super K, ? super V, T> subtractor,
-                                      String storeName) {
-        Objects.requireNonNull(storeName, "storeName can't be null");
-        Topic.validate(storeName);
-        return aggregate(initializer, adder, subtractor, null, storeName);
+    public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+                                      final Aggregator<? super K, ? super V, T> adder,
+                                      final Aggregator<? super K, ? super V, T> subtractor) {
+        return aggregate(initializer, adder, subtractor, (String) null);
     }
 
     @Override
-    public <T> KTable<K, T> aggregate(Initializer<T> initializer,
-                                      Aggregator<? super K, ? super V, T> adder,
-                                      Aggregator<? super K, ? super V, T> subtractor,
-                                      StateStoreSupplier<KeyValueStore> storeSupplier) {
+    public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+                                      final Aggregator<? super K, ? super V, T> adder,
+                                      final Aggregator<? super K, ? super V, T> subtractor,
+                                      final StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(adder, "adder can't be null");
         Objects.requireNonNull(subtractor, "subtractor can't be null");
@@ -91,9 +113,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return doAggregate(aggregateSupplier, AGGREGATE_NAME, storeSupplier);
     }
 
-    private <T> KTable<K, T> doAggregate(ProcessorSupplier<K, Change<V>> aggregateSupplier,
-                                         String functionName,
-                                         StateStoreSupplier<KeyValueStore> storeSupplier) {
+    private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
+                                         final String functionName,
+                                         final StateStoreSupplier<KeyValueStore> storeSupplier) {
         String sinkName = topology.newName(KStreamImpl.SINK_NAME);
         String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
         String funcName = topology.newName(functionName);
@@ -120,22 +142,27 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         topology.addStateStore(storeSupplier, funcName);
 
         // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name());
+        return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable);
+    }
+
+    @Override
+    public KTable<K, V> reduce(final Reducer<V> adder,
+                               final Reducer<V> subtractor,
+                               final String queryableStoreName) {
+        determineIsQueryable(queryableStoreName);
+        return reduce(adder, subtractor, keyValueStore(keySerde, valSerde, getOrCreateName(queryableStoreName, REDUCE_NAME)));
     }
 
     @Override
-    public KTable<K, V> reduce(Reducer<V> adder,
-                               Reducer<V> subtractor,
-                               String storeName) {
-        Objects.requireNonNull(storeName, "storeName can't be null");
-        Topic.validate(storeName);
-        return reduce(adder, subtractor, keyValueStore(keySerde, valSerde, storeName));
+    public KTable<K, V> reduce(final Reducer<V> adder,
+                               final Reducer<V> subtractor) {
+        return reduce(adder, subtractor, (String) null);
     }
 
     @Override
-    public KTable<K, V> reduce(Reducer<V> adder,
-                               Reducer<V> subtractor,
-                               StateStoreSupplier<KeyValueStore> storeSupplier) {
+    public KTable<K, V> reduce(final Reducer<V> adder,
+                               final Reducer<V> subtractor,
+                               final StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(adder, "adder can't be null");
         Objects.requireNonNull(subtractor, "subtractor can't be null");
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -144,14 +171,18 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
     }
 
     @Override
-    public KTable<K, Long> count(String storeName) {
-        Objects.requireNonNull(storeName, "storeName can't be null");
-        Topic.validate(storeName);
-        return count(keyValueStore(keySerde, Serdes.Long(), storeName));
+    public KTable<K, Long> count(final String queryableStoreName) {
+        determineIsQueryable(queryableStoreName);
+        return count(keyValueStore(keySerde, Serdes.Long(), getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
+    }
+
+    @Override
+    public KTable<K, Long> count() {
+        return count((String) null);
     }
 
     @Override
-    public KTable<K, Long> count(StateStoreSupplier<KeyValueStore> storeSupplier) {
+    public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
         return this.aggregate(
                 new Initializer<Long>() {
                     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bbd4ac4..b751294 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -617,7 +617,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         final String name = topology.newName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
         topology.addProcessor(name, new KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(), joiner, leftJoin), this.name);
-        topology.connectProcessorAndStateStores(name, other.getStoreName());
+        topology.connectProcessorAndStateStores(name, ((KTableImpl<K, ?, V1>) other).internalStoreName());
         topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
 
         return new KStreamImpl<>(topology, name, allSourceNodes, false);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 774f235..af8c906 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -20,19 +20,22 @@ import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
 
 class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
 
     private final KTableImpl<K, ?, V> parent;
     private final Predicate<? super K, ? super V> predicate;
     private final boolean filterNot;
-
+    private final String queryableName;
     private boolean sendOldValues = false;
 
-    public KTableFilter(KTableImpl<K, ?, V> parent, Predicate<? super K, ? super V> predicate, boolean filterNot) {
+    public KTableFilter(final KTableImpl<K, ?, V> parent, final Predicate<? super K, ? super V> predicate,
+                        final boolean filterNot, final String queryableName) {
         this.parent = parent;
         this.predicate = predicate;
         this.filterNot = filterNot;
+        this.queryableName = queryableName;
     }
 
     @Override
@@ -74,6 +77,18 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
     }
 
     private class KTableFilterProcessor extends AbstractProcessor<K, Change<V>> {
+        private KeyValueStore<K, V> store;
+        private TupleForwarder<K, V> tupleForwarder;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+            if (queryableName != null) {
+                store = (KeyValueStore<K, V>) context.getStateStore(queryableName);
+                tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+            }
+        }
 
         @Override
         public void process(K key, Change<V> change) {
@@ -83,7 +98,12 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
             if (sendOldValues && oldValue == null && newValue == null)
                 return; // unnecessary to forward here.
 
-            context().forward(key, new Change<>(newValue, oldValue));
+            if (queryableName != null) {
+                store.put(key, newValue);
+                tupleForwarder.maybeForward(key, newValue, oldValue);
+            } else {
+                context().forward(key, new Change<>(newValue, oldValue));
+            }
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 6120f91..96a0b2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -29,7 +29,9 @@ import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.io.FileNotFoundException;
 import java.io.PrintWriter;
@@ -67,58 +69,157 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
 
+    public static final String STATE_STORE_NAME = "STATE-STORE-";
+
     private final ProcessorSupplier<?, ?> processorSupplier;
 
-    private final String storeName;
+    private final String queryableStoreName;
+    private final boolean isQueryable;
 
     private boolean sendOldValues = false;
+    private final Serde<K> keySerde;
+    private final Serde<V> valSerde;
 
+    public KTableImpl(KStreamBuilder topology,
+                      String name,
+                      ProcessorSupplier<?, ?> processorSupplier,
+                      Set<String> sourceNodes,
+                      final String queryableStoreName,
+                      boolean isQueryable) {
+        super(topology, name, sourceNodes);
+        this.processorSupplier = processorSupplier;
+        this.queryableStoreName = queryableStoreName;
+        this.keySerde = null;
+        this.valSerde = null;
+        this.isQueryable = isQueryable;
+    }
 
     public KTableImpl(KStreamBuilder topology,
                       String name,
                       ProcessorSupplier<?, ?> processorSupplier,
+                      final Serde<K> keySerde,
+                      final Serde<V> valSerde,
                       Set<String> sourceNodes,
-                      final String storeName) {
+                      final String queryableStoreName,
+                      boolean isQueryable) {
         super(topology, name, sourceNodes);
         this.processorSupplier = processorSupplier;
-        this.storeName = storeName;
+        this.queryableStoreName = queryableStoreName;
+        this.keySerde = keySerde;
+        this.valSerde = valSerde;
+        this.isQueryable = isQueryable;
     }
 
     @Override
-    public String getStoreName() {
-        return this.storeName;
+    public String queryableStoreName() {
+        if (!isQueryable) {
+            return null;
+        }
+        return this.queryableStoreName;
     }
 
-    @Override
-    public KTable<K, V> filter(Predicate<? super K, ? super V> predicate) {
+    String internalStoreName() {
+        return this.queryableStoreName;
+    }
+
+    private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
+                                  final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                  boolean isFilterNot) {
         Objects.requireNonNull(predicate, "predicate can't be null");
         String name = topology.newName(FILTER_NAME);
-        KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, false);
+        String internalStoreName = null;
+        if (storeSupplier != null) {
+            internalStoreName = storeSupplier.name();
+        }
+        KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, isFilterNot, internalStoreName);
         topology.addProcessor(name, processorSupplier, this.name);
+        if (storeSupplier != null) {
+            topology.addStateStore(storeSupplier, name);
+        }
+        return new KTableImpl<>(topology, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, internalStoreName != null);
+    }
 
-        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.storeName);
+    @Override
+    public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate) {
+        return filter(predicate, (String) null);
     }
 
     @Override
-    public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate) {
-        Objects.requireNonNull(predicate, "predicate can't be null");
-        String name = topology.newName(FILTER_NAME);
-        KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, true);
+    public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName) {
+        StateStoreSupplier<KeyValueStore> storeSupplier = null;
+        if (queryableStoreName != null) {
+            storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName);
+        }
+        return doFilter(predicate, storeSupplier, false);
+    }
 
-        topology.addProcessor(name, processorSupplier, this.name);
+    @Override
+    public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        return doFilter(predicate, storeSupplier, false);
+    }
 
-        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.storeName);
+    @Override
+    public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate) {
+        return filterNot(predicate, (String) null);
+    }
+
+    @Override
+    public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName) {
+        StateStoreSupplier<KeyValueStore> storeSupplier = null;
+        if (queryableStoreName != null) {
+            storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName);
+        }
+        return doFilter(predicate, storeSupplier, true);
     }
 
     @Override
-    public <V1> KTable<K, V1> mapValues(ValueMapper<? super V, ? extends V1> mapper) {
+    public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        return doFilter(predicate, storeSupplier, true);
+    }
+
+    private <V1> KTable<K, V1> doMapValues(final ValueMapper<? super V, ? extends V1> mapper,
+                                           final Serde<V1> valueSerde,
+                                           final StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(mapper);
         String name = topology.newName(MAPVALUES_NAME);
-        KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(this, mapper);
-
+        String internalStoreName = null;
+        if (storeSupplier != null) {
+            internalStoreName = storeSupplier.name();
+        }
+        KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(this, mapper, internalStoreName);
         topology.addProcessor(name, processorSupplier, this.name);
+        if (storeSupplier != null) {
+            topology.addStateStore(storeSupplier, name);
+            return new KTableImpl<>(topology, name, processorSupplier, this.keySerde, valueSerde, sourceNodes, internalStoreName, true);
+        } else {
+            return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.queryableStoreName, false);
+        }
+    }
+
+    @Override
+    public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper) {
+        return mapValues(mapper, null, (String) null);
+    }
 
-        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.storeName);
+    @Override
+    public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
+                                        final Serde<V1> valueSerde,
+                                        final String queryableStoreName) {
+        StateStoreSupplier<KeyValueStore> storeSupplier = null;
+        if (queryableStoreName != null) {
+            storeSupplier = keyValueStore(this.keySerde, valueSerde, queryableStoreName);
+        }
+        return doMapValues(mapper, valueSerde, storeSupplier);
+    }
+
+    @Override
+    public  <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
+                                         final Serde<V1> valueSerde,
+                                         final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        return doMapValues(mapper, valueSerde, storeSupplier);
     }
 
     @Override
@@ -193,30 +294,98 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public KTable<K, V> through(Serde<K> keySerde,
-                                Serde<V> valSerde,
-                                StreamPartitioner<? super K, ? super V> partitioner,
-                                String topic,
-                                final String storeName) {
-        Objects.requireNonNull(storeName, "storeName can't be null");
+    public KTable<K, V> through(final Serde<K> keySerde,
+                                final Serde<V> valSerde,
+                                final StreamPartitioner<? super K, ? super V> partitioner,
+                                final String topic,
+                                final String queryableStoreName) {
+        final String internalStoreName = queryableStoreName != null ? queryableStoreName : topology.newStoreName(KTableImpl.TOSTREAM_NAME);
+
+        to(keySerde, valSerde, partitioner, topic);
+
+        return topology.table(keySerde, valSerde, topic, internalStoreName);
+    }
+
+    @Override
+    public KTable<K, V> through(final Serde<K> keySerde,
+                                final Serde<V> valSerde,
+                                final StreamPartitioner<? super K, ? super V> partitioner,
+                                final String topic,
+                                final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         to(keySerde, valSerde, partitioner, topic);
 
-        return topology.table(keySerde, valSerde, topic, storeName);
+        return topology.table(keySerde, valSerde, topic, storeSupplier);
+    }
+
+    @Override
+    public KTable<K, V> through(final Serde<K> keySerde,
+                                final Serde<V> valSerde,
+                                final StreamPartitioner<? super K, ? super V> partitioner,
+                                final String topic) {
+        return through(keySerde, valSerde, partitioner, topic, (String) null);
+    }
+    @Override
+    public KTable<K, V> through(final Serde<K> keySerde,
+                                final Serde<V> valSerde,
+                                final String topic,
+                                final String queryableStoreName) {
+        return through(keySerde, valSerde, null, topic, queryableStoreName);
+    }
+
+    @Override
+    public KTable<K, V> through(final Serde<K> keySerde,
+                                final Serde<V> valSerde,
+                                final String topic,
+                                final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        return through(keySerde, valSerde, null, topic, storeSupplier);
+    }
+
+    @Override
+    public KTable<K, V> through(final Serde<K> keySerde,
+                                final Serde<V> valSerde,
+                                final String topic) {
+        return through(keySerde, valSerde, null, topic, (String) null);
+    }
+
+    @Override
+    public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
+                                final String topic,
+                                final String queryableStoreName) {
+        return through(null, null, partitioner, topic, queryableStoreName);
+    }
+
+    @Override
+    public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
+                                final String topic,
+                                final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        return through(null, null, partitioner, topic, storeSupplier);
+    }
+
+    @Override
+    public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
+                                final String topic) {
+        return through(null, null, partitioner, topic, (String) null);
     }
 
     @Override
-    public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic, final String storeName) {
-        return through(keySerde, valSerde, null, topic, storeName);
+    public KTable<K, V> through(final String topic,
+                                final String queryableStoreName) {
+        return through(null, null, null, topic, queryableStoreName);
     }
 
     @Override
-    public KTable<K, V> through(StreamPartitioner<? super K, ? super V> partitioner, String topic, final String storeName) {
-        return through(null, null, partitioner, topic, storeName);
+    public KTable<K, V> through(final String topic,
+                                final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        return through(null, null, null, topic, storeSupplier);
     }
 
     @Override
-    public KTable<K, V> through(String topic, final String storeName) {
-        return through(null, null, null, topic, storeName);
+    public KTable<K, V> through(final String topic) {
+        return through(null, null, null, topic, (String) null);
     }
 
     @Override
@@ -259,27 +428,94 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public <V1, R> KTable<K, R> join(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
-        return doJoin(other, joiner, false, false);
+    public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
+                                     final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+        return doJoin(other, joiner, false, false, null, (String) null);
     }
 
+    @Override
+    public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
+                                     final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                     final Serde<R> joinSerde,
+                                     final String queryableStoreName) {
+        return doJoin(other, joiner, false, false, joinSerde, queryableStoreName);
+    }
+
+    @Override
+    public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
+                                     final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        return doJoin(other, joiner, false, false, storeSupplier);
+    }
+
+    @Override
+    public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
+                                          final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+        return doJoin(other, joiner, true, true, null, (String) null);
+    }
 
+    @Override
+    public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
+                                          final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                          final Serde<R> joinSerde,
+                                          final String queryableStoreName) {
+        return doJoin(other, joiner, true, true, joinSerde, queryableStoreName);
+    }
 
     @Override
-    public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
-        return doJoin(other, joiner, true, true);
+    public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
+                                          final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                          final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        return doJoin(other, joiner, true, true, storeSupplier);
     }
 
     @Override
-    public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
-        return doJoin(other, joiner, true, false);
+    public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
+                                         final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+        return doJoin(other, joiner, true, false, null, (String) null);
+    }
+
+    @Override
+    public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
+                                         final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                         final Serde<R> joinSerde,
+                                         final String queryableStoreName) {
+        return doJoin(other, joiner, true, false, joinSerde, queryableStoreName);
+    }
+
+    @Override
+    public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
+                                         final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                         final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        return doJoin(other, joiner, true, false, storeSupplier);
     }
 
     @SuppressWarnings("unchecked")
-    private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, final boolean leftOuter, final boolean rightOuter) {
+    private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other,
+                                        ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                        final boolean leftOuter,
+                                        final boolean rightOuter,
+                                        final Serde<R> joinSerde,
+                                        final String queryableStoreName) {
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
 
+        final StateStoreSupplier storeSupplier = queryableStoreName == null ? null : keyValueStore(this.keySerde, joinSerde, queryableStoreName);
+
+        return doJoin(other, joiner, leftOuter, rightOuter, storeSupplier);
+    }
+
+    private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other,
+                                        ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                        final boolean leftOuter,
+                                        final boolean rightOuter,
+                                        final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(other, "other can't be null");
+        Objects.requireNonNull(joiner, "joiner can't be null");
+        final String internalQueryableName = storeSupplier == null ? null : storeSupplier.name();
         final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
         if (leftOuter) {
@@ -308,8 +544,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         }
 
         final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
-            new KTableImpl<K, V, R>(topology, joinThisName, joinThis, sourceNodes, storeName),
-                new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName())
+                new KTableImpl<K, V, R>(topology, joinThisName, joinThis, sourceNodes, this.internalStoreName(), false),
+                new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes,
+                        ((KTableImpl<K, ?, ?>) other).internalStoreName(), false),
+                internalQueryableName
         );
 
         topology.addProcessor(joinThisName, joinThis, this.name);
@@ -318,7 +556,11 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
         topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
 
-        return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
+        if (internalQueryableName != null) {
+            topology.addStateStore(storeSupplier, joinMergeName);
+        }
+
+        return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, internalQueryableName, internalQueryableName != null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index ee7a064..82d9c26 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -18,15 +18,22 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
 
 class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
 
     private final KTableImpl<K, ?, V> parent1;
     private final KTableImpl<K, ?, V> parent2;
+    private final String queryableName;
+    private boolean sendOldValues = false;
 
-    public KTableKTableJoinMerger(KTableImpl<K, ?, V> parent1, KTableImpl<K, ?, V> parent2) {
+    public KTableKTableJoinMerger(final KTableImpl<K, ?, V> parent1,
+                                  final KTableImpl<K, ?, V> parent2,
+                                  final String queryableName) {
         this.parent1 = parent1;
         this.parent2 = parent2;
+        this.queryableName = queryableName;
     }
 
     @Override
@@ -43,13 +50,35 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
     public void enableSendingOldValues() {
         parent1.enableSendingOldValues();
         parent2.enableSendingOldValues();
+        sendOldValues = true;
     }
 
-    private static final class KTableKTableJoinMergeProcessor<K, V>
+    private class KTableKTableJoinMergeProcessor<K, V>
         extends AbstractProcessor<K, Change<V>> {
+        private KeyValueStore<K, V> store;
+        private TupleForwarder<K, V> tupleForwarder;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+            if (queryableName != null) {
+                store = (KeyValueStore<K, V>) context.getStateStore(queryableName);
+                tupleForwarder = new TupleForwarder<>(store, context,
+                    new ForwardingCacheFlushListener<K, V>(context, sendOldValues),
+                    sendOldValues);
+            }
+        }
+
         @Override
         public void process(K key, Change<V> value) {
-            context().forward(key, value);
+
+            if (queryableName != null) {
+                store.put(key, value.newValue);
+                tupleForwarder.maybeForward(key, value.newValue, value.oldValue);
+            } else {
+                context().forward(key, value);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 90610de..41dd7cd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -20,18 +20,21 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
 
 
 class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
 
     private final KTableImpl<K, ?, V> parent;
     private final ValueMapper<? super V, ? extends V1> mapper;
-
+    private final String queryableName;
     private boolean sendOldValues = false;
 
-    public KTableMapValues(KTableImpl<K, ?, V> parent, ValueMapper<? super V, ? extends V1> mapper) {
+    public KTableMapValues(final KTableImpl<K, ?, V> parent, final ValueMapper<? super V, ? extends V1> mapper,
+                           final String queryableName) {
         this.parent = parent;
         this.mapper = mapper;
+        this.queryableName = queryableName;
     }
 
     @Override
@@ -73,12 +76,30 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
 
     private class KTableMapValuesProcessor extends AbstractProcessor<K, Change<V>> {
 
+        private KeyValueStore<K, V1> store;
+        private TupleForwarder<K, V1> tupleForwarder;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+            if (queryableName != null) {
+                store = (KeyValueStore<K, V1>) context.getStateStore(queryableName);
+                tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V1>(context, sendOldValues), sendOldValues);
+            }
+        }
+
         @Override
         public void process(K key, Change<V> change) {
             V1 newValue = computeValue(change.newValue);
             V1 oldValue = sendOldValues ? computeValue(change.oldValue) : null;
 
-            context().forward(key, new Change<>(newValue, oldValue));
+            if (queryableName != null) {
+                store.put(key, newValue);
+                tupleForwarder.maybeForward(key, newValue, oldValue);
+            } else {
+                context().forward(key, new Change<>(newValue, oldValue));
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index a7bc5bd..81f0ff9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.internals.QuickUnion;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -460,7 +461,7 @@ public class TopologyBuilder {
      * receive all records forwarded from the {@link SourceNode}. This
      * {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
      *
-     * @param store                 the instance of {@link StateStore}
+     * @param storeSupplier         user defined state store supplier
      * @param sourceName            name of the {@link SourceNode} that will be automatically added
      * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
      * @param valueDeserializer     the {@link Deserializer} to deserialize values with
@@ -469,14 +470,14 @@ public class TopologyBuilder {
      * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
      * @return this builder instance so methods can be chained together; never null
      */
-    public synchronized TopologyBuilder addGlobalStore(final StateStore store,
+    public synchronized TopologyBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
                                                        final String sourceName,
                                                        final Deserializer keyDeserializer,
                                                        final Deserializer valueDeserializer,
                                                        final String topic,
                                                        final String processorName,
                                                        final ProcessorSupplier stateUpdateSupplier) {
-        Objects.requireNonNull(store, "store must not be null");
+        Objects.requireNonNull(storeSupplier, "store supplier must not be null");
         Objects.requireNonNull(sourceName, "sourceName must not be null");
         Objects.requireNonNull(topic, "topic must not be null");
         Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
@@ -487,8 +488,11 @@ public class TopologyBuilder {
         if (nodeFactories.containsKey(processorName)) {
             throw new TopologyBuilderException("Processor " + processorName + " is already added.");
         }
-        if (stateFactories.containsKey(store.name()) || globalStateStores.containsKey(store.name())) {
-            throw new TopologyBuilderException("StateStore " + store.name() + " is already added.");
+        if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) {
+            throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " is already added.");
+        }
+        if (storeSupplier.loggingEnabled()) {
+            throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
         }
         if (sourceName.equals(processorName)) {
             throw new TopologyBuilderException("sourceName and processorName must be different.");
@@ -504,13 +508,13 @@ public class TopologyBuilder {
 
         final String[] parents = {sourceName};
         final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, parents, stateUpdateSupplier);
-        nodeFactory.addStateStore(store.name());
+        nodeFactory.addStateStore(storeSupplier.name());
         nodeFactories.put(processorName, nodeFactory);
         nodeGrouper.add(processorName);
         nodeGrouper.unite(processorName, parents);
 
-        globalStateStores.put(store.name(), store);
-        connectSourceStoreAndTopic(store.name(), topic);
+        globalStateStores.put(storeSupplier.name(), storeSupplier.get());
+        connectSourceStoreAndTopic(storeSupplier.name(), topic);
         return this;
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index e81c7d3..0722210 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -355,13 +355,7 @@ public class KStreamAggregationIntegrationTest {
             )));
     }
 
-    @Test
-    public void shouldCount() throws Exception {
-        produceMessages(mockTime.milliseconds());
-
-        groupedStream.count("count-by-key")
-            .to(Serdes.String(), Serdes.Long(), outputTopic);
-
+    private void shouldCountHelper() throws Exception {
         startStreams();
 
         produceMessages(mockTime.milliseconds());
@@ -392,6 +386,26 @@ public class KStreamAggregationIntegrationTest {
     }
 
     @Test
+    public void shouldCount() throws Exception {
+        produceMessages(mockTime.milliseconds());
+        // count into the explicitly named store "count-by-key" and write the resulting counts to outputTopic
+        groupedStream.count("count-by-key")
+            .to(Serdes.String(), Serdes.Long(), outputTopic);
+
+        shouldCountHelper();
+    }
+
+    @Test
+    public void shouldCountWithInternalStore() throws Exception {
+        produceMessages(mockTime.milliseconds());
+        // no store name is passed to count(); presumably an internally named store is used (per the test name) - TODO confirm
+        groupedStream.count()
+            .to(Serdes.String(), Serdes.Long(), outputTopic);
+
+        shouldCountHelper();
+    }
+
+    @Test
     public void shouldGroupByKey() throws Exception {
         final long timestamp = mockTime.milliseconds();
         produceMessages(timestamp);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 26deb92..5fa7e49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -30,6 +30,9 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -41,11 +44,15 @@ import org.junit.experimental.categories.Category;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 @Category({IntegrationTest.class})
 public class KTableKTableJoinIntegrationTest {
@@ -134,12 +141,22 @@ public class KTableKTableJoinIntegrationTest {
 
     @Test
     public void shouldInnerInnerJoin() throws Exception {
-        verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")));
+        verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), false);
+    }
+
+    @Test
+    public void shouldInnerInnerJoinQueryable() throws Exception {
+        verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), true);
     }
 
     @Test
     public void shouldInnerLeftJoin() throws Exception {
-        verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")));
+        verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), false);
+    }
+
+    @Test
+    public void shouldInnerLeftJoinQueryable() throws Exception {
+        verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), true);
     }
 
     @Test
@@ -148,7 +165,16 @@ public class KTableKTableJoinIntegrationTest {
                 new KeyValue<>("a", "null-A3"),
                 new KeyValue<>("b", "null-B3"),
                 new KeyValue<>("c", "null-C3"),
-                new KeyValue<>("b", "B1-B2-B3")));
+                new KeyValue<>("b", "B1-B2-B3")), false);
+    }
+
+    @Test
+    public void shouldInnerOuterJoinQueryable() throws Exception {
+        verifyKTableKTableJoin(JoinType.INNER, JoinType.OUTER, Arrays.asList(
+            new KeyValue<>("a", "null-A3"),
+            new KeyValue<>("b", "null-B3"),
+            new KeyValue<>("c", "null-C3"),
+            new KeyValue<>("b", "B1-B2-B3")), true);
     }
 
     @Test
@@ -156,7 +182,15 @@ public class KTableKTableJoinIntegrationTest {
         verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList(
                 new KeyValue<>("a", "A1-null-A3"),
                 new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3")));
+                new KeyValue<>("b", "B1-B2-B3")), false);
+    }
+
+    @Test
+    public void shouldLeftInnerJoinQueryable() throws Exception {
+        verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList(
+            new KeyValue<>("a", "A1-null-A3"),
+            new KeyValue<>("b", "B1-null-B3"),
+            new KeyValue<>("b", "B1-B2-B3")), true);
     }
 
     @Test
@@ -164,7 +198,15 @@ public class KTableKTableJoinIntegrationTest {
         verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList(
                 new KeyValue<>("a", "A1-null-A3"),
                 new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3")));
+                new KeyValue<>("b", "B1-B2-B3")), false);
+    }
+
+    @Test
+    public void shouldLeftLeftJoinQueryable() throws Exception {
+        verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList(
+            new KeyValue<>("a", "A1-null-A3"),
+            new KeyValue<>("b", "B1-null-B3"),
+            new KeyValue<>("b", "B1-B2-B3")), true);
     }
 
     @Test
@@ -175,7 +217,18 @@ public class KTableKTableJoinIntegrationTest {
                 new KeyValue<>("c", "null-C3"),
                 new KeyValue<>("a", "A1-null-A3"),
                 new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3")));
+                new KeyValue<>("b", "B1-B2-B3")), false);
+    }
+
+    @Test
+    public void shouldLeftOuterJoinQueryable() throws Exception {
+        verifyKTableKTableJoin(JoinType.LEFT, JoinType.OUTER, Arrays.asList(
+            new KeyValue<>("a", "null-A3"),
+            new KeyValue<>("b", "null-B3"),
+            new KeyValue<>("c", "null-C3"),
+            new KeyValue<>("a", "A1-null-A3"),
+            new KeyValue<>("b", "B1-null-B3"),
+            new KeyValue<>("b", "B1-B2-B3")), true);
     }
 
     @Test
@@ -184,7 +237,16 @@ public class KTableKTableJoinIntegrationTest {
                 new KeyValue<>("a", "A1-null-A3"),
                 new KeyValue<>("b", "B1-null-B3"),
                 new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", "null-C2-C3")));
+                new KeyValue<>("c", "null-C2-C3")), false);
+    }
+
+    @Test
+    public void shouldOuterInnerJoinQueryable() throws Exception {
+        verifyKTableKTableJoin(JoinType.OUTER, JoinType.INNER, Arrays.asList(
+            new KeyValue<>("a", "A1-null-A3"),
+            new KeyValue<>("b", "B1-null-B3"),
+            new KeyValue<>("b", "B1-B2-B3"),
+            new KeyValue<>("c", "null-C2-C3")), true);
     }
 
     @Test
@@ -193,7 +255,16 @@ public class KTableKTableJoinIntegrationTest {
                 new KeyValue<>("a", "A1-null-A3"),
                 new KeyValue<>("b", "B1-null-B3"),
                 new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", "null-C2-C3")));
+                new KeyValue<>("c", "null-C2-C3")), false);
+    }
+
+    @Test
+    public void shouldOuterLeftJoinQueryable() throws Exception {
+        verifyKTableKTableJoin(JoinType.OUTER, JoinType.LEFT,  Arrays.asList(
+            new KeyValue<>("a", "A1-null-A3"),
+            new KeyValue<>("b", "B1-null-B3"),
+            new KeyValue<>("b", "B1-B2-B3"),
+            new KeyValue<>("c", "null-C2-C3")), true);
     }
 
     @Test
@@ -205,16 +276,30 @@ public class KTableKTableJoinIntegrationTest {
                 new KeyValue<>("a", "A1-null-A3"),
                 new KeyValue<>("b", "B1-null-B3"),
                 new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", "null-C2-C3")));
+                new KeyValue<>("c", "null-C2-C3")), false);
+    }
+
+    @Test
+    public void shouldOuterOuterJoinQueryable() throws Exception {
+        verifyKTableKTableJoin(JoinType.OUTER, JoinType.OUTER, Arrays.asList(
+            new KeyValue<>("a", "null-A3"),
+            new KeyValue<>("b", "null-B3"),
+            new KeyValue<>("c", "null-C3"),
+            new KeyValue<>("a", "A1-null-A3"),
+            new KeyValue<>("b", "B1-null-B3"),
+            new KeyValue<>("b", "B1-B2-B3"),
+            new KeyValue<>("c", "null-C2-C3")), true);
     }
 
 
     private void verifyKTableKTableJoin(final JoinType joinType1,
                                         final JoinType joinType2,
-                                        final List<KeyValue<String, String>> expectedResult) throws Exception {
-        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join");
+                                        final List<KeyValue<String, String>> expectedResult,
+                                        boolean verifyQueryableState) throws Exception {
+        final String queryableName = verifyQueryableState ? joinType1 + "-" + joinType2 + "-ktable-ktable-join-query" : null;
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join" + queryableName);
 
-        streams = prepareTopology(joinType1, joinType2);
+        streams = prepareTopology(joinType1, joinType2, queryableName);
         streams.start();
 
         final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
@@ -223,20 +308,54 @@ public class KTableKTableJoinIntegrationTest {
                 expectedResult.size());
 
         assertThat(result, equalTo(expectedResult));
+
+        if (verifyQueryableState) {
+            verifyKTableKTableJoinQueryableState(joinType1, joinType2, expectedResult);
+        }
     }
-    private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType joinType2) {
+
+    private void verifyKTableKTableJoinQueryableState(final JoinType joinType1,
+                                                      final JoinType joinType2,
+                                                      final List<KeyValue<String, String>> expectedResult) {
+        final String queryableName = joinType1 + "-" + joinType2 + "-ktable-ktable-join-query";
+        final ReadOnlyKeyValueStore<String, String> myJoinStore = streams.store(queryableName,
+            QueryableStoreTypes.<String, String>keyValueStore());
+        // store only keeps last set of values, not entire stream of value changes
+        final Map<String, String> expectedInStore = new HashMap<>();
+        for (final KeyValue<String, String> expected : expectedResult) {
+            expectedInStore.put(expected.key, expected.value);
+        }
+        for (final Map.Entry<String, String> expected : expectedInStore.entrySet()) {
+            assertEquals(expected.getValue(), myJoinStore.get(expected.getKey()));
+        }
+        // the store must hold no entry outside expectedResult; finally closes the iterator even if an assert fails
+        final KeyValueIterator<String, String> all = myJoinStore.all();
+        try {
+            while (all.hasNext()) {
+                assertTrue(expectedResult.contains(all.next()));
+            }
+        } finally {
+            all.close();
+        }
+    }
+
+    private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType joinType2, final String queryableName) {
         final KStreamBuilder builder = new KStreamBuilder();
 
         final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1);
         final KTable<String, String> table2 = builder.table(TABLE_2, TABLE_2);
         final KTable<String, String> table3 = builder.table(TABLE_3, TABLE_3);
 
-        join(join(table1, table2, joinType1), table3, joinType2).to(OUTPUT);
+        join(join(table1, table2, joinType1, null /* no need to query intermediate result */), table3,
+            joinType2, queryableName).to(OUTPUT);
 
         return new KafkaStreams(builder, new StreamsConfig(streamsConfig));
     }
 
-    private KTable<String, String> join(KTable<String, String> first, KTable<String, String> second, JoinType joinType) {
+    private KTable<String, String> join(final KTable<String, String> first,
+                                        final KTable<String, String> second,
+                                        final JoinType joinType,
+                                        final String queryableName) {
         final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
             @Override
             public String apply(final String value1, final String value2) {
@@ -246,11 +365,11 @@ public class KTableKTableJoinIntegrationTest {
 
         switch (joinType) {
             case INNER:
-                return first.join(second, joiner);
+                return first.join(second, joiner, Serdes.String(), queryableName);
             case LEFT:
-                return first.leftJoin(second, joiner);
+                return first.leftJoin(second, joiner, Serdes.String(), queryableName);
             case OUTER:
-                return first.outerJoin(second, joiner);
+                return first.outerJoin(second, joiner, Serdes.String(), queryableName);
         }
 
         throw new RuntimeException("Unknown join type.");

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 314079f..b435ceb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -36,6 +37,8 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -71,6 +74,7 @@ import java.util.concurrent.TimeUnit;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -425,6 +429,180 @@ public class QueryableStateIntegrationTest {
         verifyCanQueryState(10 * 1024 * 1024);
     }
 
+    /** Entries kept by KTable filter / filterNot must be readable from the stores named in those calls. */
+    @Test
+    public void shouldBeAbleToQueryFilterState() throws Exception {
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        final KStreamBuilder builder = new KStreamBuilder();
+        final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
+        final Set<KeyValue<String, Long>> batch1 = new HashSet<>();
+        batch1.addAll(Arrays.asList(
+            new KeyValue<>(keys[0], 1L),
+            new KeyValue<>(keys[1], 1L),
+            new KeyValue<>(keys[2], 3L),
+            new KeyValue<>(keys[3], 5L),
+            new KeyValue<>(keys[4], 2L)));
+        final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>();
+        expectedBatch1.add(new KeyValue<>(keys[4], 2L));
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            streamOne,
+            batch1,
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                LongSerializer.class,
+                new Properties()),
+            mockTime);
+        final Predicate<String, Long> filterPredicate = new Predicate<String, Long>() {
+            @Override
+            public boolean test(String key, Long value) {
+                return key.contains("kafka");
+            }
+        };
+        final KTable<String, Long> t1 = builder.table(streamOne);
+        final KTable<String, Long> t2 = t1.filter(filterPredicate, "queryFilter");
+        t1.filterNot(filterPredicate, "queryFilterNot");
+        t2.to(outputTopic);
+
+        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams.start();
+
+        waitUntilAtLeastNumRecordProcessed(outputTopic, 2);
+
+        final ReadOnlyKeyValueStore<String, Long>
+            myFilterStore = kafkaStreams.store("queryFilter", QueryableStoreTypes.<String, Long>keyValueStore());
+        final ReadOnlyKeyValueStore<String, Long>
+            myFilterNotStore = kafkaStreams.store("queryFilterNot", QueryableStoreTypes.<String, Long>keyValueStore());
+        // "queryFilter" holds the entries accepted by the predicate and nothing else from the batch
+        for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
+            assertEquals(expectedEntry.value, myFilterStore.get(expectedEntry.key));
+        }
+        for (final KeyValue<String, Long> batchEntry : batch1) {
+            if (!expectedBatch1.contains(batchEntry)) {
+                assertNull(myFilterStore.get(batchEntry.key));
+            }
+        }
+        // "queryFilterNot" holds the complement: every batch entry the predicate rejected
+        for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
+            assertNull(myFilterNotStore.get(expectedEntry.key));
+        }
+        for (final KeyValue<String, Long> batchEntry : batch1) {
+            if (!expectedBatch1.contains(batchEntry)) {
+                assertEquals(batchEntry.value, myFilterNotStore.get(batchEntry.key));
+            }
+        }
+    }
+
+    /** Long values produced by KTable mapValues must be readable from the "queryMapValues" store. */
+    @Test
+    public void shouldBeAbleToQueryMapValuesState() throws Exception {
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        final KStreamBuilder builder = new KStreamBuilder();
+        final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
+        final Set<KeyValue<String, String>> batch1 = new HashSet<>();
+        batch1.addAll(Arrays.asList(
+            new KeyValue<>(keys[0], "1"),
+            new KeyValue<>(keys[1], "1"),
+            new KeyValue<>(keys[2], "3"),
+            new KeyValue<>(keys[3], "5"),
+            new KeyValue<>(keys[4], "2")));
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            streamOne,
+            batch1,
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            mockTime);
+
+        final KTable<String, String> t1 = builder.table(streamOne);
+        final KTable<String, Long> t2 = t1.mapValues(new ValueMapper<String, Long>() {
+            @Override
+            public Long apply(String value) {
+                return Long.valueOf(value);
+            }
+        }, Serdes.Long(), "queryMapValues");
+        t2.to(Serdes.String(), Serdes.Long(), outputTopic);
+
+        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams.start();
+
+        waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
+
+        final ReadOnlyKeyValueStore<String, Long> myMapStore =
+            kafkaStreams.store("queryMapValues", QueryableStoreTypes.<String, Long>keyValueStore());
+        for (final KeyValue<String, String> batchEntry : batch1) {
+            assertEquals(Long.valueOf(batchEntry.value), myMapStore.get(batchEntry.key));
+        }
+    }
+
+    /** A mapValues store downstream of a filter must contain only the entries that passed the filter. */
+    @Test
+    public void shouldBeAbleToQueryMapValuesAfterFilterState() throws Exception {
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        final KStreamBuilder builder = new KStreamBuilder();
+        final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
+        final Set<KeyValue<String, String>> batch1 = new HashSet<>();
+        batch1.addAll(Arrays.asList(
+            new KeyValue<>(keys[0], "1"),
+            new KeyValue<>(keys[1], "1"),
+            new KeyValue<>(keys[2], "3"),
+            new KeyValue<>(keys[3], "5"),
+            new KeyValue<>(keys[4], "2")));
+        final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>();
+        expectedBatch1.add(new KeyValue<>(keys[4], 2L));
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            streamOne,
+            batch1,
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            mockTime);
+
+        final Predicate<String, String> filterPredicate = new Predicate<String, String>() {
+            @Override
+            public boolean test(String key, String value) {
+                return key.contains("kafka");
+            }
+        };
+        final KTable<String, String> t1 = builder.table(streamOne);
+        final KTable<String, String> t2 = t1.filter(filterPredicate, "queryFilter");
+        final KTable<String, Long> t3 = t2.mapValues(new ValueMapper<String, Long>() {
+            @Override
+            public Long apply(String value) {
+                return Long.valueOf(value);
+            }
+        }, Serdes.Long(), "queryMapValues");
+        t3.to(Serdes.String(), Serdes.Long(), outputTopic);
+
+        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams.start();
+
+        waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
+        // entries that passed the filter appear with their Long value; all other batch keys must be absent
+        final ReadOnlyKeyValueStore<String, Long>
+            myMapStore = kafkaStreams.store("queryMapValues",
+            QueryableStoreTypes.<String, Long>keyValueStore());
+        for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
+            assertEquals(expectedEntry.value, myMapStore.get(expectedEntry.key));
+        }
+        for (final KeyValue<String, String> batchEntry : batch1) {
+            final KeyValue<String, Long> batchEntryMapValue = new KeyValue<>(batchEntry.key, Long.valueOf(batchEntry.value));
+            if (!expectedBatch1.contains(batchEntryMapValue)) {
+                assertNull(myMapStore.get(batchEntry.key));
+            }
+        }
+    }
+
     private void verifyCanQueryState(int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
         final KStreamBuilder builder = new KStreamBuilder();