You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/04 21:42:11 UTC

[2/2] kafka git commit: MINOR: add suppress warnings annotations in Streams API

MINOR: add suppress warnings annotations in Streams API

 - fixes examples with regard to new API
 - fixes `Topology#addGlobalStore` parameters

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>

Closes #4003 from mjsax/minor-deprecated


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/713a67fd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/713a67fd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/713a67fd

Branch: refs/heads/trunk
Commit: 713a67fddaec3fa9cd7cce53dd6fef5ab6e0cdab
Parents: 51c652c
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Oct 4 14:42:07 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 4 14:42:07 2017 -0700

----------------------------------------------------------------------
 .../examples/pageview/PageViewTypedDemo.java    | 77 +++++++++----------
 .../examples/pageview/PageViewUntypedDemo.java  | 66 ++++++++--------
 .../examples/temperature/TemperatureDemo.java   | 54 ++++++-------
 .../examples/wordcount/WordCountDemo.java       | 29 +++----
 .../java/org/apache/kafka/streams/Topology.java |  6 +-
 .../kafka/streams/kstream/KGroupedStream.java   | 80 +++++++++-----------
 .../kafka/streams/kstream/KGroupedTable.java    |  7 +-
 .../kafka/streams/kstream/KStreamBuilder.java   | 30 ++++----
 .../apache/kafka/streams/kstream/KTable.java    | 23 +++---
 .../kstream/internals/AbstractStream.java       | 10 +--
 .../internals/InternalStreamsBuilder.java       |  5 +-
 .../kstream/internals/KGroupedStreamImpl.java   | 59 ++++++++++-----
 .../kstream/internals/KGroupedTableImpl.java    | 36 +++++----
 .../streams/kstream/internals/KStreamImpl.java  | 36 ++++++++-
 .../kstream/internals/KStreamKStreamJoin.java   |  1 +
 .../streams/kstream/internals/KTableImpl.java   | 77 ++++++++++++++-----
 .../kstream/internals/WindowedSerializer.java   |  5 +-
 .../streams/processor/TopologyBuilder.java      |  1 +
 .../processor/internals/AbstractTask.java       |  2 +-
 .../internals/InternalTopologyBuilder.java      | 11 +--
 .../streams/processor/internals/QuickUnion.java |  1 +
 .../processor/internals/StreamsMetricsImpl.java |  2 +-
 .../org/apache/kafka/streams/state/Stores.java  | 12 +--
 .../state/internals/AbstractStoreSupplier.java  |  5 +-
 .../InMemoryKeyValueStoreSupplier.java          |  1 +
 .../InMemoryLRUCacheStoreSupplier.java          |  1 +
 .../internals/MeteredKeyValueBytesStore.java    |  1 +
 .../state/internals/MeteredSessionStore.java    |  1 +
 .../state/internals/MeteredWindowStore.java     |  2 +-
 .../internals/RocksDBKeyValueStoreSupplier.java |  2 +-
 .../internals/RocksDBSessionStoreSupplier.java  |  1 +
 .../internals/RocksDBWindowStoreSupplier.java   |  2 +-
 .../RocksDbSessionBytesStoreSupplier.java       | 17 +++--
 .../RocksDbWindowBytesStoreSupplier.java        |  7 +-
 .../state/internals/WindowStoreSupplier.java    |  6 +-
 .../kafka/test/MockStateStoreSupplier.java      |  2 +-
 36 files changed, 394 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 068eece..101cd23 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
@@ -150,45 +151,45 @@ public class PageViewTypedDemo {
                                                           Consumed.with(Serdes.String(), userProfileSerde));
 
         KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
-                .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
-                    @Override
-                    public PageViewByRegion apply(PageView view, UserProfile profile) {
-                        PageViewByRegion viewByRegion = new PageViewByRegion();
-                        viewByRegion.user = view.user;
-                        viewByRegion.page = view.page;
-
-                        if (profile != null) {
-                            viewByRegion.region = profile.region;
-                        } else {
-                            viewByRegion.region = "UNKNOWN";
-                        }
-                        return viewByRegion;
+            .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
+                @Override
+                public PageViewByRegion apply(PageView view, UserProfile profile) {
+                    PageViewByRegion viewByRegion = new PageViewByRegion();
+                    viewByRegion.user = view.user;
+                    viewByRegion.page = view.page;
+
+                    if (profile != null) {
+                        viewByRegion.region = profile.region;
+                    } else {
+                        viewByRegion.region = "UNKNOWN";
                     }
-                })
-                .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
-                    @Override
-                    public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
-                        return new KeyValue<>(viewRegion.region, viewRegion);
-                    }
-                })
-                .groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde))
-                .count(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000), "RollingSevenDaysOfPageViewsByRegion")
-                // TODO: we can merge ths toStream().map(...) with a single toStream(...)
-                .toStream()
-                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
-                    @Override
-                    public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
-                        WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
-                        wViewByRegion.windowStart = key.window().start();
-                        wViewByRegion.region = key.key();
-
-                        RegionCount rCount = new RegionCount();
-                        rCount.region = key.key();
-                        rCount.count = value;
-
-                        return new KeyValue<>(wViewByRegion, rCount);
-                    }
-                });
+                    return viewByRegion;
+                }
+            })
+            .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
+                @Override
+                public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
+                    return new KeyValue<>(viewRegion.region, viewRegion);
+                }
+            })
+            .groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde))
+            .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(7)).advanceBy(TimeUnit.SECONDS.toMillis(1)))
+            .count()
+            .toStream()
+            .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
+                @Override
+                public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
+                    WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
+                    wViewByRegion.windowStart = key.window().start();
+                    wViewByRegion.region = key.key();
+
+                    RegionCount rCount = new RegionCount();
+                    rCount.region = key.key();
+                    rCount.count = value;
+
+                    return new KeyValue<>(wViewByRegion, rCount);
+                }
+            });
 
         // write to the result topic
         regionCount.to("streams-pageviewstats-typed-output", Produced.with(wPageViewByRegionSerde, regionCountSerde));

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index c20c077..ae72042 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -87,39 +87,39 @@ public class PageViewUntypedDemo {
         });
 
         KStream<JsonNode, JsonNode> regionCount = views
-                .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
-                    @Override
-                    public JsonNode apply(JsonNode view, String region) {
-                        ObjectNode jNode = JsonNodeFactory.instance.objectNode();
-
-                        return jNode.put("user", view.get("user").textValue())
-                                .put("page", view.get("page").textValue())
-                                .put("region", region == null ? "UNKNOWN" : region);
-                    }
-                })
-                .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
-                    @Override
-                    public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
-                        return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
-                    }
-                })
-                .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
-                .count(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000), "RollingSevenDaysOfPageViewsByRegion")
-                // TODO: we can merge ths toStream().map(...) with a single toStream(...)
-                .toStream()
-                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
-                    @Override
-                    public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
-                        ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
-                        keyNode.put("window-start", key.window().start())
-                                .put("region", key.key());
-
-                        ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
-                        valueNode.put("count", value);
-
-                        return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
-                    }
-                });
+            .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
+                @Override
+                public JsonNode apply(JsonNode view, String region) {
+                    ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+
+                    return jNode.put("user", view.get("user").textValue())
+                            .put("page", view.get("page").textValue())
+                            .put("region", region == null ? "UNKNOWN" : region);
+                }
+            })
+            .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
+                @Override
+                public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
+                    return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
+                }
+            })
+            .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
+            .windowedBy(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
+            .count()
+            .toStream()
+            .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
+                @Override
+                public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
+                    ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
+                    keyNode.put("window-start", key.window().start())
+                            .put("region", key.key());
+
+                    ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
+                    valueNode.put("count", value);
+
+                    return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
+                }
+            });
 
         // write to the result topic
         regionCount.to("streams-pageviewstats-untyped-output", Produced.with(jsonSerde, jsonSerde));

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
index 2039ca5..ea81dd6 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -87,38 +88,39 @@ public class TemperatureDemo {
         KStream<String, String> source = builder.stream("iot-temperature");
 
         KStream<Windowed<String>, String> max = source
-                // temperature values are sent without a key (null), so in order
-                // to group and reduce them, a key is needed ("temp" has been chosen)
-                .selectKey(new KeyValueMapper<String, String, String>() {
-                    @Override
-                    public String apply(String key, String value) {
-                        return "temp";
-                    }
-                })
-                .groupByKey()
-                .reduce(new Reducer<String>() {
-                    @Override
-                    public String apply(String value1, String value2) {
-                        if (Integer.parseInt(value1) > Integer.parseInt(value2))
-                            return value1;
-                        else
-                            return value2;
-                    }
-                }, TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
-                .toStream()
-                .filter(new Predicate<Windowed<String>, String>() {
-                    @Override
-                    public boolean test(Windowed<String> key, String value) {
-                        return Integer.parseInt(value) > TEMPERATURE_THRESHOLD;
-                    }
-                });
+            // temperature values are sent without a key (null), so in order
+            // to group and reduce them, a key is needed ("temp" has been chosen)
+            .selectKey(new KeyValueMapper<String, String, String>() {
+                @Override
+                public String apply(String key, String value) {
+                    return "temp";
+                }
+            })
+            .groupByKey()
+            .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
+            .reduce(new Reducer<String>() {
+                @Override
+                public String apply(String value1, String value2) {
+                    if (Integer.parseInt(value1) > Integer.parseInt(value2))
+                        return value1;
+                    else
+                        return value2;
+                }
+            })
+            .toStream()
+            .filter(new Predicate<Windowed<String>, String>() {
+                @Override
+                public boolean test(Windowed<String> key, String value) {
+                    return Integer.parseInt(value) > TEMPERATURE_THRESHOLD;
+                }
+            });
 
         WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(Serdes.String().serializer());
         WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer(), TEMPERATURE_WINDOW_SIZE);
         Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
 
         // need to override key serde to Windowed<String> type
-        max.to(windowedSerde, Serdes.String(), "iot-temperature-max");
+        max.to("iot-temperature-max", Produced.with(windowedSerde, Serdes.String()));
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         final CountDownLatch latch = new CountDownLatch(1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 5689d50..7535315 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueMapper;
 
 import java.util.Arrays;
@@ -63,22 +64,22 @@ public class WordCountDemo {
         KStream<String, String> source = builder.stream("streams-plaintext-input");
 
         KTable<String, Long> counts = source
-                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                    @Override
-                    public Iterable<String> apply(String value) {
-                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
-                    }
-                })
-                .groupBy(new KeyValueMapper<String, String, String>() {
-                    @Override
-                    public String apply(String key, String value) {
-                        return value;
-                    }
-                })
-                .count("Counts");
+            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+                @Override
+                public Iterable<String> apply(String value) {
+                    return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
+                }
+            })
+            .groupBy(new KeyValueMapper<String, String, String>() {
+                @Override
+                public String apply(String key, String value) {
+                    return value;
+                }
+            })
+            .count();
 
         // need to override value serde to Long type
-        counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
+        counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         final CountDownLatch latch = new CountDownLatch(1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/Topology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 1409b97..3b1ac6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -32,7 +32,6 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 
 import java.util.regex.Pattern;
 
@@ -573,6 +572,7 @@ public class Topology {
      * @return itself
      * @throws TopologyException if the processor or state is already registered
      */
+    @SuppressWarnings("unchecked")
     public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
                                                 final String sourceName,
                                                 final Deserializer keyDeserializer,
@@ -609,7 +609,8 @@ public class Topology {
      * @return itself
      * @throws TopologyException if the processor of state is already registered
      */
-    public synchronized Topology addGlobalStore(final KeyValueStoreBuilder storeBuilder,
+    @SuppressWarnings("unchecked")
+    public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
                                                 final String sourceName,
                                                 final TimestampExtractor timestampExtractor,
                                                 final Deserializer keyDeserializer,
@@ -641,6 +642,7 @@ public class Topology {
      *
      * @return a description of the topology.
      */
+
     public synchronized TopologyDescription describe() {
         return internalTopologyBuilder.describe();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 17f2db4..b3945f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.SessionStore;
@@ -132,7 +131,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
      * String queryableStoreName = storeSupplier.name();
@@ -149,7 +148,7 @@ public interface KGroupedStream<K, V> {
      * @deprecated use {@link #count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
-    KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
+    KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Count the number of records in this stream by the grouped key.
@@ -290,7 +289,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
      * String queryableStoreName = storeSupplier.name();
@@ -312,7 +311,7 @@ public interface KGroupedStream<K, V> {
      */
     @Deprecated
     <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
-                                                       final StateStoreSupplier<WindowStore> storeSupplier);
+                                                       final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier);
 
 
     /**
@@ -333,10 +332,8 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
-     * Sting queryableStoreName = storeSupplier.name();
      * ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>ReadOnlySessionStore<String, Long>);
      * String key = "some-key";
      * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
@@ -398,7 +395,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
      * String queryableStoreName = storeSupplier.name();
@@ -418,7 +415,7 @@ public interface KGroupedStream<K, V> {
      */
     @Deprecated
     KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
-                                    final StateStoreSupplier<SessionStore> storeSupplier);
+                                    final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier);
 
     /**
      * Combine the values of records in this stream by the grouped key.
@@ -522,7 +519,7 @@ public interface KGroupedStream<K, V> {
      * Combine the value of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
      * Combining implies that the type of the aggregate result is the same as the type of the input value
-     * (c.f. {@link #aggregate(Initializer, Aggregator, StateStoreSupplier)}).
+     * (c.f. {@link #aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)}).
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * provided by the given {@code storeSupplier}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -540,8 +537,8 @@ public interface KGroupedStream<K, V> {
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
-     * Thus, {@code reduce(Reducer, StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or
-     * max.
+     * Thus, {@code reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to compute
+     * aggregate functions like sum, min, or max.
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same key.
@@ -552,7 +549,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
      * String queryableStoreName = storeSupplier.name();
@@ -571,7 +568,7 @@ public interface KGroupedStream<K, V> {
      */
     @Deprecated
     KTable<K, V> reduce(final Reducer<V> reducer,
-                        final StateStoreSupplier<KeyValueStore> storeSupplier);
+                        final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Combine the value of records in this stream by the grouped key.
@@ -595,7 +592,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
-     * Thus, {@code reduce(Reducer, StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or
+     * Thus, {@code reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or
      * max.
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
@@ -767,8 +764,8 @@ public interface KGroupedStream<K, V> {
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
-     * Thus, {@code reduce(Reducer, Windows, StateStoreSupplier)} can be used to compute aggregate functions like sum,
-     * min, or max.
+     * Thus, {@code reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to
+     * compute aggregate functions like sum, min, or max.
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -779,7 +776,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
      * String queryableStoreName = storeSupplier.name();
@@ -803,7 +800,7 @@ public interface KGroupedStream<K, V> {
     @Deprecated
     <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                      final Windows<W> windows,
-                                                     final StateStoreSupplier<WindowStore> storeSupplier);
+                                                     final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier);
 
     /**
      * Combine values of this stream by the grouped key into {@link SessionWindows}.
@@ -841,10 +838,8 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
-     * Sting queryableStoreName = storeSupplier.name();
      * ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
      * String key = "some-key";
      * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
@@ -933,8 +928,8 @@ public interface KGroupedStream<K, V> {
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
-     * Thus, {@code reduce(Reducer, SessionWindows, StateStoreSupplier)} can be used to compute aggregate functions like
-     * sum, min, or max.
+     * Thus, {@code reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used
+     * to compute aggregate functions like sum, min, or max.
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -945,7 +940,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
      * String queryableStoreName = storeSupplier.name();
@@ -975,7 +970,7 @@ public interface KGroupedStream<K, V> {
     @Deprecated
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                   final SessionWindows sessionWindows,
-                                  final StateStoreSupplier<SessionStore> storeSupplier);
+                                  final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier);
 
 
     /**
@@ -1188,8 +1183,8 @@ public interface KGroupedStream<K, V> {
     /**
      * Aggregate the values of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer, StateStoreSupplier) combining via reduce(...)} as it,
-     * for example, allows the result to have a different type than the input values.
+     * Aggregating is a generalization of {@link #reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)
+     * combining via reduce(...)} as it, for example, allows the result to have a different type than the input values.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * provided by the given {@code storeSupplier}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -1199,8 +1194,8 @@ public interface KGroupedStream<K, V> {
      * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@code aggregate(Initializer, Aggregator, StateStoreSupplier)} can be used to compute aggregate functions
-     * like count (c.f. {@link #count()}).
+     * Thus, {@code aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)} can be
+     * used to compute aggregate functions like count (c.f. {@link #count()}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same key.
@@ -1211,7 +1206,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // some aggregation on value type double
      * String queryableStoreName = storeSupplier.name();
@@ -1233,7 +1228,7 @@ public interface KGroupedStream<K, V> {
     @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> aggregator,
-                                 final StateStoreSupplier<KeyValueStore> storeSupplier);
+                                 final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Aggregate the values of records in this stream by the grouped key and defined windows.
@@ -1362,8 +1357,9 @@ public interface KGroupedStream<K, V> {
     /**
      * Aggregate the values of records in this stream by the grouped key and defined windows.
      * Records with {@code null} key or value are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer, Windows, StateStoreSupplier) combining via
-     * reduce(...)} as it, for example, allows the result to have a different type than the input values.
+     * Aggregating is a generalization of
+     * {@link #reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) combining via reduce(...)}
+     * as it, for example, allows the result to have a different type than the input values.
      * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
      * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
      * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
@@ -1377,8 +1373,8 @@ public interface KGroupedStream<K, V> {
      * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@code aggregate(Initializer, Aggregator, Windows, StateStoreSupplier)} can be used to compute aggregate
-     * functions like count (c.f. {@link #count(Windows)}).
+     * Thus, {@code aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)}
+     * can be used to compute aggregate functions like count (c.f. {@link #count(Windows)}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -1389,7 +1385,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // some windowed aggregation on value type Long
      * String queryableStoreName = storeSupplier.name();
@@ -1416,7 +1412,7 @@ public interface KGroupedStream<K, V> {
     <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                              final Aggregator<? super K, ? super V, VR> aggregator,
                                                              final Windows<W> windows,
-                                                             final StateStoreSupplier<WindowStore> storeSupplier);
+                                                             final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier);
 
     /**
      * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
@@ -1446,10 +1442,8 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // some windowed aggregation on value type double
-     * Sting queryableStoreName = storeSupplier.name();
      * ReadOnlySessionStore<String, Long> sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
      * String key = "some-key";
      * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
@@ -1540,8 +1534,8 @@ public interface KGroupedStream<K, V> {
      * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@code #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, StateStoreSupplier)} can be used
-     * to compute aggregate functions like count (c.f. {@link #count(SessionWindows)}).
+     * Thus, {@code #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, org.apache.kafka.streams.processor.StateStoreSupplier)}
+     * can be used to compute aggregate functions like count (c.f. {@link #count(SessionWindows)}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -1552,7 +1546,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // some windowed aggregation on value type double
      * String queryableStoreName = storeSupplier.name();
@@ -1583,7 +1577,7 @@ public interface KGroupedStream<K, V> {
                                          final Merger<? super K, T> sessionMerger,
                                          final SessionWindows sessionWindows,
                                          final Serde<T> aggValueSerde,
-                                         final StateStoreSupplier<SessionStore> storeSupplier);
+                                         final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier);
 
     /**
      * Create a new {@link TimeWindowedKStream} instance that can be used to perform windowed aggregations.

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index f814eaf..d0a38cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 
@@ -195,7 +194,7 @@ public interface KGroupedTable<K, V> {
      * @deprecated use {@link #count(Materialized) count(Materialized.as(KeyValueBytesStoreSupplier))}
      */
     @Deprecated
-    KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
+    KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
@@ -467,7 +466,7 @@ public interface KGroupedTable<K, V> {
     @Deprecated
     KTable<K, V> reduce(final Reducer<V> adder,
                         final Reducer<V> subtractor,
-                        final StateStoreSupplier<KeyValueStore> storeSupplier);
+                        final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
@@ -945,6 +944,6 @@ public interface KGroupedTable<K, V> {
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> adder,
                                  final Aggregator<? super K, ? super V, VR> subtractor,
-                                 final StateStoreSupplier<KeyValueStore> storeSupplier);
+                                 final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index d4642da..6b51c86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
 import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
@@ -431,7 +430,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final String topic,
-                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         return table(null, null, null, null, topic, storeSupplier);
     }
 
@@ -525,7 +524,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      */
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
                                      final String topic,
-                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         return table(offsetReset, null, null, null, topic, storeSupplier);
     }
 
@@ -702,7 +701,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
     public <K, V> KTable<K, V> table(final Serde<K> keySerde,
                                      final Serde<V> valSerde,
                                      final String topic,
-                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         return table(null, null, keySerde, valSerde, topic, storeSupplier);
     }
 
@@ -737,7 +736,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
                                         final Serde<V> valSerde,
                                         final TimestampExtractor timestampExtractor,
                                         final String topic,
-                                        final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                        final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
                                         final boolean isQueryable) {
         try {
             final String source = newName(KStreamImpl.SOURCE_NAME);
@@ -882,6 +881,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, Serde, Serde, String) table(AutoOffsetReset, Serde, Serde, String)}
      * @return a {@link KTable} for the specified topic
      */
+    @SuppressWarnings("unchecked")
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
                                      final TimestampExtractor timestampExtractor,
                                      final Serde<K> keySerde,
@@ -889,12 +889,13 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
                                      final String topic,
                                      final String queryableStoreName) {
         final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
-        final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName,
-                keySerde,
-                valSerde,
-                false,
-                Collections.<String, String>emptyMap(),
-                true);
+        final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(
+            internalStoreName,
+            keySerde,
+            valSerde,
+            false,
+            Collections.<String, String>emptyMap(),
+            true);
         return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, queryableStoreName != null);
     }
 
@@ -965,7 +966,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
                                      final Serde<K> keySerde,
                                      final Serde<V> valSerde,
                                      final String topic,
-                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, true);
     }
@@ -1096,7 +1097,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
     public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                                  final Serde<V> valSerde,
                                                  final String topic,
-                                                 final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                                 final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
     }
 
@@ -1172,7 +1173,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
                                                     final Serde<V> valSerde,
                                                     final TimestampExtractor timestampExtractor,
                                                     final String topic,
-                                                    final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                                    final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         try {
             Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
             final String sourceName = newName(KStreamImpl.SOURCE_NAME);
@@ -1224,6 +1225,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * @param streams the {@link KStream}s to be merged
      * @return a {@link KStream} containing all records of the given streams
      */
+    @SuppressWarnings("unchecked")
     public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
         Objects.requireNonNull(streams, "streams can't be null");
         if (streams.length <= 1) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 1abc5e7..33e56aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
 import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -206,7 +205,8 @@ public interface KTable<K, V> {
      * @deprecated use {@link #filter(Predicate, Materialized) filter(predicate, Materialized.as(KeyValueBytesStoreSupplier))}
      */
     @Deprecated
-    KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
+    KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
+                        final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Create a new {@code KTable} that consists of all records of this {@code KTable} which do <em>not</em> satisfy the
@@ -300,7 +300,8 @@ public interface KTable<K, V> {
      * @deprecated use {@link #filterNot(Predicate, Materialized) filterNot(predicate, Materialized.as(KeyValueBytesStoreSupplier))}
      */
     @Deprecated
-    KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
+    KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
+                           final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Create a new {@code KTable} that consists of all records of this {@code KTable} which do <em>not</em> satisfy the
@@ -512,7 +513,7 @@ public interface KTable<K, V> {
     @Deprecated
     <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
                                  final Serde<VR> valueSerde,
-                                 final StateStoreSupplier<KeyValueStore> storeSupplier);
+                                 final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
 
     /**
@@ -811,7 +812,7 @@ public interface KTable<K, V> {
      */
     @Deprecated
     KTable<K, V> through(final String topic,
-                         final StateStoreSupplier<KeyValueStore> storeSupplier);
+                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
@@ -913,7 +914,7 @@ public interface KTable<K, V> {
     @Deprecated
     KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                          final String topic,
-                         final StateStoreSupplier<KeyValueStore> storeSupplier);
+                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
@@ -978,7 +979,7 @@ public interface KTable<K, V> {
     KTable<K, V> through(final Serde<K> keySerde,
                          final Serde<V> valSerde,
                          final String topic,
-                         final StateStoreSupplier<KeyValueStore> storeSupplier);
+                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
@@ -1080,7 +1081,7 @@ public interface KTable<K, V> {
                          final Serde<V> valSerde,
                          final StreamPartitioner<? super K, ? super V> partitioner,
                          final String topic,
-                         final StateStoreSupplier<KeyValueStore> storeSupplier);
+                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
@@ -1590,7 +1591,7 @@ public interface KTable<K, V> {
     @Deprecated
     <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
-                                final StateStoreSupplier<KeyValueStore> storeSupplier);
+                                final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
 
     /**
@@ -1934,7 +1935,7 @@ public interface KTable<K, V> {
     @Deprecated
     <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                     final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
-                                    final StateStoreSupplier<KeyValueStore> storeSupplier);
+                                    final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
 
     /**
@@ -2275,7 +2276,7 @@ public interface KTable<K, V> {
     @Deprecated
     <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                      final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
-                                     final StateStoreSupplier<KeyValueStore> storeSupplier);
+                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Get the name of the local state store that can be used to query this {@code KTable}.

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index b5de562..26e404e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
@@ -80,8 +79,8 @@ public abstract class AbstractStream<K> {
         };
     }
 
-    @SuppressWarnings("unchecked")
-    static <T, K>  StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde,
+    @SuppressWarnings({"unchecked", "deprecation"})
+    static <T, K>  org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde,
                                                                    final Serde<T> aggValueSerde,
                                                                    final String storeName) {
         Objects.requireNonNull(storeName, "storeName can't be null");
@@ -89,8 +88,8 @@ public abstract class AbstractStream<K> {
         return storeFactory(keySerde, aggValueSerde, storeName).build();
     }
 
-    @SuppressWarnings("unchecked")
-    static  <W extends Window, T, K> StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde,
+    @SuppressWarnings({"unchecked", "deprecation"})
+    static  <W extends Window, T, K> org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde,
                                                                                    final Serde<T> aggValSerde,
                                                                                    final Windows<W> windows,
                                                                                    final String storeName) {
@@ -101,6 +100,7 @@ public abstract class AbstractStream<K> {
                 .build();
     }
 
+    @SuppressWarnings("deprecation")
     static  <T, K> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<K> keySerde,
                                                                        final Serde<T> aggValueSerde,
                                                                        final String storeName) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 357a70c..0df1524 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -70,9 +69,10 @@ public class InternalStreamsBuilder {
         return new KStreamImpl<>(this, name, Collections.singleton(name), false);
     }
 
+    @SuppressWarnings("deprecation")
     public <K, V> KTable<K, V> table(final String topic,
                                      final ConsumedInternal<K, V> consumed,
-                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         final String source = newName(KStreamImpl.SOURCE_NAME);
         final String name = newName(KTableImpl.SOURCE_NAME);
@@ -132,6 +132,7 @@ public class InternalStreamsBuilder {
                                 consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeName, isQueryable);
     }
 
+    @SuppressWarnings("unchecked")
     public <K, V> GlobalKTable<K, V> globalTable(final String topic,
                                                  final ConsumedInternal<K, V> consumed,
                                                  final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 64dfd19..dafaa62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -28,11 +28,10 @@ import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.SessionWindowedKStream;
 import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -78,6 +77,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         } // no need for else {} since isQueryable is true by default
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> reduce(final Reducer<V> reducer,
                                final String queryableStoreName) {
@@ -91,9 +91,10 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return reduce(reducer, (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> reduce(final Reducer<V> reducer,
-                               final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                               final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doAggregate(
@@ -115,7 +116,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                 materializedInternal);
     }
 
-
+    @SuppressWarnings("deprecation")
     @Override
     public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                             final Windows<W> windows,
@@ -124,17 +125,18 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return reduce(reducer, windows, windowedStore(keySerde, valSerde, windows, getOrCreateName(queryableStoreName, REDUCE_NAME)));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                             final Windows<W> windows) {
         return windowedBy(windows).reduce(reducer);
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     @Override
     public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                             final Windows<W> windows,
-                                                            final StateStoreSupplier<WindowStore> storeSupplier) {
+                                                            final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier) {
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(windows, "windows can't be null");
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -145,6 +147,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         );
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> aggregator,
@@ -176,7 +179,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     }
 
     @Override
-    public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator) {
+    public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                        final Aggregator<? super K, ? super V, VR> aggregator) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         final String storeName = builder.newStoreName(AGGREGATE_NAME);
@@ -189,6 +193,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
 
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> aggregator,
@@ -196,10 +201,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return aggregate(initializer, aggregator, aggValueSerde, null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> aggregator,
-                                      final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                      final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -209,6 +215,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                 storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                                   final Aggregator<? super K, ? super V, T> aggregator,
@@ -219,6 +226,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return aggregate(initializer, aggregator, windows, windowedStore(keySerde, aggValueSerde, windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                                   final Aggregator<? super K, ? super V, T> aggregator,
@@ -230,12 +238,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                                      .withValueSerde(aggValueSerde));
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     @Override
     public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                                   final Aggregator<? super K, ? super V, T> aggregator,
                                                                   final Windows<W> windows,
-                                                                  final StateStoreSupplier<WindowStore> storeSupplier) {
+                                                                  final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(windows, "windows can't be null");
@@ -247,6 +255,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         );
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, Long> count(final String queryableStoreName) {
         determineIsQueryable(queryableStoreName);
@@ -258,8 +267,9 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return count((String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
-    public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
+    public KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, storeSupplier);
     }
 
@@ -274,6 +284,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, materialized);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
                                                               final String queryableStoreName) {
@@ -281,14 +292,16 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return count(windows, windowedStore(keySerde, Serdes.Long(), windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows) {
         return windowedBy(windows).count();
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
-                                                              final StateStoreSupplier<WindowStore> storeSupplier) {
+                                                              final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier) {
         return aggregate(
                 aggregateBuilder.countInitializer,
                 aggregateBuilder.countAggregator,
@@ -296,7 +309,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                 storeSupplier);
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     @Override
     public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                 final Aggregator<? super K, ? super V, T> aggregator,
@@ -316,6 +329,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
 
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                 final Aggregator<? super K, ? super V, T> aggregator,
@@ -330,14 +344,14 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                                             .withValueSerde(aggValueSerde));
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     @Override
     public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                 final Aggregator<? super K, ? super V, T> aggregator,
                                                 final Merger<? super K, T> sessionMerger,
                                                 final SessionWindows sessionWindows,
                                                 final Serde<T> aggValueSerde,
-                                                final StateStoreSupplier<SessionStore> storeSupplier) {
+                                                final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
@@ -373,7 +387,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                                 aggregateBuilder);
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName) {
         Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized = Materialized.<K, Long, SessionStore<Bytes, byte[]>>as(getOrCreateName(queryableStoreName, AGGREGATE_NAME))
                 .withKeySerde(keySerde)
@@ -381,13 +395,15 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return windowedBy(sessionWindows).count(materialized);
     }
 
+    @SuppressWarnings("deprecation")
     public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows) {
         return windowedBy(sessionWindows).count();
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
-                                           final StateStoreSupplier<SessionStore> storeSupplier) {
+                                           final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier) {
         Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         final Merger<K, Long> sessionMerger = new Merger<K, Long>() {
@@ -406,7 +422,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     }
 
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     @Override
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                          final SessionWindows sessionWindows,
@@ -418,6 +434,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                               .sessionWindowed(sessionWindows.maintainMs()).build());
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                          final SessionWindows sessionWindows) {
@@ -425,10 +442,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return windowedBy(sessionWindows).reduce(reducer);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                          final SessionWindows sessionWindows,
-                                         final StateStoreSupplier<SessionStore> storeSupplier) {
+                                         final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier) {
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -471,10 +489,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
 
     }
 
+    @SuppressWarnings("deprecation")
     private <T> KTable<K, T> doAggregate(
             final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
             final String functionName,
-            final StateStoreSupplier storeSupplier) {
+            final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier) {
 
         final String aggFunctionName = builder.newName(functionName);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index e69d4f9..507944a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -21,15 +21,14 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedTable;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Collections;
@@ -71,11 +70,11 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         }
     };
 
-    public KGroupedTableImpl(final InternalStreamsBuilder builder,
-                             final String name,
-                             final String sourceName,
-                             final Serde<? extends K> keySerde,
-                             final Serde<? extends V> valSerde) {
+    KGroupedTableImpl(final InternalStreamsBuilder builder,
+                      final String name,
+                      final String sourceName,
+                      final Serde<? extends K> keySerde,
+                      final Serde<? extends V> valSerde) {
         super(builder, name, Collections.singleton(sourceName));
         this.keySerde = keySerde;
         this.valSerde = valSerde;
@@ -88,6 +87,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         } // no need for else {} since isQueryable is true by default
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> adder,
@@ -98,6 +98,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return aggregate(initializer, adder, subtractor, keyValueStore(keySerde, aggValueSerde, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> adder,
@@ -106,6 +107,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return aggregate(initializer, adder, subtractor, aggValueSerde, null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> adder,
@@ -122,11 +124,12 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return aggregate(initializer, adder, subtractor, (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> adder,
                                       final Aggregator<? super K, ? super V, T> subtractor,
-                                      final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                      final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(adder, "adder can't be null");
         Objects.requireNonNull(subtractor, "subtractor can't be null");
@@ -135,9 +138,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return doAggregate(aggregateSupplier, AGGREGATE_NAME, storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
                                          final String functionName,
-                                         final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         final String sinkName = builder.newName(KStreamImpl.SINK_NAME);
         final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME);
         final String funcName = builder.newName(functionName);
@@ -194,6 +198,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), materialized.storeName(), isQueryable);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> reduce(final Reducer<V> adder,
                                final Reducer<V> subtractor,
@@ -223,10 +228,11 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return reduce(adder, subtractor, (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> reduce(final Reducer<V> adder,
                                final Reducer<V> subtractor,
-                               final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                               final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(adder, "adder can't be null");
         Objects.requireNonNull(subtractor, "subtractor can't be null");
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -234,6 +240,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return doAggregate(aggregateSupplier, REDUCE_NAME, storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, Long> count(final String queryableStoreName) {
         determineIsQueryable(queryableStoreName);
@@ -271,8 +278,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return count((String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
-    public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
+    public KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         return this.aggregate(
                 countInitializer,
                 countAdder,