You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/04 21:42:10 UTC

[1/2] kafka git commit: MINOR: add suppress warnings annotations in Streams API

Repository: kafka
Updated Branches:
  refs/heads/trunk 51c652c40 -> 713a67fdd


http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ae3808e..cbbe848 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -183,23 +183,27 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired);
     }
-    
+
+    @SuppressWarnings("deprecation")
     @Override
     public void print() {
         print(defaultKeyValueMapper, null, null, this.name);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(final String label) {
         print(defaultKeyValueMapper, null, null, label);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(final Serde<K> keySerde,
                       final Serde<V> valSerde) {
         print(defaultKeyValueMapper, keySerde, valSerde, this.name);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(final Serde<K> keySerde,
                       final Serde<V> valSerde,
@@ -207,17 +211,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         print(defaultKeyValueMapper, keySerde, valSerde, label);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(final KeyValueMapper<? super K, ? super V, String> mapper) {
         print(mapper, null, null, this.name);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(final KeyValueMapper<? super K, ? super V, String> mapper,
                       final String label) {
         print(mapper, null, null, label);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(final KeyValueMapper<? super K, ? super V, String> mapper,
                       final Serde<K> keySerde,
@@ -225,6 +232,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         print(mapper, keySerde, valSerde, this.name);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(final KeyValueMapper<? super K, ? super V, String> mapper,
                       final Serde<K> keySerde,
@@ -243,17 +251,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         builder.internalTopologyBuilder.addProcessor(name, printedInternal.build(this.name), this.name);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(final String filePath) {
         writeAsText(filePath, this.name, null, null, defaultKeyValueMapper);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(final String filePath,
                             final String label) {
         writeAsText(filePath, label, null, null, defaultKeyValueMapper);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(final String filePath,
                             final Serde<K> keySerde,
@@ -261,6 +272,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         writeAsText(filePath, this.name, keySerde, valSerde, defaultKeyValueMapper);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(final String filePath,
                             final String label,
@@ -269,12 +281,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         writeAsText(filePath, label, keySerde, valSerde, defaultKeyValueMapper);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(final String filePath,
                             final KeyValueMapper<? super K, ? super V, String> mapper) {
         writeAsText(filePath, this.name, null, null, mapper);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(final String filePath,
                             final String label,
@@ -282,6 +296,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         writeAsText(filePath, label, null, null, mapper);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(final String filePath,
                             final Serde<K> keySerde,
@@ -290,6 +305,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         writeAsText(filePath, this.name, keySerde, valSerde, mapper);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(final String filePath,
                             final String label,
@@ -368,6 +384,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         return new KStreamImpl<>(builder, name, allSourceNodes, requireRepartitioning);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KStream<K, V> through(final Serde<K> keySerde,
                                  final Serde<V> valSerde,
@@ -404,6 +421,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         return new KStreamImpl<>(builder, name, sourceNodes, repartitionRequired);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KStream<K, V> through(final Serde<K> keySerde,
                                  final Serde<V> valSerde,
@@ -411,6 +429,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         return through(topic, Produced.with(keySerde, valSerde));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KStream<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                                  final String topic) {
@@ -427,12 +446,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         to(topic, Produced.<K, V>with(null, null, null));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void to(final StreamPartitioner<? super K, ? super V> partitioner,
                    final String topic) {
         to(topic, Produced.streamPartitioner(partitioner));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void to(final Serde<K> keySerde,
                    final Serde<V> valSerde,
@@ -440,7 +461,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         to(topic, Produced.with(keySerde, valSerde));
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     @Override
     public void to(final Serde<K> keySerde,
                    final Serde<V> valSerde,
@@ -459,6 +480,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     }
 
+    @SuppressWarnings("unchecked")
     private void to(final String topic, final ProducedInternal<K, V> produced) {
         final String name = builder.newName(SINK_NAME);
         final Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer();
@@ -513,6 +535,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KStream<K, R> join(final KStream<K, V1> other,
                                       final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -540,6 +563,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                              new KStreamImplJoin(false, false));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KStream<K, R> outerJoin(final KStream<K, V1> other,
                                            final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -641,6 +665,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         return sourceName;
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KStream<K, R> leftJoin(final KStream<K, V1> other,
                                           final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -696,6 +721,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
                                       final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -732,6 +758,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         return new KStreamImpl<>(builder, name, sourceNodes, false);
     }
 
+    @SuppressWarnings("unchecked")
     private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
                                                     final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                                     final boolean leftJoin) {
@@ -768,6 +795,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         }
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
     public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other,
                                           final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                           final Serde<K> keySerde,
@@ -795,6 +824,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                         true);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super K, ? super V, K1> selector,
                                               final Serde<K1> keySerde,
@@ -820,6 +850,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KGroupedStream<K, V> groupByKey(final Serde<K> keySerde,
                                            final Serde<V> valSerde) {
@@ -849,6 +880,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             this.rightOuter = rightOuter;
         }
 
+        @SuppressWarnings("unchecked")
         public <K1, R, V1, V2> KStream<K1, R> join(final KStream<K1, V1> lhs,
                                                    final KStream<K1, V2> other,
                                                    final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index b322415..4f26767 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -51,6 +51,7 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
 
         private WindowStore<K, V2> otherWindow;
 
+        @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index a42db0b..db8de1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -33,7 +33,6 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -142,8 +141,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return this.queryableStoreName;
     }
 
+    @SuppressWarnings("deprecation")
     private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
-                                  final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                  final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
                                   final boolean isFilterNot) {
         Objects.requireNonNull(predicate, "predicate can't be null");
         String name = builder.newName(FILTER_NAME);
@@ -196,19 +196,21 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doFilter(predicate, new MaterializedInternal<>(materialized), false);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
                                final String queryableStoreName) {
-        StateStoreSupplier<KeyValueStore> storeSupplier = null;
+        org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier = null;
         if (queryableStoreName != null) {
             storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName);
         }
         return doFilter(predicate, storeSupplier, false);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
-                               final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                               final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doFilter(predicate, storeSupplier, false);
     }
@@ -226,26 +228,29 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doFilter(predicate, new MaterializedInternal<>(materialized), true);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
                                   final String queryableStoreName) {
-        StateStoreSupplier<KeyValueStore> storeSupplier = null;
+        org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier = null;
         if (queryableStoreName != null) {
             storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName);
         }
         return doFilter(predicate, storeSupplier, true);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
-                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                  final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doFilter(predicate, storeSupplier, true);
     }
 
+    @SuppressWarnings("deprecation")
     private <V1> KTable<K, V1> doMapValues(final ValueMapper<? super V, ? extends V1> mapper,
                                            final Serde<V1> valueSerde,
-                                           final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                           final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(mapper);
         String name = builder.newName(MAPVALUES_NAME);
         String internalStoreName = null;
@@ -284,21 +289,23 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, true);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
                                         final Serde<V1> valueSerde,
                                         final String queryableStoreName) {
-        StateStoreSupplier<KeyValueStore> storeSupplier = null;
+        org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier = null;
         if (queryableStoreName != null) {
             storeSupplier = keyValueStore(this.keySerde, valueSerde, queryableStoreName);
         }
         return doMapValues(mapper, valueSerde, storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public  <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
                                          final Serde<V1> valueSerde,
-                                         final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doMapValues(mapper, valueSerde, storeSupplier);
     }
@@ -322,7 +329,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         print(keySerde, valSerde, this.name);
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings({"unchecked", "deprecation"})
     @Override
     public void print(final Serde<K> keySerde,
                       final Serde<V> valSerde,
@@ -356,7 +363,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     /**
      * @throws TopologyException if file is not found
      */
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings({"unchecked", "deprecation"})
     @Override
     public void writeAsText(final String filePath,
                             final String label,
@@ -390,6 +397,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final Serde<K> keySerde,
                                 final Serde<V> valSerde,
@@ -408,12 +416,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                      queryableStoreName != null));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final Serde<K> keySerde,
                                 final Serde<V> valSerde,
                                 final StreamPartitioner<? super K, ? super V> partitioner,
                                 final String topic,
-                                final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         to(keySerde, valSerde, partitioner, topic);
 
@@ -421,6 +430,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return builder.table(topic, consumed, storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final Serde<K> keySerde,
                                 final Serde<V> valSerde,
@@ -428,6 +438,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                 final String topic) {
         return through(keySerde, valSerde, partitioner, topic, (String) null);
     }
+
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final Serde<K> keySerde,
                                 final Serde<V> valSerde,
@@ -436,15 +448,17 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return through(keySerde, valSerde, null, topic, queryableStoreName);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final Serde<K> keySerde,
                                 final Serde<V> valSerde,
                                 final String topic,
-                                final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return through(keySerde, valSerde, null, topic, storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final Serde<K> keySerde,
                                 final Serde<V> valSerde,
@@ -452,6 +466,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return through(keySerde, valSerde, null, topic, (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                                 final String topic,
@@ -459,49 +474,57 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return through(null, null, partitioner, topic, queryableStoreName);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                                 final String topic,
-                                final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return through(null, null, partitioner, topic, storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                                 final String topic) {
         return through(null, null, partitioner, topic, (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final String topic,
                                 final String queryableStoreName) {
         return through(null, null, null, topic, queryableStoreName);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final String topic,
-                                final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return through(null, null, null, topic, storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final String topic) {
         return through(null, null, null, topic, (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void to(final String topic) {
         to(null, null, null, topic);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void to(final StreamPartitioner<? super K, ? super V> partitioner,
                    final String topic) {
         to(null, null, partitioner, topic);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void to(final Serde<K> keySerde,
                    final Serde<V> valSerde,
@@ -509,6 +532,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         this.toStream().to(keySerde, valSerde, null, topic);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void to(final Serde<K> keySerde,
                    final Serde<V> valSerde,
@@ -552,6 +576,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doJoin(other, joiner, new MaterializedInternal<>(materialized), false, false);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
                                      final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -560,10 +585,11 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doJoin(other, joiner, false, false, joinSerde, queryableStoreName);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
                                      final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doJoin(other, joiner, false, false, storeSupplier);
     }
@@ -581,6 +607,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doJoin(other, joiner, new MaterializedInternal<>(materialized), true, true);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
                                           final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -589,10 +616,11 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doJoin(other, joiner, true, true, joinSerde, queryableStoreName);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
                                           final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-                                          final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                          final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doJoin(other, joiner, true, true, storeSupplier);
     }
@@ -614,6 +642,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                       false);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
                                          final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -622,15 +651,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doJoin(other, joiner, true, false, joinSerde, queryableStoreName);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
                                          final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-                                         final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doJoin(other, joiner, true, false, storeSupplier);
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other,
                                         final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                         final boolean leftOuter,
@@ -640,16 +670,18 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
 
-        final StateStoreSupplier storeSupplier = queryableStoreName == null ? null : keyValueStore(this.keySerde, joinSerde, queryableStoreName);
+        final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier
+            = queryableStoreName == null ? null : keyValueStore(this.keySerde, joinSerde, queryableStoreName);
 
         return doJoin(other, joiner, leftOuter, rightOuter, storeSupplier);
     }
 
+    @SuppressWarnings({"unchecked", "deprecation"})
     private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other,
                                         final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                         final boolean leftOuter,
                                         final boolean rightOuter,
-                                        final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                        final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         final String joinMergeName = builder.newName(MERGE_NAME);
@@ -668,6 +700,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return result;
     }
 
+    @SuppressWarnings("unchecked")
     private <VO, VR> KTable<K, VR> doJoin(final KTable<K, VO> other,
                                           final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                           final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
@@ -692,6 +725,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return result;
     }
 
+    @SuppressWarnings("unchecked")
     private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K> other,
                                            final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                            final boolean leftOuter,
@@ -740,6 +774,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return new KTableImpl<>(builder, joinMergeName, joinMerge, allSourceNodes, internalQueryableName, internalQueryableName != null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector,
                                                   final Serde<K1> keySerde,

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
index ffb3697..b4e5d44 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
@@ -43,6 +43,7 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
     // Default constructor needed by Kafka
     public WindowedSerializer() {}
 
+    @SuppressWarnings("unchecked")
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
         if (inner == null) {
@@ -76,12 +77,12 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
         inner.close();
     }
 
-    public byte[] serializeBaseKey(String topic, Windowed<T> data) {
+    byte[] serializeBaseKey(String topic, Windowed<T> data) {
         return inner.serialize(topic, data.key());
     }
 
     // Only for testing
-    public Serializer<T> innerSerializer() {
+    Serializer<T> innerSerializer() {
         return inner;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index ad0f236..66dfa27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -50,6 +50,7 @@ import java.util.regex.Pattern;
  *
  * @deprecated use {@link Topology} instead
  */
+@SuppressWarnings("unchecked")
 @Deprecated
 public class TopologyBuilder {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index c24686e..52465ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -46,7 +46,7 @@ public abstract class AbstractTask implements Task {
     final ProcessorTopology topology;
     final ProcessorStateManager stateMgr;
     final Set<TopicPartition> partitions;
-    final Consumer consumer;
+    final Consumer<byte[], byte[]> consumer;
     final String logPrefix;
     final boolean eosEnabled;
     final Logger log;

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 06405ef..f2cbf51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
@@ -179,10 +178,10 @@ public class InternalTopologyBuilder {
 
     private static class StateStoreSupplierFactory extends AbstractStateStoreFactory {
         @SuppressWarnings("deprecation")
-        private final StateStoreSupplier supplier;
+        private final org.apache.kafka.streams.processor.StateStoreSupplier supplier;
 
         @SuppressWarnings("deprecation")
-        StateStoreSupplierFactory(final StateStoreSupplier<?> supplier) {
+        StateStoreSupplierFactory(final org.apache.kafka.streams.processor.StateStoreSupplier<?> supplier) {
             super(supplier.name(),
                   supplier.loggingEnabled(),
                   supplier instanceof WindowStoreSupplier,
@@ -196,6 +195,7 @@ public class InternalTopologyBuilder {
             return supplier.get();
         }
 
+        @SuppressWarnings("deprecation")
         @Override
         public long retentionPeriod() {
             if (!isWindowStore()) {
@@ -498,7 +498,7 @@ public class InternalTopologyBuilder {
     }
 
     @SuppressWarnings("deprecation")
-    public final void addStateStore(final StateStoreSupplier supplier,
+    public final void addStateStore(final org.apache.kafka.streams.processor.StateStoreSupplier supplier,
                                     final String... processorNames) {
         Objects.requireNonNull(supplier, "supplier can't be null");
         if (stateFactories.containsKey(supplier.name())) {
@@ -531,7 +531,7 @@ public class InternalTopologyBuilder {
     }
 
     @SuppressWarnings("deprecation")
-    public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+    public final void addGlobalStore(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
                                      final String sourceName,
                                      final TimestampExtractor timestampExtractor,
                                      final Deserializer keyDeserializer,
@@ -925,6 +925,7 @@ public class InternalTopologyBuilder {
         return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), storeToChangelogTopic, new ArrayList<>(globalStateStores.values()));
     }
 
+    @SuppressWarnings("unchecked")
     private void buildSinkNode(final Map<String, ProcessorNode> processorMap,
                                final Map<String, SinkNode> topicSinkMap,
                                final SinkNodeFactory sinkNodeFactory,

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
index b395d42..47cd61d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
@@ -52,6 +52,7 @@ public class QuickUnion<T> {
         return current;
     }
 
+    @SuppressWarnings("unchecked")
     public void unite(T id1, T... idList) {
         for (T id2 : idList) {
             unitePair(id1, id2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index b254bb8..03bbceb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -112,7 +112,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
 
 
     private Map<String, String> constructTags(final String scopeName, final String entityName, final String... tags) {
-        List<String> updatedTagList = new ArrayList(Arrays.asList(tags));
+        List<String> updatedTagList = new ArrayList<>(Arrays.asList(tags));
         updatedTagList.add(scopeName + "-id");
         updatedTagList.add(entityName);
         return tagMap(updatedTagList.toArray(new String[updatedTagList.size()]));

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 0173c1d..c9c44af 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
@@ -271,7 +270,7 @@ public class Stores {
                                     }
 
                                     @Override
-                                    public StateStoreSupplier build() {
+                                    public org.apache.kafka.streams.processor.StateStoreSupplier build() {
                                         log.trace("Defining InMemory Store name={} capacity={} logged={}", name, capacity, logged);
                                         if (capacity < Integer.MAX_VALUE) {
                                             return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged, logConfig);
@@ -335,7 +334,7 @@ public class Stores {
                                     }
 
                                     @Override
-                                    public StateStoreSupplier build() {
+                                    public org.apache.kafka.streams.processor.StateStoreSupplier build() {
                                         log.trace("Defining RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged);
                                         if (sessionWindows) {
                                             return new RocksDBSessionStoreSupplier<>(name, retentionPeriod, keySerde, valueSerde, logged, logConfig, cachingEnabled);
@@ -535,6 +534,7 @@ public class Stores {
      * @param <K> the type of keys
      * @param <V> the type of values
      */
+    @Deprecated
     public interface InMemoryKeyValueFactory<K, V> {
         /**
          * Limits the in-memory key-value store to hold a maximum number of entries. The default is {@link Integer#MAX_VALUE}, which is
@@ -567,7 +567,7 @@ public class Stores {
          * Return the instance of StateStoreSupplier of new key-value store.
          * @return the state store supplier; never null
          */
-        StateStoreSupplier build();
+        org.apache.kafka.streams.processor.StateStoreSupplier build();
     }
 
     /**
@@ -576,6 +576,7 @@ public class Stores {
      * @param <K> the type of keys
      * @param <V> the type of values
      */
+    @Deprecated
     public interface PersistentKeyValueFactory<K, V> {
 
         /**
@@ -614,11 +615,12 @@ public class Stores {
          * @return the factory to create a persistent key-value store
          */
         PersistentKeyValueFactory<K, V> enableCaching();
+
         /**
          * Return the instance of StateStoreSupplier of new key-value store.
          * @return the key-value store; never null
          */
-        StateStoreSupplier build();
+        org.apache.kafka.streams.processor.StateStoreSupplier build();
 
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
index 92e8ce0..10d0fe2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
@@ -19,12 +19,11 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 import java.util.Map;
 
-
-abstract class AbstractStoreSupplier<K, V, T extends StateStore> implements StateStoreSupplier<T> {
+@Deprecated
+abstract class AbstractStoreSupplier<K, V, T extends StateStore> implements org.apache.kafka.streams.processor.StateStoreSupplier<T> {
     protected final String name;
     protected final Serde<K> keySerde;
     protected final Serde<V> valueSerde;

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 6d1e6dd..f955421 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -34,6 +34,7 @@ import java.util.Map;
  *
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
+@Deprecated
 public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
 
     public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index c93bacb..0f897ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -29,6 +29,7 @@ import java.util.Map;
  * @param <V> The value type
  *
  */
+@Deprecated
 public class InMemoryLRUCacheStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
 
     private final int capacity;

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
index 7ac8bab..a6ff8d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
@@ -95,6 +95,7 @@ public class MeteredKeyValueBytesStore<K, V> extends WrappedStateStore.AbstractS
         }, time);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void init(ProcessorContext context, StateStore root) {
         this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 8c10987..8d9065c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -57,6 +57,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
         this.time = time;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         //noinspection unchecked

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 49d8050..20c7c43 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -57,7 +57,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
         this.valueSerde = valueSerde;
     }
 
-
+    @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = context;

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index d629c1c..4b233f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -29,7 +29,7 @@ import java.util.Map;
  * @param <V> the type of values
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
-
+@Deprecated
 public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
 
     private final KeyValueStoreBuilder<K, V> builder;

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
index f5432dc..1552f7d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
@@ -30,6 +30,7 @@ import java.util.Map;
  *
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
+@Deprecated
 public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, SessionStore> implements WindowStoreSupplier<SessionStore> {
 
     static final int NUM_SEGMENTS = 3;

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index b899f5e..2a82f79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -30,7 +30,7 @@ import java.util.Map;
  *
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
-
+@Deprecated
 public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, WindowStore> implements WindowStoreSupplier<WindowStore> {
     public static final int MIN_SEGMENTS = 2;
     private final long retentionPeriod;

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index ffeb7d8..b9b7181 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -21,8 +21,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
 
-import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS;
-
 public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSupplier {
     private final String name;
     private final long retentionPeriod;
@@ -38,12 +36,14 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
         return name;
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public SessionStore<Bytes, byte[]> get() {
-        final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name,
-                                                                                    retentionPeriod,
-                                                                                    NUM_SEGMENTS,
-                                                                                    new SessionKeySchema());
+        final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(
+            name,
+            retentionPeriod,
+            org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS,
+            new SessionKeySchema());
         return new RocksDBSessionStore<>(segmented, Serdes.Bytes(), Serdes.ByteArray());
     }
 
@@ -52,8 +52,11 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
         return "rocksdb-session";
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public long segmentIntervalMs() {
-        return Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS);
+        return Segments.segmentInterval(
+            retentionPeriod,
+            org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index a0500b6..e873435 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -20,8 +20,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 
-import static org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS;
-
 public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier {
     private final String name;
     private final long retentionPeriod;
@@ -29,13 +27,14 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier
     private final long windowSize;
     private final boolean retainDuplicates;
 
+    @SuppressWarnings("deprecation")
     public RocksDbWindowBytesStoreSupplier(final String name,
                                            final long retentionPeriod,
                                            final int segments,
                                            final long windowSize,
                                            final boolean retainDuplicates) {
-        if (segments < MIN_SEGMENTS) {
-            throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
+        if (segments < org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS) {
+            throw new IllegalArgumentException("numSegments must be >= " + org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS);
         }
         this.name = name;
         this.retentionPeriod = retentionPeriod;

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
index ad24c25..3495352 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
@@ -17,14 +17,14 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 /**
- * A windowed state store supplier that extends the {@link StateStoreSupplier} interface.
+ * A windowed state store supplier that extends the {@link org.apache.kafka.streams.processor.StateStoreSupplier} interface.
  *
  * @param <T> State store type
  */
-public interface WindowStoreSupplier<T extends StateStore> extends StateStoreSupplier<T> {
+@Deprecated
+public interface WindowStoreSupplier<T extends StateStore> extends org.apache.kafka.streams.processor.StateStoreSupplier<T> {
 
     // window retention period in milli-second
     long retentionPeriod();

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
index a4b3118..100a11c 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Map;
 
-@SuppressWarnings("deprecation")
+@Deprecated
 public class MockStateStoreSupplier implements StateStoreSupplier {
     private String name;
     private boolean persistent;


[2/2] kafka git commit: MINOR: add suppress warnings annotations in Streams API

Posted by gu...@apache.org.
MINOR: add suppress warnings annotations in Streams API

 - fixes examples with regard to new API
 - fixes `Topology#addGlobalStore` parameters

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>

Closes #4003 from mjsax/minor-deprecated


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/713a67fd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/713a67fd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/713a67fd

Branch: refs/heads/trunk
Commit: 713a67fddaec3fa9cd7cce53dd6fef5ab6e0cdab
Parents: 51c652c
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Oct 4 14:42:07 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 4 14:42:07 2017 -0700

----------------------------------------------------------------------
 .../examples/pageview/PageViewTypedDemo.java    | 77 +++++++++----------
 .../examples/pageview/PageViewUntypedDemo.java  | 66 ++++++++--------
 .../examples/temperature/TemperatureDemo.java   | 54 ++++++-------
 .../examples/wordcount/WordCountDemo.java       | 29 +++----
 .../java/org/apache/kafka/streams/Topology.java |  6 +-
 .../kafka/streams/kstream/KGroupedStream.java   | 80 +++++++++-----------
 .../kafka/streams/kstream/KGroupedTable.java    |  7 +-
 .../kafka/streams/kstream/KStreamBuilder.java   | 30 ++++----
 .../apache/kafka/streams/kstream/KTable.java    | 23 +++---
 .../kstream/internals/AbstractStream.java       | 10 +--
 .../internals/InternalStreamsBuilder.java       |  5 +-
 .../kstream/internals/KGroupedStreamImpl.java   | 59 ++++++++++-----
 .../kstream/internals/KGroupedTableImpl.java    | 36 +++++----
 .../streams/kstream/internals/KStreamImpl.java  | 36 ++++++++-
 .../kstream/internals/KStreamKStreamJoin.java   |  1 +
 .../streams/kstream/internals/KTableImpl.java   | 77 ++++++++++++++-----
 .../kstream/internals/WindowedSerializer.java   |  5 +-
 .../streams/processor/TopologyBuilder.java      |  1 +
 .../processor/internals/AbstractTask.java       |  2 +-
 .../internals/InternalTopologyBuilder.java      | 11 +--
 .../streams/processor/internals/QuickUnion.java |  1 +
 .../processor/internals/StreamsMetricsImpl.java |  2 +-
 .../org/apache/kafka/streams/state/Stores.java  | 12 +--
 .../state/internals/AbstractStoreSupplier.java  |  5 +-
 .../InMemoryKeyValueStoreSupplier.java          |  1 +
 .../InMemoryLRUCacheStoreSupplier.java          |  1 +
 .../internals/MeteredKeyValueBytesStore.java    |  1 +
 .../state/internals/MeteredSessionStore.java    |  1 +
 .../state/internals/MeteredWindowStore.java     |  2 +-
 .../internals/RocksDBKeyValueStoreSupplier.java |  2 +-
 .../internals/RocksDBSessionStoreSupplier.java  |  1 +
 .../internals/RocksDBWindowStoreSupplier.java   |  2 +-
 .../RocksDbSessionBytesStoreSupplier.java       | 17 +++--
 .../RocksDbWindowBytesStoreSupplier.java        |  7 +-
 .../state/internals/WindowStoreSupplier.java    |  6 +-
 .../kafka/test/MockStateStoreSupplier.java      |  2 +-
 36 files changed, 394 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 068eece..101cd23 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
@@ -150,45 +151,45 @@ public class PageViewTypedDemo {
                                                           Consumed.with(Serdes.String(), userProfileSerde));
 
         KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
-                .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
-                    @Override
-                    public PageViewByRegion apply(PageView view, UserProfile profile) {
-                        PageViewByRegion viewByRegion = new PageViewByRegion();
-                        viewByRegion.user = view.user;
-                        viewByRegion.page = view.page;
-
-                        if (profile != null) {
-                            viewByRegion.region = profile.region;
-                        } else {
-                            viewByRegion.region = "UNKNOWN";
-                        }
-                        return viewByRegion;
+            .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
+                @Override
+                public PageViewByRegion apply(PageView view, UserProfile profile) {
+                    PageViewByRegion viewByRegion = new PageViewByRegion();
+                    viewByRegion.user = view.user;
+                    viewByRegion.page = view.page;
+
+                    if (profile != null) {
+                        viewByRegion.region = profile.region;
+                    } else {
+                        viewByRegion.region = "UNKNOWN";
                     }
-                })
-                .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
-                    @Override
-                    public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
-                        return new KeyValue<>(viewRegion.region, viewRegion);
-                    }
-                })
-                .groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde))
-                .count(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000), "RollingSevenDaysOfPageViewsByRegion")
-                // TODO: we can merge ths toStream().map(...) with a single toStream(...)
-                .toStream()
-                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
-                    @Override
-                    public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
-                        WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
-                        wViewByRegion.windowStart = key.window().start();
-                        wViewByRegion.region = key.key();
-
-                        RegionCount rCount = new RegionCount();
-                        rCount.region = key.key();
-                        rCount.count = value;
-
-                        return new KeyValue<>(wViewByRegion, rCount);
-                    }
-                });
+                    return viewByRegion;
+                }
+            })
+            .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
+                @Override
+                public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
+                    return new KeyValue<>(viewRegion.region, viewRegion);
+                }
+            })
+            .groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde))
+            .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(7)).advanceBy(TimeUnit.SECONDS.toMillis(1)))
+            .count()
+            .toStream()
+            .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
+                @Override
+                public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
+                    WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
+                    wViewByRegion.windowStart = key.window().start();
+                    wViewByRegion.region = key.key();
+
+                    RegionCount rCount = new RegionCount();
+                    rCount.region = key.key();
+                    rCount.count = value;
+
+                    return new KeyValue<>(wViewByRegion, rCount);
+                }
+            });
 
         // write to the result topic
         regionCount.to("streams-pageviewstats-typed-output", Produced.with(wPageViewByRegionSerde, regionCountSerde));

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index c20c077..ae72042 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -87,39 +87,39 @@ public class PageViewUntypedDemo {
         });
 
         KStream<JsonNode, JsonNode> regionCount = views
-                .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
-                    @Override
-                    public JsonNode apply(JsonNode view, String region) {
-                        ObjectNode jNode = JsonNodeFactory.instance.objectNode();
-
-                        return jNode.put("user", view.get("user").textValue())
-                                .put("page", view.get("page").textValue())
-                                .put("region", region == null ? "UNKNOWN" : region);
-                    }
-                })
-                .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
-                    @Override
-                    public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
-                        return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
-                    }
-                })
-                .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
-                .count(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000), "RollingSevenDaysOfPageViewsByRegion")
-                // TODO: we can merge ths toStream().map(...) with a single toStream(...)
-                .toStream()
-                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
-                    @Override
-                    public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
-                        ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
-                        keyNode.put("window-start", key.window().start())
-                                .put("region", key.key());
-
-                        ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
-                        valueNode.put("count", value);
-
-                        return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
-                    }
-                });
+            .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
+                @Override
+                public JsonNode apply(JsonNode view, String region) {
+                    ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+
+                    return jNode.put("user", view.get("user").textValue())
+                            .put("page", view.get("page").textValue())
+                            .put("region", region == null ? "UNKNOWN" : region);
+                }
+            })
+            .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
+                @Override
+                public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
+                    return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
+                }
+            })
+            .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
+            .windowedBy(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
+            .count()
+            .toStream()
+            .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
+                @Override
+                public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
+                    ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
+                    keyNode.put("window-start", key.window().start())
+                            .put("region", key.key());
+
+                    ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
+                    valueNode.put("count", value);
+
+                    return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
+                }
+            });
 
         // write to the result topic
         regionCount.to("streams-pageviewstats-untyped-output", Produced.with(jsonSerde, jsonSerde));

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
index 2039ca5..ea81dd6 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -87,38 +88,39 @@ public class TemperatureDemo {
         KStream<String, String> source = builder.stream("iot-temperature");
 
         KStream<Windowed<String>, String> max = source
-                // temperature values are sent without a key (null), so in order
-                // to group and reduce them, a key is needed ("temp" has been chosen)
-                .selectKey(new KeyValueMapper<String, String, String>() {
-                    @Override
-                    public String apply(String key, String value) {
-                        return "temp";
-                    }
-                })
-                .groupByKey()
-                .reduce(new Reducer<String>() {
-                    @Override
-                    public String apply(String value1, String value2) {
-                        if (Integer.parseInt(value1) > Integer.parseInt(value2))
-                            return value1;
-                        else
-                            return value2;
-                    }
-                }, TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
-                .toStream()
-                .filter(new Predicate<Windowed<String>, String>() {
-                    @Override
-                    public boolean test(Windowed<String> key, String value) {
-                        return Integer.parseInt(value) > TEMPERATURE_THRESHOLD;
-                    }
-                });
+            // temperature values are sent without a key (null), so in order
+            // to group and reduce them, a key is needed ("temp" has been chosen)
+            .selectKey(new KeyValueMapper<String, String, String>() {
+                @Override
+                public String apply(String key, String value) {
+                    return "temp";
+                }
+            })
+            .groupByKey()
+            .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
+            .reduce(new Reducer<String>() {
+                @Override
+                public String apply(String value1, String value2) {
+                    if (Integer.parseInt(value1) > Integer.parseInt(value2))
+                        return value1;
+                    else
+                        return value2;
+                }
+            })
+            .toStream()
+            .filter(new Predicate<Windowed<String>, String>() {
+                @Override
+                public boolean test(Windowed<String> key, String value) {
+                    return Integer.parseInt(value) > TEMPERATURE_THRESHOLD;
+                }
+            });
 
         WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(Serdes.String().serializer());
         WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer(), TEMPERATURE_WINDOW_SIZE);
         Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
 
         // need to override key serde to Windowed<String> type
-        max.to(windowedSerde, Serdes.String(), "iot-temperature-max");
+        max.to("iot-temperature-max", Produced.with(windowedSerde, Serdes.String()));
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         final CountDownLatch latch = new CountDownLatch(1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 5689d50..7535315 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueMapper;
 
 import java.util.Arrays;
@@ -63,22 +64,22 @@ public class WordCountDemo {
         KStream<String, String> source = builder.stream("streams-plaintext-input");
 
         KTable<String, Long> counts = source
-                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                    @Override
-                    public Iterable<String> apply(String value) {
-                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
-                    }
-                })
-                .groupBy(new KeyValueMapper<String, String, String>() {
-                    @Override
-                    public String apply(String key, String value) {
-                        return value;
-                    }
-                })
-                .count("Counts");
+            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+                @Override
+                public Iterable<String> apply(String value) {
+                    return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
+                }
+            })
+            .groupBy(new KeyValueMapper<String, String, String>() {
+                @Override
+                public String apply(String key, String value) {
+                    return value;
+                }
+            })
+            .count();
 
         // need to override value serde to Long type
-        counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
+        counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         final CountDownLatch latch = new CountDownLatch(1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/Topology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 1409b97..3b1ac6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -32,7 +32,6 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 
 import java.util.regex.Pattern;
 
@@ -573,6 +572,7 @@ public class Topology {
      * @return itself
      * @throws TopologyException if the processor of state is already registered
      */
+    @SuppressWarnings("unchecked")
     public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
                                                 final String sourceName,
                                                 final Deserializer keyDeserializer,
@@ -609,7 +609,8 @@ public class Topology {
      * @return itself
      * @throws TopologyException if the processor of state is already registered
      */
-    public synchronized Topology addGlobalStore(final KeyValueStoreBuilder storeBuilder,
+    @SuppressWarnings("unchecked")
+    public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
                                                 final String sourceName,
                                                 final TimestampExtractor timestampExtractor,
                                                 final Deserializer keyDeserializer,
@@ -641,6 +642,7 @@ public class Topology {
      *
      * @return a description of the topology.
      */
+
     public synchronized TopologyDescription describe() {
         return internalTopologyBuilder.describe();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 17f2db4..b3945f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.SessionStore;
@@ -132,7 +131,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
      * String queryableStoreName = storeSupplier.name();
@@ -149,7 +148,7 @@ public interface KGroupedStream<K, V> {
      * @deprecated use {@link #count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
-    KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
+    KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Count the number of records in this stream by the grouped key.
@@ -290,7 +289,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
      * String queryableStoreName = storeSupplier.name();
@@ -312,7 +311,7 @@ public interface KGroupedStream<K, V> {
      */
     @Deprecated
     <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
-                                                       final StateStoreSupplier<WindowStore> storeSupplier);
+                                                       final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier);
 
 
     /**
@@ -333,10 +332,8 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
-     * Sting queryableStoreName = storeSupplier.name();
      * ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>ReadOnlySessionStore<String, Long>);
      * String key = "some-key";
      * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
@@ -398,7 +395,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
      * String queryableStoreName = storeSupplier.name();
@@ -418,7 +415,7 @@ public interface KGroupedStream<K, V> {
      */
     @Deprecated
     KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
-                                    final StateStoreSupplier<SessionStore> storeSupplier);
+                                    final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier);
 
     /**
      * Combine the values of records in this stream by the grouped key.
@@ -522,7 +519,7 @@ public interface KGroupedStream<K, V> {
      * Combine the value of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
      * Combining implies that the type of the aggregate result is the same as the type of the input value
-     * (c.f. {@link #aggregate(Initializer, Aggregator, StateStoreSupplier)}).
+     * (c.f. {@link #aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)}).
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * provided by the given {@code storeSupplier}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -540,8 +537,8 @@ public interface KGroupedStream<K, V> {
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
-     * Thus, {@code reduce(Reducer, StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or
-     * max.
+     * Thus, {@code reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to compute
+     * aggregate functions like sum, min, or max.
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same key.
@@ -552,7 +549,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
      * String queryableStoreName = storeSupplier.name();
@@ -571,7 +568,7 @@ public interface KGroupedStream<K, V> {
      */
     @Deprecated
     KTable<K, V> reduce(final Reducer<V> reducer,
-                        final StateStoreSupplier<KeyValueStore> storeSupplier);
+                        final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Combine the value of records in this stream by the grouped key.
@@ -595,7 +592,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
-     * Thus, {@code reduce(Reducer, StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or
+     * Thus, {@code reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or
      * max.
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
@@ -767,8 +764,8 @@ public interface KGroupedStream<K, V> {
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
-     * Thus, {@code reduce(Reducer, Windows, StateStoreSupplier)} can be used to compute aggregate functions like sum,
-     * min, or max.
+     * Thus, {@code reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to
+     * compute aggregate functions like sum, min, or max.
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -779,7 +776,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
      * String queryableStoreName = storeSupplier.name();
@@ -803,7 +800,7 @@ public interface KGroupedStream<K, V> {
     @Deprecated
     <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                      final Windows<W> windows,
-                                                     final StateStoreSupplier<WindowStore> storeSupplier);
+                                                     final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier);
 
     /**
      * Combine values of this stream by the grouped key into {@link SessionWindows}.
@@ -841,10 +838,8 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
-     * Sting queryableStoreName = storeSupplier.name();
      * ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>ReadOnlySessionStore<String, Long>);
      * String key = "some-key";
      * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
@@ -933,8 +928,8 @@ public interface KGroupedStream<K, V> {
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
-     * Thus, {@code reduce(Reducer, SessionWindows, StateStoreSupplier)} can be used to compute aggregate functions like
-     * sum, min, or max.
+     * Thus, {@code reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used
+     * to compute aggregate functions like sum, min, or max.
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -945,7 +940,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
      * String queryableStoreName = storeSupplier.name();
@@ -975,7 +970,7 @@ public interface KGroupedStream<K, V> {
     @Deprecated
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                   final SessionWindows sessionWindows,
-                                  final StateStoreSupplier<SessionStore> storeSupplier);
+                                  final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier);
 
 
     /**
@@ -1188,8 +1183,8 @@ public interface KGroupedStream<K, V> {
     /**
      * Aggregate the values of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer, StateStoreSupplier) combining via reduce(...)} as it,
-     * for example, allows the result to have a different type than the input values.
+     * Aggregating is a generalization of {@link #reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)
+     * combining via reduce(...)} as it, for example, allows the result to have a different type than the input values.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * provided by the given {@code storeSupplier}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -1199,8 +1194,8 @@ public interface KGroupedStream<K, V> {
      * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@code aggregate(Initializer, Aggregator, StateStoreSupplier)} can be used to compute aggregate functions
-     * like count (c.f. {@link #count()}).
+     * Thus, {@code aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)} can be
+     * used to compute aggregate functions like count (c.f. {@link #count()}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same key.
@@ -1211,7 +1206,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // some aggregation on value type double
      * Sting queryableStoreName = storeSupplier.name();
@@ -1233,7 +1228,7 @@ public interface KGroupedStream<K, V> {
     @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> aggregator,
-                                 final StateStoreSupplier<KeyValueStore> storeSupplier);
+                                 final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Aggregate the values of records in this stream by the grouped key and defined windows.
@@ -1362,8 +1357,9 @@ public interface KGroupedStream<K, V> {
     /**
      * Aggregate the values of records in this stream by the grouped key and defined windows.
      * Records with {@code null} key or value are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer, Windows, StateStoreSupplier) combining via
-     * reduce(...)} as it, for example, allows the result to have a different type than the input values.
+     * Aggregating is a generalization of
+     * {@link #reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) combining via reduce(...)}
+     * as it, for example, allows the result to have a different type than the input values.
      * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
      * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
      * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
@@ -1377,8 +1373,8 @@ public interface KGroupedStream<K, V> {
      * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@code aggregate(Initializer, Aggregator, Windows, StateStoreSupplier)} can be used to compute aggregate
-     * functions like count (c.f. {@link #count(Windows)}).
+     * Thus, {@code aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)}
+     * can be used to compute aggregate functions like count (c.f. {@link #count(Windows)}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -1389,7 +1385,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // some windowed aggregation on value type Long
      * Sting queryableStoreName = storeSupplier.name();
@@ -1416,7 +1412,7 @@ public interface KGroupedStream<K, V> {
     <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                              final Aggregator<? super K, ? super V, VR> aggregator,
                                                              final Windows<W> windows,
-                                                             final StateStoreSupplier<WindowStore> storeSupplier);
+                                                             final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier);
 
     /**
      * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
@@ -1446,10 +1442,8 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // some windowed aggregation on value type double
-     * Sting queryableStoreName = storeSupplier.name();
      * ReadOnlySessionStore<String, Long> sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
      * String key = "some-key";
      * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
@@ -1540,8 +1534,8 @@ public interface KGroupedStream<K, V> {
      * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@code #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, StateStoreSupplier)} can be used
-     * to compute aggregate functions like count (c.f. {@link #count(SessionWindows)}).
+     * Thus, {@code aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, org.apache.kafka.streams.processor.StateStoreSupplier)}
+     * can be used to compute aggregate functions like count (c.f. {@link #count(SessionWindows)}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -1552,7 +1546,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // some windowed aggregation on value type double
      * Sting queryableStoreName = storeSupplier.name();
@@ -1583,7 +1577,7 @@ public interface KGroupedStream<K, V> {
                                          final Merger<? super K, T> sessionMerger,
                                          final SessionWindows sessionWindows,
                                          final Serde<T> aggValueSerde,
-                                         final StateStoreSupplier<SessionStore> storeSupplier);
+                                         final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier);
 
     /**
      * Create a new {@link TimeWindowedKStream} instance that can be used to perform windowed aggregations.

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index f814eaf..d0a38cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 
@@ -195,7 +194,7 @@ public interface KGroupedTable<K, V> {
      * @deprecated use {@link #count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier)}
      */
     @Deprecated
-    KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
+    KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
@@ -467,7 +466,7 @@ public interface KGroupedTable<K, V> {
     @Deprecated
     KTable<K, V> reduce(final Reducer<V> adder,
                         final Reducer<V> subtractor,
-                        final StateStoreSupplier<KeyValueStore> storeSupplier);
+                        final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
@@ -945,6 +944,6 @@ public interface KGroupedTable<K, V> {
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> adder,
                                  final Aggregator<? super K, ? super V, VR> subtractor,
-                                 final StateStoreSupplier<KeyValueStore> storeSupplier);
+                                 final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index d4642da..6b51c86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
 import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
@@ -431,7 +430,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final String topic,
-                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         return table(null, null, null, null, topic, storeSupplier);
     }
 
@@ -525,7 +524,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      */
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
                                      final String topic,
-                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         return table(offsetReset, null, null, null, topic, storeSupplier);
     }
 
@@ -702,7 +701,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
     public <K, V> KTable<K, V> table(final Serde<K> keySerde,
                                      final Serde<V> valSerde,
                                      final String topic,
-                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         return table(null, null, keySerde, valSerde, topic, storeSupplier);
     }
 
@@ -737,7 +736,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
                                         final Serde<V> valSerde,
                                         final TimestampExtractor timestampExtractor,
                                         final String topic,
-                                        final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                        final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
                                         final boolean isQueryable) {
         try {
             final String source = newName(KStreamImpl.SOURCE_NAME);
@@ -882,6 +881,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, Serde, Serde, String) table(AutoOffsetReset, Serde, Serde, String)}
      * @return a {@link KTable} for the specified topic
      */
+    @SuppressWarnings("unchecked")
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
                                      final TimestampExtractor timestampExtractor,
                                      final Serde<K> keySerde,
@@ -889,12 +889,13 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
                                      final String topic,
                                      final String queryableStoreName) {
         final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
-        final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName,
-                keySerde,
-                valSerde,
-                false,
-                Collections.<String, String>emptyMap(),
-                true);
+        final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(
+            internalStoreName,
+            keySerde,
+            valSerde,
+            false,
+            Collections.<String, String>emptyMap(),
+            true);
         return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, queryableStoreName != null);
     }
 
@@ -965,7 +966,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
                                      final Serde<K> keySerde,
                                      final Serde<V> valSerde,
                                      final String topic,
-                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, true);
     }
@@ -1096,7 +1097,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
     public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                                  final Serde<V> valSerde,
                                                  final String topic,
-                                                 final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                                 final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
     }
 
@@ -1172,7 +1173,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
                                                     final Serde<V> valSerde,
                                                     final TimestampExtractor timestampExtractor,
                                                     final String topic,
-                                                    final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                                    final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         try {
             Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
             final String sourceName = newName(KStreamImpl.SOURCE_NAME);
@@ -1224,6 +1225,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * @param streams the {@link KStream}s to be merged
      * @return a {@link KStream} containing all records of the given streams
      */
+    @SuppressWarnings("unchecked")
     public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
         Objects.requireNonNull(streams, "streams can't be null");
         if (streams.length <= 1) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 1abc5e7..33e56aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
 import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -206,7 +205,8 @@ public interface KTable<K, V> {
      * @deprecated use {@link #filter(Predicate, Materialized) filter(predicate, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
-    KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
+    KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
+                        final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the
@@ -300,7 +300,8 @@ public interface KTable<K, V> {
      * @deprecated use {@link #filterNot(Predicate, Materialized) filterNot(predicate, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
-    KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
+    KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
+                           final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the
@@ -512,7 +513,7 @@ public interface KTable<K, V> {
     @Deprecated
     <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
                                  final Serde<VR> valueSerde,
-                                 final StateStoreSupplier<KeyValueStore> storeSupplier);
+                                 final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
 
     /**
@@ -811,7 +812,7 @@ public interface KTable<K, V> {
      */
     @Deprecated
     KTable<K, V> through(final String topic,
-                         final StateStoreSupplier<KeyValueStore> storeSupplier);
+                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
@@ -913,7 +914,7 @@ public interface KTable<K, V> {
     @Deprecated
     KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                          final String topic,
-                         final StateStoreSupplier<KeyValueStore> storeSupplier);
+                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
@@ -978,7 +979,7 @@ public interface KTable<K, V> {
     KTable<K, V> through(final Serde<K> keySerde,
                          final Serde<V> valSerde,
                          final String topic,
-                         final StateStoreSupplier<KeyValueStore> storeSupplier);
+                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
@@ -1080,7 +1081,7 @@ public interface KTable<K, V> {
                          final Serde<V> valSerde,
                          final StreamPartitioner<? super K, ? super V> partitioner,
                          final String topic,
-                         final StateStoreSupplier<KeyValueStore> storeSupplier);
+                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
@@ -1590,7 +1591,7 @@ public interface KTable<K, V> {
     @Deprecated
     <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
-                                final StateStoreSupplier<KeyValueStore> storeSupplier);
+                                final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
 
     /**
@@ -1934,7 +1935,7 @@ public interface KTable<K, V> {
     @Deprecated
     <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                     final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
-                                    final StateStoreSupplier<KeyValueStore> storeSupplier);
+                                    final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
 
     /**
@@ -2275,7 +2276,7 @@ public interface KTable<K, V> {
     @Deprecated
     <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                      final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
-                                     final StateStoreSupplier<KeyValueStore> storeSupplier);
+                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Get the name of the local state store used that can be used to query this {@code KTable}.

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index b5de562..26e404e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
@@ -80,8 +79,8 @@ public abstract class AbstractStream<K> {
         };
     }
 
-    @SuppressWarnings("unchecked")
-    static <T, K>  StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde,
+    @SuppressWarnings({"unchecked", "deprecation"})
+    static <T, K>  org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde,
                                                                    final Serde<T> aggValueSerde,
                                                                    final String storeName) {
         Objects.requireNonNull(storeName, "storeName can't be null");
@@ -89,8 +88,8 @@ public abstract class AbstractStream<K> {
         return storeFactory(keySerde, aggValueSerde, storeName).build();
     }
 
-    @SuppressWarnings("unchecked")
-    static  <W extends Window, T, K> StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde,
+    @SuppressWarnings({"unchecked", "deprecation"})
+    static  <W extends Window, T, K> org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde,
                                                                                    final Serde<T> aggValSerde,
                                                                                    final Windows<W> windows,
                                                                                    final String storeName) {
@@ -101,6 +100,7 @@ public abstract class AbstractStream<K> {
                 .build();
     }
 
+    @SuppressWarnings("deprecation")
     static  <T, K> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<K> keySerde,
                                                                        final Serde<T> aggValueSerde,
                                                                        final String storeName) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 357a70c..0df1524 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -70,9 +69,10 @@ public class InternalStreamsBuilder {
         return new KStreamImpl<>(this, name, Collections.singleton(name), false);
     }
 
+    @SuppressWarnings("deprecation")
     public <K, V> KTable<K, V> table(final String topic,
                                      final ConsumedInternal<K, V> consumed,
-                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         final String source = newName(KStreamImpl.SOURCE_NAME);
         final String name = newName(KTableImpl.SOURCE_NAME);
@@ -132,6 +132,7 @@ public class InternalStreamsBuilder {
                                 consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeName, isQueryable);
     }
 
+    @SuppressWarnings("unchecked")
     public <K, V> GlobalKTable<K, V> globalTable(final String topic,
                                                  final ConsumedInternal<K, V> consumed,
                                                  final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 64dfd19..dafaa62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -28,11 +28,10 @@ import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.SessionWindowedKStream;
 import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -78,6 +77,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         } // no need for else {} since isQueryable is true by default
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> reduce(final Reducer<V> reducer,
                                final String queryableStoreName) {
@@ -91,9 +91,10 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return reduce(reducer, (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> reduce(final Reducer<V> reducer,
-                               final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                               final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doAggregate(
@@ -115,7 +116,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                 materializedInternal);
     }
 
-
+    @SuppressWarnings("deprecation")
     @Override
     public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                             final Windows<W> windows,
@@ -124,17 +125,18 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return reduce(reducer, windows, windowedStore(keySerde, valSerde, windows, getOrCreateName(queryableStoreName, REDUCE_NAME)));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                             final Windows<W> windows) {
         return windowedBy(windows).reduce(reducer);
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     @Override
     public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                             final Windows<W> windows,
-                                                            final StateStoreSupplier<WindowStore> storeSupplier) {
+                                                            final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier) {
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(windows, "windows can't be null");
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -145,6 +147,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         );
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> aggregator,
@@ -176,7 +179,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     }
 
     @Override
-    public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator) {
+    public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                        final Aggregator<? super K, ? super V, VR> aggregator) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         final String storeName = builder.newStoreName(AGGREGATE_NAME);
@@ -189,6 +193,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
 
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> aggregator,
@@ -196,10 +201,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return aggregate(initializer, aggregator, aggValueSerde, null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> aggregator,
-                                      final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                      final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -209,6 +215,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                 storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                                   final Aggregator<? super K, ? super V, T> aggregator,
@@ -219,6 +226,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return aggregate(initializer, aggregator, windows, windowedStore(keySerde, aggValueSerde, windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                                   final Aggregator<? super K, ? super V, T> aggregator,
@@ -230,12 +238,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                                      .withValueSerde(aggValueSerde));
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     @Override
     public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                                   final Aggregator<? super K, ? super V, T> aggregator,
                                                                   final Windows<W> windows,
-                                                                  final StateStoreSupplier<WindowStore> storeSupplier) {
+                                                                  final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(windows, "windows can't be null");
@@ -247,6 +255,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         );
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, Long> count(final String queryableStoreName) {
         determineIsQueryable(queryableStoreName);
@@ -258,8 +267,9 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return count((String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
-    public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
+    public KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, storeSupplier);
     }
 
@@ -274,6 +284,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, materialized);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
                                                               final String queryableStoreName) {
@@ -281,14 +292,16 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return count(windows, windowedStore(keySerde, Serdes.Long(), windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows) {
         return windowedBy(windows).count();
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
-                                                              final StateStoreSupplier<WindowStore> storeSupplier) {
+                                                              final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier) {
         return aggregate(
                 aggregateBuilder.countInitializer,
                 aggregateBuilder.countAggregator,
@@ -296,7 +309,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                 storeSupplier);
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     @Override
     public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                 final Aggregator<? super K, ? super V, T> aggregator,
@@ -316,6 +329,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
 
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                 final Aggregator<? super K, ? super V, T> aggregator,
@@ -330,14 +344,14 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                                             .withValueSerde(aggValueSerde));
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     @Override
     public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                 final Aggregator<? super K, ? super V, T> aggregator,
                                                 final Merger<? super K, T> sessionMerger,
                                                 final SessionWindows sessionWindows,
                                                 final Serde<T> aggValueSerde,
-                                                final StateStoreSupplier<SessionStore> storeSupplier) {
+                                                final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
@@ -373,7 +387,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                                 aggregateBuilder);
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName) {
         Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized = Materialized.<K, Long, SessionStore<Bytes, byte[]>>as(getOrCreateName(queryableStoreName, AGGREGATE_NAME))
                 .withKeySerde(keySerde)
@@ -381,13 +395,15 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return windowedBy(sessionWindows).count(materialized);
     }
 
+    @SuppressWarnings("deprecation")
     public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows) {
         return windowedBy(sessionWindows).count();
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
-                                           final StateStoreSupplier<SessionStore> storeSupplier) {
+                                           final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier) {
         Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         final Merger<K, Long> sessionMerger = new Merger<K, Long>() {
@@ -406,7 +422,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     }
 
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     @Override
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                          final SessionWindows sessionWindows,
@@ -418,6 +434,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                               .sessionWindowed(sessionWindows.maintainMs()).build());
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                          final SessionWindows sessionWindows) {
@@ -425,10 +442,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return windowedBy(sessionWindows).reduce(reducer);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                          final SessionWindows sessionWindows,
-                                         final StateStoreSupplier<SessionStore> storeSupplier) {
+                                         final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier) {
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -471,10 +489,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
 
     }
 
+    @SuppressWarnings("deprecation")
     private <T> KTable<K, T> doAggregate(
             final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
             final String functionName,
-            final StateStoreSupplier storeSupplier) {
+            final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier) {
 
         final String aggFunctionName = builder.newName(functionName);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index e69d4f9..507944a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -21,15 +21,14 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedTable;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Collections;
@@ -71,11 +70,11 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         }
     };
 
-    public KGroupedTableImpl(final InternalStreamsBuilder builder,
-                             final String name,
-                             final String sourceName,
-                             final Serde<? extends K> keySerde,
-                             final Serde<? extends V> valSerde) {
+    KGroupedTableImpl(final InternalStreamsBuilder builder,
+                      final String name,
+                      final String sourceName,
+                      final Serde<? extends K> keySerde,
+                      final Serde<? extends V> valSerde) {
         super(builder, name, Collections.singleton(sourceName));
         this.keySerde = keySerde;
         this.valSerde = valSerde;
@@ -88,6 +87,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         } // no need for else {} since isQueryable is true by default
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> adder,
@@ -98,6 +98,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return aggregate(initializer, adder, subtractor, keyValueStore(keySerde, aggValueSerde, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> adder,
@@ -106,6 +107,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return aggregate(initializer, adder, subtractor, aggValueSerde, null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> adder,
@@ -122,11 +124,12 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return aggregate(initializer, adder, subtractor, (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> adder,
                                       final Aggregator<? super K, ? super V, T> subtractor,
-                                      final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                      final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(adder, "adder can't be null");
         Objects.requireNonNull(subtractor, "subtractor can't be null");
@@ -135,9 +138,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return doAggregate(aggregateSupplier, AGGREGATE_NAME, storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
                                          final String functionName,
-                                         final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         final String sinkName = builder.newName(KStreamImpl.SINK_NAME);
         final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME);
         final String funcName = builder.newName(functionName);
@@ -194,6 +198,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), materialized.storeName(), isQueryable);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> reduce(final Reducer<V> adder,
                                final Reducer<V> subtractor,
@@ -223,10 +228,11 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return reduce(adder, subtractor, (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> reduce(final Reducer<V> adder,
                                final Reducer<V> subtractor,
-                               final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                               final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(adder, "adder can't be null");
         Objects.requireNonNull(subtractor, "subtractor can't be null");
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -234,6 +240,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return doAggregate(aggregateSupplier, REDUCE_NAME, storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, Long> count(final String queryableStoreName) {
         determineIsQueryable(queryableStoreName);
@@ -271,8 +278,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return count((String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
-    public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
+    public KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         return this.aggregate(
                 countInitializer,
                 countAdder,