You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/09/08 21:02:09 UTC

kafka git commit: KAFKA-5852: Add filter, filterNot, mapValues and Materialized to KTable

Repository: kafka
Updated Branches:
  refs/heads/trunk 6e4045586 -> 2db1e4423


KAFKA-5852: Add filter, filterNot, mapValues and Materialized to KTable

Add overloads of `filter`, `filterNot`, `mapValues` that take `Materialized` as a param to `KTable`. Deprecate overloads using `storeName` and `storeSupplier`

Author: Damian Guy <da...@gmail.com>

Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #3807 from dguy/ktable-filter-map


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2db1e442
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2db1e442
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2db1e442

Branch: refs/heads/trunk
Commit: 2db1e4423fb33405a319b253b040e57e069c1f7a
Parents: 6e40455
Author: Damian Guy <da...@gmail.com>
Authored: Fri Sep 8 14:01:58 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Sep 8 14:01:58 2017 -0700

----------------------------------------------------------------------
 docs/streams/developer-guide.html               |  11 +-
 .../apache/kafka/streams/kstream/KTable.java    | 140 +++++++++++++-
 .../kafka/streams/kstream/Materialized.java     | 184 +++++++++++++++++++
 .../streams/kstream/internals/KTableImpl.java   |  60 ++++++
 .../internals/KeyValueStoreMaterializer.java    |  52 ++++++
 .../kstream/internals/MaterializedInternal.java |  62 +++++++
 .../QueryableStateIntegrationTest.java          |  13 +-
 .../kstream/internals/KTableFilterTest.java     |  28 +++
 .../kstream/internals/KTableImplTest.java       |  22 +++
 .../kstream/internals/KTableMapValuesTest.java  |   9 +-
 .../KeyValueStoreMaterializerTest.java          | 116 ++++++++++++
 11 files changed, 683 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 42a9b20..ab5a823 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -707,7 +707,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
                 </p>
                 <pre class="brush: java;">
                      KStream&lt;String, Long&gt; stream = ...;
-
+                     KTable&lt;String, Long&gt; table = ...;
                      // A filter that selects (keeps) only positive numbers
                      // Java 8+ example, using lambda expressions
                      KStream&lt;String, Long&gt; onlyPositives = stream.filter((key, value) -> value > 0);
@@ -720,6 +720,9 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
                            return value > 0;
                          }
                        });
+
+                    // A filter on a KTable that materializes the result into a StateStore
+                    table.filter((key, value) -> value != 0, Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("filtered"));
 	            </pre>
             </td>
         </tr>
@@ -991,7 +994,8 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
                 </p>
 
                 <pre class="brush: java;">
-                       KStream&lt;byte[], String> stream = ...;
+                       KStream&lt;byte[], String&gt; stream = ...;
+                       KTable&lt;String, String&gt; table = ...;
 
                        // Java 8+ example, using lambda expressions
                        KStream&lt;byte[], String&gt; uppercased = stream.mapValues(value -> value.toUpperCase());
@@ -1004,6 +1008,9 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
                              return s.toUpperCase();
                           }
                        });
+
+                       // mapValues on a KTable and also materialize the results into a statestore
+                       table.mapValue(value -> value.toUpperCase(), Materialized.&lt;String, String, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("uppercased"));
                 </pre>
             </td>
         </tr>

http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 4bc9572..2571ac1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -19,14 +19,17 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
 import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@@ -116,6 +119,44 @@ public interface KTable<K, V> {
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
+     * <p>
+     *
+     * @param predicate     a filter {@link Predicate} that is applied to each record
+     * @param materialized  a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+     *                      should be materialized
+     * @return a {@code KTable} that contains only those records that satisfy the given predicate
+     * @see #filterNot(Predicate, Materialized)
+     */
+    KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
+                        final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
+
+    /**
+     * Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given
+     * predicate.
+     * All records that do not satisfy the predicate are dropped.
+     * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the
+     * result {@code KTable}.
+     * This is a stateless record-by-record operation.
+     * <p>
+     * Note that {@code filter} for a <i>changelog stream</i> works different to {@link KStream#filter(Predicate)
+     * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+     * have delete semantics.
+     * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+     * directly if required (i.e., if there is anything to be deleted).
+     * Furthermore, for each record that gets dropped (i.e., dot not satisfied the given predicate) a tombstone record
+     * is forwarded.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // filtering words
+     * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+     * K key = "some-word";
+     * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      *
      * @param predicate a filter {@link Predicate} that is applied to each record
@@ -124,8 +165,10 @@ public interface KTable<K, V> {
      *                          alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
      *                          (i.e., that would be equivalent to calling {@link KTable#filter(Predicate)}.
      * @return a {@code KTable} that contains only those records that satisfy the given predicate
-     * @see #filterNot(Predicate)
+     * @see #filterNot(Predicate, Materialized)
+     * @deprecated use {@link #filter(Predicate, Materialized)}
      */
+    @Deprecated
     KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
 
     /**
@@ -159,8 +202,10 @@ public interface KTable<K, V> {
      * @param predicate a filter {@link Predicate} that is applied to each record
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains only those records that satisfy the given predicate
-     * @see #filterNot(Predicate)
+     * @see #filterNot(Predicate, Materialized)
+     * @deprecated use {@link #filter(Predicate, Materialized)}
      */
+    @Deprecated
     KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
@@ -211,12 +256,50 @@ public interface KTable<K, V> {
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
+     * <p>
+     * @param predicate a filter {@link Predicate} that is applied to each record
+     * @param materialized  a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+     *                      should be materialized
+     * @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
+     * @see #filter(Predicate, Materialized)
+     */
+    KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
+                           final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
+    /**
+     * Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the
+     * given predicate.
+     * All records that <em>do</em> satisfy the predicate are dropped.
+     * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the
+     * result {@code KTable}.
+     * This is a stateless record-by-record operation.
+     * <p>
+     * Note that {@code filterNot} for a <i>changelog stream</i> works different to {@link KStream#filterNot(Predicate)
+     * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+     * have delete semantics.
+     * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+     * directly if required (i.e., if there is anything to be deleted).
+     * Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
+     * forwarded.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // filtering words
+     * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+     * K key = "some-word";
+     * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      * @param predicate a filter {@link Predicate} that is applied to each record
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
-     * @see #filter(Predicate)
+     * @see #filter(Predicate, Materialized)
+     * @deprecated use {@link #filterNot(Predicate, Materialized)}
      */
+    @Deprecated
     KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
@@ -252,8 +335,10 @@ public interface KTable<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
      * (i.e., that would be equivalent to calling {@link KTable#filterNot(Predicate)}.
      * @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
-     * @see #filter(Predicate)
+     * @see #filter(Predicate, Materialized)
+     * @deprecated use {@link #filter(Predicate, Materialized)}
      */
+    @Deprecated
     KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
 
 
@@ -291,6 +376,49 @@ public interface KTable<K, V> {
      */
     <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);
 
+    /**
+     * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
+     * (with possible new type)in the new {@code KTable}.
+     * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
+     * computes a new value for it, resulting in an update record for the result {@code KTable}.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+     * This is a stateless record-by-record operation.
+     * <p>
+     * The example below counts the number of token of the value string.
+     * <pre>{@code
+     * KTable<String, String> inputTable = builder.table("topic");
+     * KTable<String, Integer> outputTable = inputTable.mapValue(new ValueMapper<String, Integer> {
+     *     Integer apply(String value) {
+     *         return value.split(" ").length;
+     *     }
+     * });
+     * }</pre>
+     * <p>
+     * To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
+     * <p>
+     * This operation preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
+     * the result {@code KTable}.
+     * <p>
+     * Note that {@code mapValues} for a <i>changelog stream</i> works different to {@link KStream#mapValues(ValueMapper)
+     * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+     * have delete semantics.
+     * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
+     * delete the corresponding record in the result {@code KTable}.
+     *
+     * @param mapper a {@link ValueMapper} that computes a new output value
+     * @param materialized  a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+     *                      should be materialized
+     * @param <VR>   the value type of the result {@code KTable}
+     *
+     * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+     */
+    <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
+                                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 
     /**
      * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
@@ -335,7 +463,9 @@ public interface KTable<K, V> {
      * @param <VR>   the value type of the result {@code KTable}
      *
      * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+     * @deprecated use {@link #mapValues(ValueMapper, Materialized)}
      */
+    @Deprecated
     <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName);
 
     /**
@@ -377,7 +507,9 @@ public interface KTable<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @param <VR>   the value type of the result {@code KTable}
      * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+     * @deprecated use {@link #mapValues(ValueMapper, Materialized)}
      */
+    @Deprecated
     <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
                                  final Serde<VR> valueSerde,
                                  final StateStoreSupplier<KeyValueStore> storeSupplier);

http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
new file mode 100644
index 0000000..fb2e7a6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreSupplier;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Used to describe how a {@link StateStore} should be materialized.
+ * You can either provide a custom {@link StateStore} backend
+ * through one of the provided methods accepting a supplier or use the default RocksDB backends
+ * by providing just a store name.
+ */
+public class Materialized<K, V, S extends StateStore> {
+    protected StoreSupplier<S> storeSupplier;
+    protected String storeName;
+    protected Serde<V> valueSerde;
+    protected Serde<K> keySerde;
+    protected boolean loggingEnabled = true;
+    protected boolean cachingEnabled = true;
+    protected Map<String, String> topicConfig = new HashMap<>();
+
+    private Materialized(final StoreSupplier<S> storeSupplier) {
+        this.storeSupplier = storeSupplier;
+    }
+
+    private Materialized(final String storeName) {
+        this.storeName = storeName;
+    }
+
+    /**
+     * Copy constructor.
+     * @param materialized  the {@link Materialized} instance to copy.
+     */
+    protected Materialized(final Materialized<K, V, S> materialized) {
+        this.storeSupplier = materialized.storeSupplier;
+        this.storeName = materialized.storeName;
+        this.keySerde = materialized.keySerde;
+        this.valueSerde = materialized.valueSerde;
+        this.loggingEnabled = materialized.loggingEnabled;
+        this.cachingEnabled = materialized.cachingEnabled;
+        this.topicConfig = materialized.topicConfig;
+    }
+
+    /**
+     * Materialize a {@link StateStore} with the given name.
+     *
+     * @param storeName name of the store to materialize
+     * @param <K>       key type of the store
+     * @param <V>       value type of the store
+     * @param <S>       type of the {@link StateStore}
+     * @return a new {@link Materialized} instance with the given storeName
+     */
+    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName) {
+        return new Materialized<>(storeName);
+    }
+
+    /**
+     * Materialize a {@link WindowStore} using the provided {@link WindowBytesStoreSupplier}.
+     *
+     * @param supplier the {@link WindowBytesStoreSupplier} used to materialize the store
+     * @param <K>      key type of the store
+     * @param <V>      value type of the store
+     * @return a new {@link Materialized} instance with the given supplier
+     */
+    public static <K, V> Materialized<K, V, WindowStore<Bytes, byte[]>> as(final WindowBytesStoreSupplier supplier) {
+        return new Materialized<>(supplier);
+    }
+
+    /**
+     * Materialize a {@link SessionStore} using the provided {@link SessionBytesStoreSupplier}.
+     *
+     * @param supplier the {@link SessionBytesStoreSupplier} used to materialize the store
+     * @param <K>      key type of the store
+     * @param <V>      value type of the store
+     * @return a new {@link Materialized} instance with the given supplier
+     */
+    public static <K, V> Materialized<K, V, SessionStore<Bytes, byte[]>> as(final SessionBytesStoreSupplier supplier) {
+        return new Materialized<>(supplier);
+    }
+
+    /**
+     * Materialize a {@link KeyValueStore} using the provided {@link KeyValueBytesStoreSupplier}.
+     *
+     * @param supplier the {@link KeyValueBytesStoreSupplier} used to materialize the store
+     * @param <K>      key type of the store
+     * @param <V>      value type of the store
+     * @return a new {@link Materialized} instance with the given supplier
+     */
+    public static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> as(final KeyValueBytesStoreSupplier supplier) {
+        return new Materialized<>(supplier);
+    }
+
+    /**
+     * Set the valueSerde the materialized {@link StateStore} will use.
+     *
+     * @param valueSerde the value {@link Serde} to use. If the {@link Serde} is null, then the default value
+     *                   serde from configs will be used
+     * @return itself
+     */
+    public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde) {
+        this.valueSerde = valueSerde;
+        return this;
+    }
+
+    /**
+     * Set the keySerde the materialize {@link StateStore} will use.
+     * @param keySerde  the key {@link Serde} to use. If the {@link Serde} is null, then the default key
+     *                  serde from configs will be used
+     * @return itself
+     */
+    public Materialized<K, V, S> withKeySerde(final Serde<K> keySerde) {
+        this.keySerde = keySerde;
+        return this;
+    }
+
+    /**
+     * Indicates that a changelog should be created for the store. The changelog will be created
+     * with the provided configs.
+     * <p>
+     * Note: Any unrecognized configs will be ignored.
+     * @param config    any configs that should be applied to the changelog
+     * @return itself
+     */
+    public Materialized<K, V, S> withLoggingEnabled(final Map<String, String> config) {
+        loggingEnabled = true;
+        this.topicConfig = config;
+        return this;
+    }
+
+    /**
+     * Disable change logging for the materialized {@link StateStore}.
+     * @return itself
+     */
+    public Materialized<K, V, S> withLoggingDisabled() {
+        loggingEnabled = false;
+        this.topicConfig.clear();
+        return this;
+    }
+
+    /**
+     * Enable caching for the materialized {@link StateStore}.
+     * @return itself
+     */
+    public Materialized<K, V, S> withCachingEnabled() {
+        cachingEnabled = true;
+        return this;
+    }
+
+    /**
+     * Disable caching for the materialized {@link StateStore}.
+     * @return itself
+     */
+    public Materialized<K, V, S> withCachingDisabled() {
+        cachingEnabled = false;
+        return this;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 87277b6..d3d6ce2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -24,6 +25,7 @@ import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -33,6 +35,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.io.FileNotFoundException;
 import java.io.PrintWriter;
@@ -155,6 +158,30 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return new KTableImpl<>(builder, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, internalStoreName != null);
     }
 
+    private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
+                                  final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized,
+                                  final boolean filterNot) {
+        String name = builder.newName(FILTER_NAME);
+
+        KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this,
+                                                                                predicate,
+                                                                                filterNot,
+                                                                                materialized.storeName());
+        builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
+
+        final StoreBuilder builder = new KeyValueStoreMaterializer<>(materialized).materialize();
+        this.builder.internalTopologyBuilder.addStateStore(builder, name);
+
+        return new KTableImpl<>(this.builder,
+                                name,
+                                processorSupplier,
+                                this.keySerde,
+                                this.valSerde,
+                                sourceNodes,
+                                builder.name(),
+                                true);
+    }
+
     @Override
     public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate) {
         return filter(predicate, (String) null);
@@ -162,6 +189,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @Override
     public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
+                               final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(predicate, "predicate can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        return doFilter(predicate, new MaterializedInternal<>(materialized), false);
+    }
+
+    @Override
+    public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
                                final String queryableStoreName) {
         StateStoreSupplier<KeyValueStore> storeSupplier = null;
         if (queryableStoreName != null) {
@@ -184,6 +219,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @Override
     public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
+                                  final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(predicate, "predicate can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        return doFilter(predicate, new MaterializedInternal<>(materialized), true);
+    }
+
+    @Override
+    public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
                                   final String queryableStoreName) {
         StateStoreSupplier<KeyValueStore> storeSupplier = null;
         if (queryableStoreName != null) {
@@ -224,6 +267,23 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
+    public <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
+                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(mapper, "mapper can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal
+                = new MaterializedInternal<>(materialized);
+        final String name = builder.newName(MAPVALUES_NAME);
+        final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(this,
+                                                                                          mapper,
+                                                                                          materializedInternal.storeName());
+        builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
+        builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
+                                                      name);
+        return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, true);
+    }
+
+    @Override
     public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
                                         final Serde<V1> valueSerde,
                                         final String queryableStoreName) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
new file mode 100644
index 0000000..1d702f2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+public class KeyValueStoreMaterializer<K, V> {
+    private final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized;
+
+    public KeyValueStoreMaterializer(final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        this.materialized = materialized;
+    }
+
+    public StoreBuilder<KeyValueStore<K, V>> materialize() {
+        KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier();
+        if (supplier == null) {
+            supplier = Stores.persistentKeyValueStore(materialized.storeName());
+        }
+        final StoreBuilder<KeyValueStore<K, V>> builder = Stores.keyValueStoreBuilder(supplier,
+                                                                                      materialized.keySerde(),
+                                                                                      materialized.valueSerde());
+
+        if (materialized.loggingEnabled()) {
+            builder.withLoggingEnabled(materialized.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        if (materialized.cachingEnabled()) {
+            builder.withCachingEnabled();
+        }
+        return builder;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
new file mode 100644
index 0000000..d7ebc65
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreSupplier;
+
+import java.util.Map;
+
+public class MaterializedInternal<K, V, S extends StateStore> extends Materialized<K, V, S> {
+
+    public MaterializedInternal(final Materialized<K, V, S> materialized) {
+        super(materialized);
+    }
+
+    public String storeName() {
+        if (storeName != null) {
+            return storeName;
+        }
+        return storeSupplier.name();
+    }
+
+    public StoreSupplier<S> storeSupplier() {
+        return storeSupplier;
+    }
+
+    public Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    public Serde<V> valueSerde() {
+        return valueSerde;
+    }
+
+    public boolean loggingEnabled() {
+        return loggingEnabled;
+    }
+
+    public Map<String, String> logConfig() {
+        return topicConfig;
+    }
+
+    public boolean cachingEnabled() {
+        return cachingEnabled;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 0e9bc33..dc59fb4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreamsTest;
@@ -39,11 +40,13 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
@@ -446,8 +449,8 @@ public class QueryableStateIntegrationTest {
             }
         };
         final KTable<String, Long> t1 = builder.table(streamOne);
-        final KTable<String, Long> t2 = t1.filter(filterPredicate, "queryFilter");
-        t1.filterNot(filterPredicate, "queryFilterNot");
+        final KTable<String, Long> t2 = t1.filter(filterPredicate, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryFilter"));
+        t1.filterNot(filterPredicate, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryFilterNot"));
         t2.to(outputTopic);
 
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
@@ -509,7 +512,7 @@ public class QueryableStateIntegrationTest {
             public Long apply(final String value) {
                 return Long.valueOf(value);
             }
-        }, Serdes.Long(), "queryMapValues");
+        }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
         t2.to(Serdes.String(), Serdes.Long(), outputTopic);
 
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
@@ -559,13 +562,13 @@ public class QueryableStateIntegrationTest {
             }
         };
         final KTable<String, String> t1 = builder.table(streamOne);
-        final KTable<String, String> t2 = t1.filter(filterPredicate, "queryFilter");
+        final KTable<String, String> t2 = t1.filter(filterPredicate, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("queryFilter"));
         final KTable<String, Long> t3 = t2.mapValues(new ValueMapper<String, Long>() {
             @Override
             public Long apply(final String value) {
                 return Long.valueOf(value);
             }
-        }, Serdes.Long(), "queryMapValues");
+        }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
         t3.to(Serdes.String(), Serdes.Long(), outputTopic);
 
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);

http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 3350072..a885edd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -18,9 +18,12 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
@@ -94,6 +97,7 @@ public class KTableFilterTest {
         doTestKTable(builder, table2, table3, topic1);
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testQueryableKTable() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -118,6 +122,30 @@ public class KTableFilterTest {
         doTestKTable(builder, table2, table3, topic1);
     }
 
+    @Test
+    public void shouldAddQueryableStore() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final String topic1 = "topic1";
+
+        KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+
+        KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
+            @Override
+            public boolean test(String key, Integer value) {
+                return (value % 2) == 0;
+            }
+        }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyStoreNameFilter"));
+        KTable<String, Integer> table3 = table1.filterNot(new Predicate<String, Integer>() {
+            @Override
+            public boolean test(String key, Integer value) {
+                return (value % 2) == 0;
+            }
+        });
+
+        doTestKTable(builder, table2, table3, topic1);
+    }
+
     private void doTestValueGetter(final StreamsBuilder builder,
                                    final KTableImpl<String, Integer, Integer> table2,
                                    final KTableImpl<String, Integer, Integer> table3,

http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index f06cc63..64ae6de 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -18,15 +18,18 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
@@ -474,4 +477,23 @@ public class KTableImplTest {
         table.leftJoin(null, MockValueJoiner.TOSTRING_JOINER);
     }
 
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() {
+        table.filter(new Predicate<String, String>() {
+            @Override
+            public boolean test(final String key, final String value) {
+                return false;
+            }
+        }, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() {
+        table.filterNot(new Predicate<String, String>() {
+            @Override
+            public boolean test(final String key, final String value) {
+                return false;
+            }
+        }, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 2e7ccad..4bfaea6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -18,11 +18,14 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
@@ -92,7 +95,7 @@ public class KTableMapValuesTest {
             public Integer apply(CharSequence value) {
                 return value.charAt(0) - 48;
             }
-        }, Serdes.Integer(), "anyName");
+        }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyName").withValueSerde(Serdes.Integer()));
 
         MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
         table2.toStream().process(proc2);
@@ -249,14 +252,14 @@ public class KTableMapValuesTest {
                 public Integer apply(String value) {
                     return new Integer(value);
                 }
-            }, Serdes.Integer(), "anyMapName");
+            }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyMapName").withValueSerde(Serdes.Integer()));
         KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
             new Predicate<String, Integer>() {
                 @Override
                 public boolean test(String key, Integer value) {
                     return (value % 2) == 0;
                 }
-            }, "anyFilterName");
+            }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyFilterName").withValueSerde(Serdes.Integer()));
         KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
             table1.through(stringSerde, stringSerde, topic2, storeName2);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
new file mode 100644
index 0000000..21a5d57
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
+import org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+import org.easymock.EasyMock;
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.hamcrest.core.IsNot.not;
+
+
+public class KeyValueStoreMaterializerTest {
+
+    @Test
+    public void shouldCreateBuilderThatBuildsMeteredStoreWithCachingAndLoggingEnabled() {
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
+        final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
+        final KeyValueStore<String, String> store = builder.build();
+        final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
+        final StateStore logging = caching.wrappedStore();
+        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+        assertThat(caching, instanceOf(CachedStateStore.class));
+        assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+    }
+
+    @Test
+    public void shouldCreateBuilderThatBuildsStoreWithCachingDisabled() {
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
+                                                     .withCachingDisabled());
+        final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
+        final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
+        final KeyValueStore<String, String> store = builder.build();
+        final WrappedStateStore logging = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
+        assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+    }
+
+    @Test
+    public void shouldCreateBuilderThatBuildsStoreWithLoggingDisabled() {
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
+                                                     .withLoggingDisabled());
+        final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
+        final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
+        final KeyValueStore<String, String> store = builder.build();
+        final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
+        assertThat(caching, instanceOf(CachedStateStore.class));
+        assertThat(caching.wrappedStore(), not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
+    }
+
+    @Test
+    public void shouldCreateBuilderThatBuildsStoreWithCachingAndLoggingDisabled() {
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
+                                                     .withCachingDisabled()
+                                                     .withLoggingDisabled());
+        final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
+        final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
+        final KeyValueStore<String, String> store = builder.build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        assertThat(wrapped, not(instanceOf(CachedStateStore.class)));
+        assertThat(wrapped, not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
+    }
+
+    @Test
+    public void shouldCreateKeyValueStoreWithTheProvidedInnerStore() {
+        final KeyValueBytesStoreSupplier supplier = EasyMock.createNiceMock(KeyValueBytesStoreSupplier.class);
+        final InMemoryKeyValueStore<Bytes, byte[]> store = new InMemoryKeyValueStore<>("name", Serdes.Bytes(), Serdes.ByteArray());
+        EasyMock.expect(supplier.name()).andReturn("name").anyTimes();
+        EasyMock.expect(supplier.get()).andReturn(store);
+        EasyMock.replay(supplier);
+
+        final MaterializedInternal<String, Integer, KeyValueStore<Bytes, byte[]>> materialized
+                = new MaterializedInternal<>(Materialized.<String, Integer>as(supplier));
+        final KeyValueStoreMaterializer<String, Integer> materializer = new KeyValueStoreMaterializer<>(materialized);
+        final StoreBuilder<KeyValueStore<String, Integer>> builder = materializer.materialize();
+        final KeyValueStore<String, Integer> built = builder.build();
+        final StateStore inner = ((WrappedStateStore) built).inner();
+
+        assertThat(inner, CoreMatchers.<StateStore>equalTo(store));
+    }
+
+}
\ No newline at end of file