Posted to commits@kafka.apache.org by da...@apache.org on 2017/10/03 14:36:35 UTC

[1/2] kafka git commit: MINOR: fix JavaDocs warnings

Repository: kafka
Updated Branches:
  refs/heads/trunk 716330a5b -> 3dcbbf703


http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 66ec0d7..1abc5e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -166,7 +166,7 @@ public interface KTable<K, V> {
      *                          (i.e., that would be equivalent to calling {@link KTable#filter(Predicate)}.
      * @return a {@code KTable} that contains only those records that satisfy the given predicate
      * @see #filterNot(Predicate, Materialized)
-     * @deprecated use {@link #filter(Predicate, Materialized)}
+     * @deprecated use {@link #filter(Predicate, Materialized) filter(predicate, Materialized.as(queryableStoreName))}
      */
     @Deprecated
     KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
@@ -203,7 +203,7 @@ public interface KTable<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains only those records that satisfy the given predicate
      * @see #filterNot(Predicate, Materialized)
-     * @deprecated use {@link #filter(Predicate, Materialized)}
+     * @deprecated use {@link #filter(Predicate, Materialized) filter(predicate, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -297,7 +297,7 @@ public interface KTable<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
      * @see #filter(Predicate, Materialized)
-     * @deprecated use {@link #filterNot(Predicate, Materialized)}
+     * @deprecated use {@link #filterNot(Predicate, Materialized) filterNot(predicate, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -336,7 +336,7 @@ public interface KTable<K, V> {
      * (i.e., that would be equivalent to calling {@link KTable#filterNot(Predicate)}.
      * @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
      * @see #filter(Predicate, Materialized)
-     * @deprecated use {@link #filter(Predicate, Materialized)}
+     * @deprecated use {@link #filterNot(Predicate, Materialized) filterNot(predicate, Materialized.as(queryableStoreName))}
      */
     @Deprecated
     KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
@@ -463,7 +463,7 @@ public interface KTable<K, V> {
      * @param <VR>   the value type of the result {@code KTable}
      *
      * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
-     * @deprecated use {@link #mapValues(ValueMapper, Materialized)}
+     * @deprecated use {@link #mapValues(ValueMapper, Materialized) mapValues(mapper, Materialized.as(queryableStoreName).withValueSerde(valueSerde))}
      */
     @Deprecated
     <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName);
@@ -507,7 +507,7 @@ public interface KTable<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @param <VR>   the value type of the result {@code KTable}
      * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
-     * @deprecated use {@link #mapValues(ValueMapper, Materialized)}
+     * @deprecated use {@link #mapValues(ValueMapper, Materialized) mapValues(mapper, Materialized.as(KeyValueByteStoreSupplier).withValueSerde(valueSerde))}
      */
     @Deprecated
     <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
@@ -530,7 +530,8 @@ public interface KTable<K, V> {
      * update record.
      * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link KStream#print()} on the result.
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#print(Printed) print(Printed.toSysOut())} on the result.
      */
     @Deprecated
     void print();
@@ -551,7 +552,8 @@ public interface KTable<K, V> {
      * @param label the name used to label the key/value pairs printed to the console
      * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link KStream#print(String)} on the result.
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#print(Printed) print(Printed.toSysOut().withLabel(label))} on the result.
      */
     @Deprecated
     void print(final String label);
@@ -574,7 +576,8 @@ public interface KTable<K, V> {
      * @param valSerde value serde used to deserialize value if type is {@code byte[]}
      * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link KStream#print(Serde, Serde)} on the result.
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#print(Printed) print(Printed.toSysOut().withKeyValueMapper(...))} on the result.
      */
     @Deprecated
     void print(final Serde<K> keySerde,
@@ -598,7 +601,8 @@ public interface KTable<K, V> {
      * @param label the name used to label the key/value pairs printed to the console
      * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link KStream#print(Serde, Serde, String)} on the result.
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#print(Printed) print(Printed.toSysOut().withLabel(label).withKeyValueMapper(...))} on the result.
      */
     @Deprecated
     void print(final Serde<K> keySerde,
@@ -622,7 +626,8 @@ public interface KTable<K, V> {
      * @param filePath name of file to write to
      * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String)}} on the result.
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#print(Printed) print(Printed.toFile(filePath))} on the result.
      */
     @Deprecated
     void writeAsText(final String filePath);
@@ -644,7 +649,8 @@ public interface KTable<K, V> {
      * @param label the name used to label the key/value pairs printed out to the console
      * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, String)}} on the result.
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#print(Printed) print(Printed.toFile(filePath).withLabel(label))} on the result.
      */
     @Deprecated
     void writeAsText(final String filePath,
@@ -669,7 +675,8 @@ public interface KTable<K, V> {
      * @param valSerde value serde used to deserialize value if type is {@code byte[]}
      * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, Serde, Serde)}} on the result.
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#print(Printed) print(Printed.toFile(filePath).withKeyValueMapper(...))} on the result.
      */
     @Deprecated
     void  writeAsText(final String filePath,
@@ -695,8 +702,8 @@ public interface KTable<K, V> {
      * @param valSerde value serde used to deserialize value if type is {@code byte[]}
      * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, String, Serde, Serde)}} on the result.
-
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#print(Printed) print(Printed.toFile(filePath).withLabel(label).withKeyValueMapper(...))} on the result.
      */
     @Deprecated
     void writeAsText(final String filePath,
@@ -714,7 +721,8 @@ public interface KTable<K, V> {
      * @param action an action to perform on each record
      * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a KStream using {@code toStream()} and then use {@link KStream#foreach(ForeachAction)}} on the result.
+     * convert to a {@link KStream} using {@link #toStream()} and then use
+     * {@link KStream#foreach(ForeachAction) foreach(action)} on the result.
      */
     @Deprecated
     void foreach(final ForeachAction<? super K, ? super V> action);
@@ -763,18 +771,19 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link StreamsBuilder#table(String, Materialized)})
+     * {@link KStreamBuilder#table(String, String)})
      * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
      *
      * @param topic     the topic name
      * @param queryableStoreName the state store name used for the result {@code KTable}; valid characters are ASCII
-     *                  alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KTable#through(String)()}
+     *                  alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KTable#through(String)}
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)} and
+     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
+     * to read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final String topic,
@@ -787,17 +796,18 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link StreamsBuilder#table(String, Materialized)})
+     * {@link KStreamBuilder#table(String, String)})
      * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
      *
      * @param topic     the topic name
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)} and
+     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier))}
+     * to read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final String topic,
@@ -810,15 +820,15 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
+     * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
-     * {@link StreamsBuilder#table(String)})
+     * {@link KStreamBuilder#table(String)})
      *
      * @param topic     the topic name
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)} and
+     * {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final String topic);
@@ -831,17 +841,18 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
-     * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
+     * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
-     * {@link StreamsBuilder#table(String)})
+     * {@link KStreamBuilder#table(String)})
      *
      * @param partitioner the function used to determine how records are distributed among partitions of the topic,
      *                    if not specified producer's {@link DefaultPartitioner} will be used
      * @param topic       the topic name
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))} and
+     * {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
@@ -855,10 +866,10 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
-     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link StreamsBuilder#table(String, Materialized)})
+     * {@link KStreamBuilder#table(String, String)})
      *
      * @param partitioner the function used to determine how records are distributed among partitions of the topic,
      *                    if not specified producer's {@link DefaultPartitioner} will be used
@@ -866,8 +877,10 @@ public interface KTable<K, V> {
      * @param queryableStoreName   the state store name used for the result {@code KTable}.
      *                             If {@code null} this is the equivalent of {@link KTable#through(StreamPartitioner, String)}
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))} and
+     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
+     * to read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
@@ -882,18 +895,20 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
-     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link StreamsBuilder#table(String, Materialized)})
+     * {@link KStreamBuilder#table(String, String)})
      *
      * @param partitioner the function used to determine how records are distributed among partitions of the topic,
      *                    if not specified producer's {@link DefaultPartitioner} will be used
      * @param topic       the topic name
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))} and
+     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier))}
+     * to read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
@@ -909,10 +924,10 @@ public interface KTable<K, V> {
      * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
-     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link StreamsBuilder#table(String, Materialized)})
+     * {@link KStreamBuilder#table(String, String)})
      *
      * @param keySerde  key serde used to send key-value pairs,
      *                  if not specified the default key serde defined in the configuration will be used
@@ -920,13 +935,16 @@ public interface KTable<K, V> {
      *                  if not specified the default value serde defined in the configuration will be used
      * @param topic     the topic name
      * @param queryableStoreName the state store name used for the result {@code KTable}.
-     *                           If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, String)()}
+     *                           If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, String)}
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))} and
+     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
+     * to read back as a {@code KTable}
      */
     @Deprecated
-    KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
+    KTable<K, V> through(final Serde<K> keySerde,
+                         final Serde<V> valSerde,
                          final String topic,
                          final String queryableStoreName);
 
@@ -939,7 +957,7 @@ public interface KTable<K, V> {
      * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
-     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
      * {@link StreamsBuilder#table(String, Materialized)})
@@ -951,11 +969,14 @@ public interface KTable<K, V> {
      * @param topic     the topic name
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))} and
+     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier))}
+     * to read back as a {@code KTable}
      */
     @Deprecated
-    KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
+    KTable<K, V> through(final Serde<K> keySerde,
+                         final Serde<V> valSerde,
                          final String topic,
                          final StateStoreSupplier<KeyValueStore> storeSupplier);
 
@@ -968,10 +989,10 @@ public interface KTable<K, V> {
      * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
-     * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
+     * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
-     * {@link StreamsBuilder#table(String)})
+     * {@link KStreamBuilder#table(String)})
      *
      * @param keySerde  key serde used to send key-value pairs,
      *                  if not specified the default key serde defined in the configuration will be used
@@ -979,11 +1000,13 @@ public interface KTable<K, V> {
      *                  if not specified the default value serde defined in the configuration will be used
      * @param topic     the topic name
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))}
+     * and {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable}
      */
     @Deprecated
-    KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
+    KTable<K, V> through(final Serde<K> keySerde,
+                         final Serde<V> valSerde,
                          final String topic);
 
     /**
@@ -994,10 +1017,10 @@ public interface KTable<K, V> {
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
      * #to(keySerde, valueSerde, partitioner, someTopicName)} and
-     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link StreamsBuilder#table(String, Materialized)})
+     * {@link KStreamBuilder#table(String, String)})
      *
      * @param keySerde    key serde used to send key-value pairs,
      *                    if not specified the default key serde defined in the configuration will be used
@@ -1011,8 +1034,10 @@ public interface KTable<K, V> {
      * @param queryableStoreName  the state store name used for the result {@code KTable}.
      *                            If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, StreamPartitioner, String)()}
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))} and
+     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
+     * to read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final Serde<K> keySerde,
@@ -1029,10 +1054,10 @@ public interface KTable<K, V> {
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
      * #to(keySerde, valueSerde, partitioner, someTopicName)} and
-     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link StreamsBuilder#table(String, Materialized)})
+     * {@link KStreamBuilder#table(String, String)})
      *
      * @param keySerde    key serde used to send key-value pairs,
      *                    if not specified the default key serde defined in the configuration will be used
@@ -1045,8 +1070,10 @@ public interface KTable<K, V> {
      * @param topic      the topic name
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))} and
+     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier))}
+     * to read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final Serde<K> keySerde,
@@ -1063,10 +1090,10 @@ public interface KTable<K, V> {
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
      * #to(keySerde, valueSerde, partitioner, someTopicName)} and
-     * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
+     * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
-     * {@link StreamsBuilder#table(String)})
+     * {@link KStreamBuilder#table(String)})
      *
      * @param keySerde    key serde used to send key-value pairs,
      *                    if not specified the default key serde defined in the configuration will be used
@@ -1078,8 +1105,9 @@ public interface KTable<K, V> {
      *                    be used
      * @param topic      the topic name
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
-     * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))} and
+     * {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable}
      */
     @Deprecated
     KTable<K, V> through(final Serde<K> keySerde,
@@ -1094,7 +1122,7 @@ public interface KTable<K, V> {
      * started).
      *
      * @param topic the topic name
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)}
+     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)}
      */
     @Deprecated
     void to(final String topic);
@@ -1108,7 +1136,8 @@ public interface KTable<K, V> {
      * @param partitioner the function used to determine how records are distributed among partitions of the topic,
      *                    if not specified producer's {@link DefaultPartitioner} will be used
      * @param topic       the topic name
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))}
      */
     @Deprecated
     void to(final StreamPartitioner<? super K, ? super V> partitioner,
@@ -1127,7 +1156,8 @@ public interface KTable<K, V> {
      * @param valSerde value serde used to send key-value pairs,
      *                 if not specified the default value serde defined in the configuration will be used
      * @param topic    the topic name
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))}
      */
     @Deprecated
     void to(final Serde<K> keySerde,
@@ -1149,7 +1179,8 @@ public interface KTable<K, V> {
      *                    {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will
      *                    be used
      * @param topic      the topic name
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
+     * @deprecated use {@link #toStream()} followed by
+     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))}
      */
     @Deprecated
     void to(final Serde<K> keySerde,
@@ -1243,7 +1274,7 @@ public interface KTable<K, V> {
      * @param <KR>       the key type of the result {@link KGroupedTable}
      * @param <VR>       the value type of the result {@link KGroupedTable}
      * @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable}
-     * @deprecated use {@link #groupBy(KeyValueMapper, Serialized)}
+     * @deprecated use {@link #groupBy(KeyValueMapper, Serialized) groupBy(selector, Serialized.with(keySerde, valueSerde))}
      */
     @Deprecated
     <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
@@ -1475,7 +1506,7 @@ public interface KTable<K, V> {
      * {@link ValueJoiner}, one for each matched record-pair with the same key
      * @see #leftJoin(KTable, ValueJoiner, Materialized)
      * @see #outerJoin(KTable, ValueJoiner, Materialized)
-     * @deprecated use {@link #join(KTable, ValueJoiner, Materialized)}
+     * @deprecated use {@link #join(KTable, ValueJoiner, Materialized) join(other, joiner, Materialized.as(queryableStoreName).withValueSerde(joinSerde))}
      */
     @Deprecated
     <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
@@ -1554,7 +1585,7 @@ public interface KTable<K, V> {
      * {@link ValueJoiner}, one for each matched record-pair with the same key
      * @see #leftJoin(KTable, ValueJoiner, Materialized)
      * @see #outerJoin(KTable, ValueJoiner, Materialized)
-     * @deprecated use {@link #join(KTable, ValueJoiner, Materialized)}
+     * @deprecated use {@link #join(KTable, ValueJoiner, Materialized) join(other, joiner, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
@@ -1811,7 +1842,7 @@ public interface KTable<K, V> {
      * left {@code KTable}
      * @see #join(KTable, ValueJoiner, Materialized)
      * @see #outerJoin(KTable, ValueJoiner, Materialized)
-     * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized)}
+     * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized) leftJoin(other, joiner, Materialized.as(queryableStoreName).withValueSerde(joinSerde))}
      */
     @Deprecated
     <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
@@ -1898,7 +1929,7 @@ public interface KTable<K, V> {
      * left {@code KTable}
      * @see #join(KTable, ValueJoiner, Materialized)
      * @see #outerJoin(KTable, ValueJoiner, Materialized)
-     * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized)}
+     * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized) leftJoin(other, joiner, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
@@ -2153,7 +2184,7 @@ public interface KTable<K, V> {
      * both {@code KTable}s
      * @see #join(KTable, ValueJoiner, Materialized)
      * @see #leftJoin(KTable, ValueJoiner, Materialized)
-     * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized)}
+     * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized) outerJoin(other, joiner, Materialized.as(queryableStoreName).withValueSerde(joinSerde))}
      */
     @Deprecated
     <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
@@ -2239,7 +2270,7 @@ public interface KTable<K, V> {
      * both {@code KTable}s
      * @see #join(KTable, ValueJoiner)
      * @see #leftJoin(KTable, ValueJoiner)
-     * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized)}
+     * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized) outerJoin(other, joiner, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,

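As a rough sketch of the replacement pattern these updated @deprecated tags point to, the snippet below rewrites a KTable#through(String, String) and KTable#print(String) call in terms of toStream(), Produced, Materialized, and Printed. This is only an illustration, not part of the commit; the topic name, store name, label, and serde choices are assumptions.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Printed;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class KTableMigrationSketch {
        public static KTable<String, Long> copyAndPrint(final StreamsBuilder builder,
                                                        final KTable<String, Long> table) {
            // was: table.through("target-topic", "target-store");
            table.toStream().to("target-topic", Produced.with(Serdes.String(), Serdes.Long()));
            final KTable<String, Long> copy = builder.table("target-topic",
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("target-store"));

            // was: copy.print("copy");
            copy.toStream().print(Printed.<String, Long>toSysOut().withLabel("copy"));
            return copy;
        }
    }
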
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
index d3db087..8d2c22a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
@@ -105,7 +105,6 @@ public class Printed<K, V> {
      * The example below shows how to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
index e7cc234..1acd587 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
@@ -28,12 +28,12 @@ import org.apache.kafka.streams.KeyValue;
  * {@code Reducer} can be used to implement aggregation functions like sum, min, or max.
  *
  * @param <V> value type
- * @see KGroupedStream#reduce(Reducer, String)
- * @see KGroupedStream#reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, Windows, String)
- * @see KGroupedStream#reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, String)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#reduce(Reducer)
+ * @see KGroupedStream#reduce(Reducer, Materialized)
+ * @see TimeWindowedKStream#reduce(Reducer)
+ * @see TimeWindowedKStream#reduce(Reducer, Materialized)
+ * @see SessionWindowedKStream#reduce(Reducer)
+ * @see SessionWindowedKStream#reduce(Reducer, Materialized)
  * @see Aggregator
  */
 public interface Reducer<V> {

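For context on the relocated @see targets, a minimal sketch of KGroupedStream#reduce(Reducer); the "amounts" topic, String keys, Long values, and reliance on default serdes are assumptions for illustration.

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;

    public class ReducerSketch {
        public static KTable<String, Long> sumPerKey(final StreamsBuilder builder) {
            final KStream<String, Long> amounts = builder.stream("amounts");
            // the Reducer<Long> adds the new value to the running aggregate
            return amounts.groupByKey().reduce((aggValue, newValue) -> aggValue + newValue);
        }
    }
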
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index 3c3ef7e..ddc0371 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -127,13 +127,13 @@ public interface SessionWindowedKStream<K, V> {
      * @param initializer    the instance of {@link Initializer}
      * @param aggregator     the instance of {@link Aggregator}
      * @param sessionMerger  the instance of {@link Merger}
-     * @param <T>           the value type of the resulting {@link KTable}
+     * @param <VR>            the value type of the resulting {@link KTable}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
      */
-    <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
-                                         final Aggregator<? super K, ? super V, T> aggregator,
-                                         final Merger<? super K, T> sessionMerger);
+    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                           final Aggregator<? super K, ? super V, VR> aggregator,
+                                           final Merger<? super K, VR> sessionMerger);
 
     /**
      * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
@@ -222,10 +222,10 @@ public interface SessionWindowedKStream<K, V> {
      * <pre>{@code
      * // At the example of a Reducer<Long>
      * new Reducer<Long>() {
-     *   @Override
      *   public Long apply(Long aggValue, Long currValue) {
      *     return aggValue + currValue;
      *   }
+     * }
      * }</pre>
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -260,8 +260,8 @@ public interface SessionWindowedKStream<K, V> {
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
      * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
-     * @param reducer           a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
-     * @param materialized     an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}
+     * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
+     * @param materializedAs an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
      */

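A short sketch of the aggregate(Initializer, Aggregator, Merger) signature whose type parameter is renamed above, counting records per session; the "clicks" topic and the String/Long types are invented for illustration.

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.SessionWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    public class SessionAggregateSketch {
        public static KTable<Windowed<String>, Long> clicksPerSession(final StreamsBuilder builder) {
            final KStream<String, String> clicks = builder.stream("clicks");
            return clicks.groupByKey()
                .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5)))
                .aggregate(
                    () -> 0L,                                            // Initializer<Long>
                    (key, value, aggregate) -> aggregate + 1L,           // Aggregator<String, String, Long>
                    (key, leftAgg, rightAgg) -> leftAgg + rightAgg);     // Merger<String, Long>
        }
    }
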
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 6e06461..693bee0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -58,12 +58,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
  * @see TimeWindows
  * @see UnlimitedWindows
  * @see JoinWindows
- * @see KGroupedStream#count(SessionWindows, String)
- * @see KGroupedStream#count(SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, String)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#windowedBy(SessionWindows)
  * @see TimestampExtractor
  */
 public final class SessionWindows {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 38362ad..f9090c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -45,12 +45,7 @@ import java.util.Map;
  * @see SessionWindows
  * @see UnlimitedWindows
  * @see JoinWindows
- * @see KGroupedStream#count(Windows, String)
- * @see KGroupedStream#count(Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, Windows, String)
- * @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#windowedBy(Windows)
  * @see TimestampExtractor
  */
 public final class TimeWindows extends Windows<TimeWindow> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index e116a8b..a3b9338 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -34,12 +34,7 @@ import java.util.Map;
  * @see TimeWindows
  * @see SessionWindows
  * @see JoinWindows
- * @see KGroupedStream#count(Windows, String)
- * @see KGroupedStream#count(Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, Windows, String)
- * @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#windowedBy(Windows)
  * @see TimestampExtractor
  */
 public final class UnlimitedWindows extends Windows<UnlimitedWindow> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
index 576706e..7728317 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -26,18 +26,8 @@ package org.apache.kafka.streams.kstream;
  * Thus, a windowed {@link KTable} has type {@code <Windowed<K>,V>}.
  *
  * @param <K> type of the key
- * @see KGroupedStream#count(Windows, String)
- * @see KGroupedStream#count(Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#count(SessionWindows, String)
- * @see KGroupedStream#count(SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, Windows, String)
- * @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, String)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String)
+ * @see KGroupedStream#windowedBy(Windows)
+ * @see KGroupedStream#windowedBy(SessionWindows)
  */
 public class Windowed<K> {
 
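For reference, a minimal sketch of the windowedBy(...) path that replaces the removed windowed count/reduce/aggregate @see targets; the "page-views" topic and String keys are assumptions for illustration.

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    public class WindowedBySketch {
        public static KTable<Windowed<String>, Long> viewsPerMinute(final StreamsBuilder builder) {
            final KStream<String, String> views = builder.stream("page-views");
            // replaces the removed KGroupedStream#count(Windows, ...) style overloads
            return views.groupByKey()
                .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)))
                .count();
        }
    }
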

http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
index e6bc112..4d07f9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -33,7 +33,7 @@ public class QueryableStoreTypes {
      * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore}
      * @param <K>   key type of the store
      * @param <V>   value type of the store
-     * @return  {@link KeyValueStoreType}
+     * @return  {@link QueryableStoreTypes.KeyValueStoreType}
      */
     public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, V>> keyValueStore() {
         return new KeyValueStoreType<>();
@@ -43,7 +43,7 @@ public class QueryableStoreTypes {
      * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore}
      * @param <K>   key type of the store
      * @param <V>   value type of the store
-     * @return  {@link WindowStoreType}
+     * @return  {@link QueryableStoreTypes.WindowStoreType}
      */
     public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStore() {
         return new WindowStoreType<>();
@@ -53,7 +53,7 @@ public class QueryableStoreTypes {
      * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}
      * @param <K>   key type of the store
      * @param <V>   value type of the store
-     * @return  {@link SessionStoreType}
+     * @return  {@link QueryableStoreTypes.SessionStoreType}
      */
     public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, V>> sessionStore() {
         return new SessionStoreType<>();

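And a hedged sketch of the Interactive Queries pattern that the deprecation notes earlier in this patch recommend (KafkaStreams#store(...) followed by ReadOnlyKeyValueStore#all()); the store name is invented and the KafkaStreams instance is assumed to be started.

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class InteractiveQuerySketch {
        public static void dumpStore(final KafkaStreams streams) {
            final ReadOnlyKeyValueStore<String, Long> store =
                streams.store("target-store", QueryableStoreTypes.<String, Long>keyValueStore());
            // iterate over all key-value pairs in the local store
            try (final KeyValueIterator<String, Long> all = store.all()) {
                while (all.hasNext()) {
                    final KeyValue<String, Long> entry = all.next();
                    System.out.println(entry.key + " -> " + entry.value);
                }
            }
        }
    }
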
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c0beb9e..05ebd33 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -190,7 +190,7 @@ public class Stores {
      *
      * @param name the name of the store
      * @return the factory that can be used to specify other options or configurations for the store; never null
-     * @deprected use {@link #persistentKeyValueStore(String)}, {@link #persistentWindowStore(String, long, int, long, boolean)}
+     * @deprecated use {@link #persistentKeyValueStore(String)}, {@link #persistentWindowStore(String, long, int, long, boolean)}
      * {@link #persistentSessionStore(String, long)}, {@link #lruMap(String, int)}, or {@link #inMemoryKeyValueStore(String)}
      */
     @Deprecated


[2/2] kafka git commit: MINOR: fix JavaDocs warnings

Posted by da...@apache.org.
MINOR: fix JavaDocs warnings

 - add some missing annotations for deprecated methods

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Michael G. Noll <mi...@confluent.io>, Damian Guy <da...@gmail.com>

Closes #4005 from mjsax/minor-fix-javadoc-warnings


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3dcbbf70
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3dcbbf70
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3dcbbf70

Branch: refs/heads/trunk
Commit: 3dcbbf703017985c9e212ad69cc7afdddbf358eb
Parents: 716330a
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue Oct 3 07:35:42 2017 -0700
Committer: Damian Guy <da...@gmail.com>
Committed: Tue Oct 3 07:35:42 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |   9 +-
 .../kafka/streams/TopologyDescription.java      |   4 +-
 .../kafka/streams/kstream/Aggregator.java       |  12 +-
 .../kafka/streams/kstream/GlobalKTable.java     |   2 +-
 .../kafka/streams/kstream/Initializer.java      |  12 +-
 .../kafka/streams/kstream/JoinWindows.java      |   2 +-
 .../kafka/streams/kstream/KGroupedStream.java   | 124 ++++++------
 .../kafka/streams/kstream/KGroupedTable.java    |  27 +--
 .../apache/kafka/streams/kstream/KStream.java   |  88 ++++-----
 .../kafka/streams/kstream/KStreamBuilder.java   |  15 +-
 .../apache/kafka/streams/kstream/KTable.java    | 189 +++++++++++--------
 .../apache/kafka/streams/kstream/Printed.java   |   1 -
 .../apache/kafka/streams/kstream/Reducer.java   |  12 +-
 .../streams/kstream/SessionWindowedKStream.java |  14 +-
 .../kafka/streams/kstream/SessionWindows.java   |   7 +-
 .../kafka/streams/kstream/TimeWindows.java      |   7 +-
 .../kafka/streams/kstream/UnlimitedWindows.java |   7 +-
 .../apache/kafka/streams/kstream/Windowed.java  |  14 +-
 .../streams/state/QueryableStoreTypes.java      |   6 +-
 .../org/apache/kafka/streams/state/Stores.java  |   2 +-
 20 files changed, 288 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 928d0e9..fd9c729 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
@@ -948,10 +949,10 @@ public class KafkaStreams {
      * <p>
      * This will use the default Kafka Streams partitioner to locate the partition.
      * If a {@link StreamPartitioner custom partitioner} has been
-     * {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig},
-     * {@link KStream#through(StreamPartitioner, String)}, or {@link KTable#through(StreamPartitioner, String, String)},
-     * or if the original {@link KTable}'s input {@link StreamsBuilder#table(String, String) topic} is partitioned
-     * differently, please use {@link #metadataForKey(String, Object, StreamPartitioner)}.
+     * {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig} or
+     * {@link KStream#through(String, Produced)}, or if the original {@link KTable}'s input
+     * {@link StreamsBuilder#table(String) topic} is partitioned differently, please use
+     * {@link #metadataForKey(String, Object, StreamPartitioner)}.
      * <p>
      * Note:
      * <ul>

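For reference, the replacement called out in the updated KafkaStreams Javadoc above
(KStream#through(String, Produced) plus metadataForKey with an explicit partitioner) would
be used roughly as in the sketch below. The topic name, store name, key, and the
pre-existing stream and streams variables are assumptions for illustration only:

    // Hypothetical custom partitioner; StreamPartitioner is a functional interface.
    final StreamPartitioner<String, Long> partitioner =
        (key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions;

    // Repartition through an intermediate topic using the custom partitioner.
    final KStream<String, Long> repartitioned =
        stream.through("events-by-key", Produced.streamPartitioner(partitioner));

    // Because a custom partitioner is in use, pass the same partitioner when looking up metadata.
    final StreamsMetadata metadata =
        streams.metadataForKey("counts-store", "some-key", partitioner);
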
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index 1b520c6..01af8bf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -55,9 +55,9 @@ public interface TopologyDescription {
     }
 
     /**
-     * Represents a {@link Topology#addGlobalStore(org.apache.kafka.streams.processor.StateStoreSupplier, String,
+     * Represents a {@link Topology#addGlobalStore(org.apache.kafka.streams.state.StoreBuilder, String,
      * org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String,
-     * String, org.apache.kafka.streams.processor.ProcessorSupplier)} global store}.
+     * String, org.apache.kafka.streams.processor.ProcessorSupplier) global store}.
      * Adding a global store results in adding a source node and one stateful processor node.
      * Note, that all added global stores form a single unit (similar to a {@link Subtopology}) even if different
      * global stores are not connected to each other.

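The corrected link above refers to the StoreBuilder-based
Topology#addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String,
ProcessorSupplier) overload. A rough sketch of how it is called; the topic, store, and
processor names are made up, and ConfigProcessor stands for some hypothetical Processor
implementation that keeps the store up to date:

    final Topology topology = new Topology();
    topology.addGlobalStore(
        Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("global-config-store"),
                Serdes.String(),
                Serdes.String())
            .withLoggingDisabled(),                 // global stores must not have a changelog topic
        "global-config-source",
        Serdes.String().deserializer(),
        Serdes.String().deserializer(),
        "config-topic",
        "global-config-processor",
        ConfigProcessor::new);                      // hypothetical ProcessorSupplier
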
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
index 4eec4f5..217a145 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -29,12 +29,12 @@ package org.apache.kafka.streams.kstream;
  * @param <V> input value type
  * @param <VA> aggregate value type
  * @see Initializer
- * @see KGroupedStream#aggregate(Initializer, Aggregator, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Materialized)
+ * @see TimeWindowedKStream#aggregate(Initializer, Aggregator)
+ * @see TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized)
+ * @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger)
+ * @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized)
  * @see Reducer
  */
 public interface Aggregator<K, V, VA> {

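The new @see targets above all follow the same pattern: an Initializer provides the
starting aggregate and the Aggregator folds each record into it. A rough sketch of the
non-windowed variant with an explicit Materialized store; grouped is an assumed
KGroupedStream<String, Long> and "sum-store" is a made-up store name:

    final Initializer<Long> initializer = () -> 0L;
    final Aggregator<String, Long, Long> aggregator =
        (key, value, aggregate) -> aggregate + value;

    // Bytes is org.apache.kafka.common.utils.Bytes.
    final KTable<String, Long> sums = grouped.aggregate(
        initializer,
        aggregator,
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("sum-store")
            .withValueSerde(Serdes.Long()));
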
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index 81aa405..72286c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -61,7 +61,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
  * @param <K> Type of primary keys
  * @param <V> Type of value changes
  * @see KTable
- * @see StreamsBuilder#globalTable(String, String)
+ * @see StreamsBuilder#globalTable(String)
  * @see KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)
  * @see KStream#leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
  */

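The updated reference points at the single-argument StreamsBuilder#globalTable(String).
A rough sketch of building a GlobalKTable and joining a stream against it; the topic
names and join logic are illustrative only:

    final StreamsBuilder builder = new StreamsBuilder();
    final GlobalKTable<String, String> customers = builder.globalTable("customers");
    final KStream<String, String> orders = builder.stream("orders");

    final KStream<String, String> enriched = orders.join(
        customers,
        (orderId, order) -> order,          // KeyValueMapper: derive the lookup key (simplified)
        (order, customer) -> order + " placed by " + customer);
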
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
index d41c638..1b59c64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
@@ -23,12 +23,12 @@ package org.apache.kafka.streams.kstream;
  *
  * @param <VA> aggregate value type
  * @see Aggregator
- * @see KGroupedStream#aggregate(Initializer, Aggregator, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Materialized)
+ * @see TimeWindowedKStream#aggregate(Initializer, Aggregator)
+ * @see TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized)
+ * @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger)
+ * @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized)
  */
 public interface Initializer<VA> {
 

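The session-windowed variants listed above additionally take a Merger that combines two
aggregates when neighboring sessions are merged. A rough sketch, assuming a
KGroupedStream<String, Long> named grouped and a made-up inactivity gap and store name:

    final Merger<String, Long> sessionMerger = (key, leftAgg, rightAgg) -> leftAgg + rightAgg;

    final KTable<Windowed<String>, Long> sessionSums = grouped
        .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5)))
        .aggregate(() -> 0L,
                   (key, value, aggregate) -> aggregate + value,
                   sessionMerger,
                   Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("session-sums")
                       .withValueSerde(Serdes.Long()));
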
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index ef9ed01..863ae95 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -59,7 +59,7 @@ import java.util.Map;
  * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows)
  * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
  * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
- * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
+ * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
  * @see TimestampExtractor
  */
 public final class JoinWindows extends Windows<Window> {

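The corrected @see entry refers to the outerJoin overload that also takes a Joined for
explicit serdes. A rough sketch, assuming two co-partitioned KStream<String, Long>
instances named left and right and a made-up window size:

    final KStream<String, String> joined = left.outerJoin(
        right,
        (leftValue, rightValue) -> leftValue + "/" + rightValue,     // ValueJoiner
        JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
        Joined.with(Serdes.String(), Serdes.Long(), Serdes.Long()));
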
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 1c72ebf..17f2db4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -85,7 +85,7 @@ public interface KGroupedStream<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count()}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
-     * @deprecated use {@link #count(Materialized)
+     * @deprecated use {@link #count(Materialized) count(Materialized.as(queryableStoreName))}
      */
     @Deprecated
     KTable<K, Long> count(final String queryableStoreName);
@@ -146,7 +146,7 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
-     * @deprecated use {@link #count(Materialized)}
+     * @deprecated use {@link #count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -229,7 +229,8 @@ public interface KGroupedStream<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count(Windows)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window.
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+     * {@link TimeWindowedKStream#count(Materialized) count(Materialized.as(queryableStoreName))}
      */
     @Deprecated
     <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
@@ -264,7 +265,7 @@ public interface KGroupedStream<K, V> {
      * @param windows   the specification of the aggregation {@link Windows}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by {@link TimeWindowedKStream#count() count()}
      */
     @Deprecated
     <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows);
@@ -306,7 +307,8 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+     * {@link TimeWindowedKStream#count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
@@ -344,10 +346,11 @@ public interface KGroupedStream<K, V> {
      *
      * @param sessionWindows the specification of the aggregation {@link SessionWindows}
      * @param queryableStoreName  the name of the state store created from this operation; valid characters are ASCII
-     * alphanumerics, '.', '_' and '-. If {@code null} then this will be equivalent to {@link KGroupedStream#count(SessionWindows)} ()}.
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count(SessionWindows)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#count(Materialized) count(Materialized.as(queryableStoreName))}
      */
     @Deprecated
     KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName);
@@ -371,7 +374,8 @@ public interface KGroupedStream<K, V> {
      * @param sessionWindows the specification of the aggregation {@link SessionWindows}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#count() count()}
      */
     @Deprecated
     KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows);
@@ -409,7 +413,8 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier  user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
@@ -419,7 +424,7 @@ public interface KGroupedStream<K, V> {
      * Combine the values of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
      * Combining implies that the type of the aggregate result is the same as the type of the input value
-     * (c.f. {@link #aggregate(Initializer, Aggregator, Serde, String)}).
+     * (c.f. {@link #aggregate(Initializer, Aggregator)}).
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * that can be queried using the provided {@code queryableStoreName}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -464,10 +469,10 @@ public interface KGroupedStream<K, V> {
      * <pre>{@code
      * // At the example of a Reducer<Long>
      * new Reducer<Long>() {
-     *   @Override
      *   public Long apply(Long aggValue, Long currValue) {
      *     return aggValue + currValue;
      *   }
+     * }
      * }</pre>
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -506,7 +511,7 @@ public interface KGroupedStream<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer)} ()}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprectated use {@link #reduce(Reducer, Materialized)}
+     * @deprecated use {@link #reduce(Reducer, Materialized) reduce(reducer, Materialized.as(queryableStoreName))}
      */
     @Deprecated
     KTable<K, V> reduce(final Reducer<V> reducer,
@@ -527,10 +532,10 @@ public interface KGroupedStream<K, V> {
      * <pre>{@code
      * // At the example of a Reducer<Long>
      * new Reducer<Long>() {
-     *   @Override
      *   public Long apply(Long aggValue, Long currValue) {
      *     return aggValue + currValue;
      *   }
+     * }
      * }</pre>
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -562,7 +567,7 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprectated use {@link #reduce(Reducer, Materialized)}
+     * @deprecated use {@link #reduce(Reducer, Materialized) reduce(reducer, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     KTable<K, V> reduce(final Reducer<V> reducer,
@@ -582,10 +587,10 @@ public interface KGroupedStream<K, V> {
      * <pre>{@code
      * // At the example of a Reducer<Long>
      * new Reducer<Long>() {
-     *   @Override
      *   public Long apply(Long aggValue, Long currValue) {
      *     return aggValue + currValue;
      *   }
+     * }
      * }</pre>
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -638,10 +643,10 @@ public interface KGroupedStream<K, V> {
      * <pre>{@code
      * // At the example of a Reducer<Long>
      * new Reducer<Long>() {
-     *   @Override
      *   public Long apply(Long aggValue, Long currValue) {
      *     return aggValue + currValue;
      *   }
+     * }
      * }</pre>
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -680,10 +685,11 @@ public interface KGroupedStream<K, V> {
      * @param reducer   a {@link Reducer} that computes a new aggregate result
      * @param windows   the specification of the aggregation {@link Windows}
      * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
-     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, Windows)} ()}.
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, Windows)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+     * {@link TimeWindowedKStream#reduce(Reducer, Materialized) reduce(reducer, Materialized.as(queryableStoreName))}
      */
     @Deprecated
     <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -728,7 +734,8 @@ public interface KGroupedStream<K, V> {
      * @param windows   the specification of the aggregation {@link Windows}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+     * {@link TimeWindowedKStream#reduce(Reducer) reduce(reducer)}
      */
     @Deprecated
     <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -752,10 +759,10 @@ public interface KGroupedStream<K, V> {
      * <pre>{@code
      * // At the example of a Reducer<Long>
      * new Reducer<Long>() {
-     *   @Override
      *   public Long apply(Long aggValue, Long currValue) {
      *     return aggValue + currValue;
      *   }
+     * }
      * }</pre>
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -790,7 +797,8 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+     * {@link TimeWindowedKStream#reduce(Reducer, Materialized) reduce(reducer, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -813,10 +821,10 @@ public interface KGroupedStream<K, V> {
      * <pre>{@code
      * // At the example of a Reducer<Long>
      * new Reducer<Long>() {
-     *   @Override
      *   public Long apply(Long aggValue, Long currValue) {
      *     return aggValue + currValue;
      *   }
+     * }
      * }</pre>
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -855,10 +863,11 @@ public interface KGroupedStream<K, V> {
      * @param reducer           a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
      * @param sessionWindows    the specification of the aggregation {@link SessionWindows}
      * @param queryableStoreName     the name of the state store created from this operation; valid characters are ASCII
-     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, SessionWindows)} ()}.
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, SessionWindows)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#reduce(Reducer, Materialized) reduce(reducer, Materialized.as(queryableStoreName))}
      */
     @Deprecated
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -893,7 +902,8 @@ public interface KGroupedStream<K, V> {
      * @param sessionWindows    the specification of the aggregation {@link SessionWindows}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#reduce(Reducer) reduce(reducer)}
      */
     @Deprecated
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -915,10 +925,10 @@ public interface KGroupedStream<K, V> {
      * <pre>{@code
      * // At the example of a Reducer<Long>
      * new Reducer<Long>() {
-     *   @Override
      *   public Long apply(Long aggValue, Long currValue) {
      *     return aggValue + currValue;
      *   }
+     * }
      * }</pre>
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -959,7 +969,8 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier     user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#reduce(Reducer, Materialized) reduce(reducer, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -1016,11 +1027,11 @@ public interface KGroupedStream<K, V> {
      * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
      *                      if not specified the default serdes defined in the configs will be used
      * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
-     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Serde)} ()} ()}.
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Serde)}.
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as(queryableStoreName).withValueSerde(aggValueSerde))}
      */
     @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
@@ -1031,7 +1042,7 @@ public interface KGroupedStream<K, V> {
     /**
      * Aggregate the values of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
      * allows the result to have a different type than the input values.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * that can be queried using the provided {@code queryableStoreName}.
@@ -1043,7 +1054,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like
-     * count (c.f. {@link #count(String)}).
+     * count (c.f. {@link #count()}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same key.
@@ -1087,7 +1098,7 @@ public interface KGroupedStream<K, V> {
     /**
      * Aggregate the values of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
      * allows the result to have a different type than the input values.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * that can be queried using the provided {@code queryableStoreName}.
@@ -1099,7 +1110,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like
-     * count (c.f. {@link #count(String)}).
+     * count (c.f. {@link #count()}).
      * <p>
      * The default value serde from config will be used for serializing the result.
      * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
@@ -1127,10 +1138,11 @@ public interface KGroupedStream<K, V> {
      */
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> aggregator);
+
     /**
      * Aggregate the values of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
      * allows the result to have a different type than the input values.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * that can be queried using the provided {@code queryableStoreName}.
@@ -1142,7 +1154,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like
-     * count (c.f. {@link #count(String)}).
+     * count (c.f. {@link #count()}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same key.
@@ -1166,14 +1178,13 @@ public interface KGroupedStream<K, V> {
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
      */
     @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> aggregator,
                                  final Serde<VR> aggValueSerde);
 
-
     /**
      * Aggregate the values of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
@@ -1189,7 +1200,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator, StateStoreSupplier)} can be used to compute aggregate functions
-     * like count (c.f. {@link #count(String)}).
+     * like count (c.f. {@link #count()}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same key.
@@ -1217,7 +1228,7 @@ public interface KGroupedStream<K, V> {
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
@@ -1243,7 +1254,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator, Windows, Serde, String)} can be used to compute aggregate
-     * functions like count (c.f. {@link #count(String)}).
+     * functions like count (c.f. {@link #count(Windows)}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -1282,10 +1293,11 @@ public interface KGroupedStream<K, V> {
      *                      if not specified the default serdes defined in the configs will be used
      * @param <VR>          the value type of the resulting {@link KTable}
      * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
-     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Windows, Serde)} ()} ()}.
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Windows, Serde)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+     * {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as(queryableStoreName).withValueSerde(aggValueSerde))}
      */
     @Deprecated
     <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
@@ -1313,7 +1325,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator, Windows, Serde, String)} can be used to compute aggregate
-     * functions like count (c.f. {@link #count(String)}).
+     * functions like count (c.f. {@link #count(Windows)}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -1338,7 +1350,8 @@ public interface KGroupedStream<K, V> {
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+     * {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
      */
     @Deprecated
     <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
@@ -1346,7 +1359,6 @@ public interface KGroupedStream<K, V> {
                                                              final Windows<W> windows,
                                                              final Serde<VR> aggValueSerde);
 
-
     /**
      * Aggregate the values of records in this stream by the grouped key and defined windows.
      * Records with {@code null} key or value are ignored.
@@ -1366,7 +1378,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator, Windows, StateStoreSupplier)} can be used to compute aggregate
-     * functions like count (c.f. {@link #count(String)}).
+     * functions like count (c.f. {@link #count(Windows)}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -1397,7 +1409,8 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(Windows)}
+     * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+     * {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
@@ -1422,7 +1435,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)} can be used to compute
-     * aggregate functions like count (c.f. {@link #count(String)})
+     * aggregate functions like count (c.f. {@link #count(SessionWindows)})
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -1452,10 +1465,11 @@ public interface KGroupedStream<K, V> {
      *                      if not specified the default serdes defined in the configs will be used
      * @param <T>           the value type of the resulting {@link KTable}
      * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
-     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde)} ()} ()}.
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.as(queryableStoreName).withValueSerde(aggValueSerde))}
      */
     @Deprecated
     <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
@@ -1482,7 +1496,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)} can be used to compute
-     * aggregate functions like count (c.f. {@link #count(String)})
+     * aggregate functions like count (c.f. {@link #count(SessionWindows)})
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -1500,7 +1514,8 @@ public interface KGroupedStream<K, V> {
      * @param <T>           the value type of the resulting {@link KTable}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
      */
     @Deprecated
     <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
@@ -1526,7 +1541,7 @@ public interface KGroupedStream<K, V> {
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
      * Thus, {@code #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, StateStoreSupplier)} can be used
-     * to compute aggregate functions like count (c.f. {@link #count(String)}).
+     * to compute aggregate functions like count (c.f. {@link #count(SessionWindows)}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same window and key.
@@ -1559,7 +1574,8 @@ public interface KGroupedStream<K, V> {
      * @param <T>           the value type of the resulting {@link KTable}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
-     * @deprecated use {@link #windowedBy(SessionWindows)}
+     * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+     * {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.as(KeyValueByteStoreSupplier).withValueSerde(aggValueSerde))}
      */
     @Deprecated
     <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,

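Most of the deprecation notes above describe the same two-step replacement: call
windowedBy(...) first and then the corresponding operation on the returned windowed
stream. A rough sketch of that migration for a windowed count; the window size and store
name are made up and grouped is an assumed KGroupedStream<String, String>:

    // Deprecated style:
    //   grouped.count(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)), "windowed-counts");

    // Replacement style:
    final KTable<Windowed<String>, Long> counts = grouped
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)))
        .count(Materialized.as("windowed-counts"));
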
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index f854320..f814eaf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -81,7 +81,7 @@ public interface KGroupedTable<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#count()}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
-     * @deprecated use {@link #count(Materialized)}
+     * @deprecated use {@link #count(Materialized) count(Materialized.as(queryableStoreName))}
      */
     @Deprecated
     KTable<K, Long> count(final String queryableStoreName);
@@ -192,7 +192,7 @@ public interface KGroupedTable<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
-     * @deprecated use {@link #count(Materialized)}
+     * @deprecated use {@link #count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -264,7 +264,7 @@ public interface KGroupedTable<K, V> {
      * '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#reduce(Reducer, Reducer)} ()}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprecated use {@link #reduce(Reducer, Reducer, Materialized)}
+     * @deprecated use {@link #reduce(Reducer, Reducer, Materialized) reduce(adder, subtractor, Materialized.as(queryableStoreName))}
      */
     @Deprecated
     KTable<K, V> reduce(final Reducer<V> adder,
@@ -346,7 +346,7 @@ public interface KGroupedTable<K, V> {
      * mapped} to the same key into a new instance of {@link KTable}.
      * Records with {@code null} key are ignored.
      * Combining implies that the type of the aggregate result is the same as the type of the input value
-     * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Serde, String)}).
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator)}).
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * that can be queried using the provided {@code queryableStoreName}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -462,7 +462,7 @@ public interface KGroupedTable<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprecated use {@link #reduce(Reducer, Reducer, Materialized)}
+     * @deprecated use {@link #reduce(Reducer, Reducer, Materialized) reduce(adder, subtractor, Materialized.as(KeyValueByteStoreSupplier))}
      */
     @Deprecated
     KTable<K, V> reduce(final Reducer<V> adder,
@@ -547,7 +547,7 @@ public interface KGroupedTable<K, V> {
      * @param <VR>        the value type of the aggregated {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as(queryableStoreName))}
      */
     @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
@@ -559,7 +559,7 @@ public interface KGroupedTable<K, V> {
      * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
      * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
      * Records with {@code null} key are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, Materialized) combining via reduce(...)} as it,
      * for example, allows the result to have a different type than the input values.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * provided by the given {@code storeSupplier}.
@@ -639,11 +639,10 @@ public interface KGroupedTable<K, V> {
      * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
      * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
      * Records with {@code null} key are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer) combining via reduce(...)} as it,
      * for example, allows the result to have a different type than the input values.
      * If the result value type does not match the {@link StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value
-     * serde} you should use {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator, Serde, String)
-     * aggregate(Initializer, Aggregator, Aggregator, Serde, String)}.
+     * serde} you should use {@link #aggregate(Initializer, Aggregator, Aggregator, Serde)}.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * provided by the given {@code storeSupplier}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -784,7 +783,7 @@ public interface KGroupedTable<K, V> {
      * @param <VR>          the value type of the aggregated {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as(queryableStoreName).withValueSerde(aggValueSerde))}
      */
     @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
@@ -797,7 +796,7 @@ public interface KGroupedTable<K, V> {
      * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
      * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
      * Records with {@code null} key are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, Materialized) combining via reduce(...)} as it,
      * for example, allows the result to have a different type than the input values.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * that can be queried using the provided {@code queryableStoreName}.
@@ -857,7 +856,9 @@ public interface KGroupedTable<K, V> {
      * @param <VR>          the value type of the aggregated {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
      */
+    @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> adder,
                                  final Aggregator<? super K, ? super V, VR> subtractor,
@@ -938,7 +939,9 @@ public interface KGroupedTable<K, V> {
      * @param <VR>          the value type of the aggregated {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as(KeyValueByteStoreSupplier))}
      */
+    @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> adder,
                                  final Aggregator<? super K, ? super V, VR> subtractor,

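The KGroupedTable deprecations all point at the Materialized-based overloads, where the
adder and subtractor handle updated table rows. A rough sketch, assuming a
KGroupedTable<String, Long> named groupedTable and a made-up store name:

    final KTable<String, Long> totals = groupedTable.aggregate(
        () -> 0L,                                             // initializer
        (key, newValue, aggregate) -> aggregate + newValue,   // adder
        (key, oldValue, aggregate) -> aggregate - oldValue,   // subtractor
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("totals-store")
            .withValueSerde(Serdes.Long()));
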
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index c56a4ed..0d1d201 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -290,7 +291,7 @@ public interface KStream<K, V> {
      * {@link Integer} etc. to get meaningful information.
      *
      * @param label the name used to label the key/value pairs printed to the console
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toSysOut())}
      */
     @Deprecated
     void print(final String label);
@@ -308,7 +309,7 @@ public interface KStream<K, V> {
      *
      * @param keySerde key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde value serde used to deserialize value if type is {@code byte[]},
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withKeyValueMapper(...))}
      */
     @Deprecated
     void print(final Serde<K> keySerde,
@@ -326,7 +327,7 @@ public interface KStream<K, V> {
      * @param keySerde   key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde   value serde used to deserialize value if type is {@code byte[]},
      * @param label the name used to label the key/value pairs printed to the console
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withLabel(label).withKeyValueMapper(...))}
      */
     @Deprecated
     void print(final Serde<K> keySerde,
@@ -342,7 +343,6 @@ public interface KStream<K, V> {
      * The example below shows the way to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }
@@ -352,7 +352,7 @@ public interface KStream<K, V> {
      * The KeyValueMapper's mapped value type must be {@code String}.
      *
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withKeyValueMapper(mapper))}
      */
     @Deprecated
     void print(final KeyValueMapper<? super K, ? super V, String> mapper);
@@ -366,7 +366,6 @@ public interface KStream<K, V> {
      * The example below shows the way to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }
@@ -377,7 +376,7 @@ public interface KStream<K, V> {
      *
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
      * @param label The given name which labels output will be printed.
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withLabel(label).withKeyValueMapper(mapper))}
      */
     @Deprecated
     void print(final KeyValueMapper<? super K, ? super V, String> mapper, final String label);
@@ -391,7 +390,6 @@ public interface KStream<K, V> {
      * The example below shows the way to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }
@@ -404,9 +402,9 @@ public interface KStream<K, V> {
      * {@link Integer} etc. to get meaningful information.
      *
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
-     * @param keySerde a {@link Serde<K>} used to deserialize key if type is {@code byte[]}.
-     * @param valSerde a {@link Serde<V>} used to deserialize value if type is {@code byte[]}.
-     * @deprecated use {@code print(Printed)}
+     * @param keySerde a {@link Serde} used to deserialize key if type is {@code byte[]}.
+     * @param valSerde a {@link Serde} used to deserialize value if type is {@code byte[]}.
+     * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withKeyValueMapper(mapper))}
      */
     @Deprecated
     void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde);
@@ -420,7 +418,6 @@ public interface KStream<K, V> {
      * The example below shows the way to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }
@@ -433,10 +430,10 @@ public interface KStream<K, V> {
      * {@link Integer} etc. to get meaningful information.
      *
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
-     * @param keySerde a {@link Serde<K>} used to deserialize key if type is {@code byte[]}.
-     * @param valSerde a {@link Serde<V>} used to deserialize value if type is {@code byte[]}.
+     * @param keySerde a {@link Serde} used to deserialize key if type is {@code byte[]}.
+     * @param valSerde a {@link Serde} used to deserialize value if type is {@code byte[]}.
      * @param label the name used to label the key/value pairs printed to the console
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withLabel(label).withKeyValueMapper(mapper))}
      */
     @Deprecated
     void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde, final String label);
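      (The fully-parameterized console variant collapses the same way: the label and mapper become builder calls,
      and the serde arguments fall away because Printed offers no serde option. Names below are assumed for the sketch:

          // deprecated variant
          stream.print(mapper, Serdes.Integer(), Serdes.String(), "my-stream");

          // replacement
          stream.print(Printed.<Integer, String>toSysOut()
                              .withLabel("my-stream")
                              .withKeyValueMapper(mapper));
      )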
@@ -455,7 +452,7 @@ public interface KStream<K, V> {
      * Relative order is preserved within each input stream though (ie, records within one input
      * stream are processed in order).
      *
-     * @param a stream which is to be merged into this stream
+     * @param stream a stream which is to be merged into this stream
      * @return a merged stream containing all records from this and the provided {@code KStream}
      */
     KStream<K, V> merge(final KStream<K, V> stream);
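      (A minimal sketch of merge(), assuming two KStream<String, Long> instances named leftStream and rightStream:

          // both inputs must share key and value types; ordering is only guaranteed per input stream
          final KStream<String, Long> merged = leftStream.merge(rightStream);
      )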
@@ -472,7 +469,7 @@ public interface KStream<K, V> {
      * {@link Integer} etc. to get meaningful information.
      *
      * @param filePath name of the file to write to
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath))}
      */
     @Deprecated
     void writeAsText(final String filePath);
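      (For the writeAsText() family the replacement routes output to a file instead of System.out; a sketch with an
      assumed KStream<Integer, String> named stream and an illustrative path:

          // deprecated variant
          stream.writeAsText("/tmp/stream-dump.txt");

          // replacement
          stream.print(Printed.<Integer, String>toFile("/tmp/stream-dump.txt"));
      )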
@@ -489,7 +486,7 @@ public interface KStream<K, V> {
      *
      * @param filePath   name of the file to write to
      * @param label the name used to label the key/value pairs written to the file
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withLabel(label))}
      */
     @Deprecated
     void writeAsText(final String filePath,
@@ -509,7 +506,7 @@ public interface KStream<K, V> {
      * @param filePath name of the file to write to
      * @param keySerde key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde value serde used to deserialize value if type is {@code byte[]},
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withKeyValueMapper(...))}
      */
     @Deprecated
     void writeAsText(final String filePath,
@@ -530,7 +527,7 @@ public interface KStream<K, V> {
      * @param label the name used to label the key/value pairs written to the file
      * @param keySerde   key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde   value serde used to deserialize value if type is {@code byte[]},
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withLabel(label).withKeyValueMapper(...))}
      */
     @Deprecated
     void writeAsText(final String filePath,
@@ -549,7 +546,6 @@ public interface KStream<K, V> {
      * The example below shows the way to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }
@@ -560,7 +556,7 @@ public interface KStream<K, V> {
      *
      * @param filePath path of the file to write to.
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withKeyValueMapper(mapper))}
      */
     @Deprecated
     void writeAsText(final String filePath, final KeyValueMapper<? super K, ? super V, String> mapper);
@@ -576,7 +572,6 @@ public interface KStream<K, V> {
      * The example below shows the way to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }
@@ -588,7 +583,7 @@ public interface KStream<K, V> {
      * @param filePath path of the file to write to.
      * @param label the name used to label records written to file.
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withLabel(label).withKeyValueMapper(mapper))}
      */
     @Deprecated
     void writeAsText(final String filePath, final String label, final KeyValueMapper<? super K, ? super V, String> mapper);
@@ -604,7 +599,6 @@ public interface KStream<K, V> {
      * The example below shows the way to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }
@@ -620,7 +614,7 @@ public interface KStream<K, V> {
      * @param keySerde key serde used to deserialize key if type is {@code byte[]}.
      * @param valSerde value serde used to deserialize value if type is {@code byte[]}.
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withKeyValueMapper(mapper))}
      */
     @Deprecated
     void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, String> mapper);
@@ -636,7 +630,6 @@ public interface KStream<K, V> {
      * The example below shows the way to customize output data.
      * <pre>{@code
      * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
-     *     @Override
      *     public String apply(Integer key, String value) {
      *         return String.format("(%d, %s)", key, value);
      *     }
@@ -653,8 +646,9 @@ public interface KStream<K, V> {
      * @param keySerde key serde used to deserialize key if type is {@code byte[]}.
      * @param valSerde value serde used to deserialize value if type is {@code byte[]}.
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
-     * @deprecated use {@code print(Printed)}
+     * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withLabel(label).withKeyValueMapper(mapper))}
      */
+    @Deprecated
     void writeAsText(final String filePath, final String label, final Serde<K> keySerde, final Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, String> mapper);
 
     /**
@@ -704,8 +698,7 @@ public interface KStream<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link StreamsBuilder#stream(String)
-     * StreamsBuilder#stream(someTopicName)}.
+     * {@link StreamsBuilder#stream(String) StreamsBuilder#stream(someTopicName)}.
      *
      * @param topic the topic name
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
@@ -725,7 +718,7 @@ public interface KStream<K, V> {
      *                    if not specified producer's {@link DefaultPartitioner} will be used
      * @param topic       the topic name
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
-     * @deprecated use {@code through(String, Produced)}
+     * @deprecated use {@link #through(String, Produced) through(topic, Produced.streamPartitioner(partitioner))}
      */
     @Deprecated
     KStream<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
@@ -740,7 +733,7 @@ public interface KStream<K, V> {
      * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valSerde, someTopicName)} and
-     * {@link StreamsBuilder#stream(Serde, Serde, String...) StreamsBuilder#stream(keySerde, valSerde, someTopicName)}.
+     * {@link KStreamBuilder#stream(Serde, Serde, String...) KStreamBuilder#stream(keySerde, valSerde, someTopicName)}.
      *
      * @param keySerde key serde used to send key-value pairs,
      *                 if not specified the default key serde defined in the configuration will be used
@@ -748,7 +741,7 @@ public interface KStream<K, V> {
      *                 if not specified the default value serde defined in the configuration will be used
      * @param topic    the topic name
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
-     * @deprecated use {@code through(String, Produced)}
+     * @deprecated use {@link #through(String, Produced) through(topic, Produced.with(keySerde, valSerde))}
      */
     @Deprecated
     KStream<K, V> through(final Serde<K> keySerde,
@@ -762,8 +755,8 @@ public interface KStream<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String) #to(keySerde, valSerde,
-     * StreamPartitioner, someTopicName)} and {@link StreamsBuilder#stream(Serde, Serde, String...)
-     * StreamsBuilder#stream(keySerde, valSerde, someTopicName)}.
+     * StreamPartitioner, someTopicName)} and {@link KStreamBuilder#stream(Serde, Serde, String...)
+     * KStreamBuilder#stream(keySerde, valSerde, someTopicName)}.
      *
      * @param keySerde    key serde used to send key-value pairs,
      *                    if not specified the default key serde defined in the configuration will be used
@@ -775,7 +768,7 @@ public interface KStream<K, V> {
      *                    be used
      * @param topic       the topic name
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
-     * @deprecated use {@code through(String, Produced)}
+     * @deprecated use {@link #through(String, Produced) through(topic, Produced.with(keySerde, valSerde, partitioner))}
      */
     @Deprecated
     KStream<K, V> through(final Serde<K> keySerde,
@@ -791,8 +784,7 @@ public interface KStream<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String, Produced) to(someTopic, Produced.with(keySerde, valueSerde))}
-     * and {@link StreamsBuilder#stream(Serde, Serde, String...)
-     * StreamsBuilder#stream(keySerde, valSerde, someTopicName)}.
+     * and {@link StreamsBuilder#stream(String, Consumed) StreamsBuilder#stream(someTopicName, Consumed.with(keySerde, valueSerde))}.
      *
      * @param topic    the topic name
      * @param produced the options to use when producing to the topic
@@ -820,7 +812,7 @@ public interface KStream<K, V> {
      * @param partitioner the function used to determine how records are distributed among partitions of the topic,
      *                    if not specified producer's {@link DefaultPartitioner} will be used
      * @param topic       the topic name
-     * @deprecated use {@code to(String, Produced}
+     * @deprecated use {@link #to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))}
      */
     @Deprecated
     void to(final StreamPartitioner<? super K, ? super V> partitioner,
@@ -838,7 +830,7 @@ public interface KStream<K, V> {
      * @param valSerde value serde used to send key-value pairs,
      *                 if not specified the default serde defined in the configs will be used
      * @param topic    the topic name
-     * @deprecated use {@code to(String, Produced}
+     * @deprecated use {@link #to(String, Produced) to(topic, Produced.with(keySerde, valSerde))}
      */
     @Deprecated
     void to(final Serde<K> keySerde,
@@ -860,7 +852,7 @@ public interface KStream<K, V> {
      *                    {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will
      *                    be used
      * @param topic       the topic name
-     * @deprecated use {@code to(String, Produced}
+     * @deprecated use {@link #to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))}
      */
     @Deprecated
     void to(final Serde<K> keySerde,
@@ -1159,7 +1151,7 @@ public interface KStream<K, V> {
      * @param valSerde value serdes for materializing this stream,
      *                 if not specified the default serdes defined in the configs will be used
      * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
-     * @deprecated use {@code groupByKey(Serialized)}
+     * @deprecated use {@link #groupByKey(Serialized) groupByKey(Serialized.with(keySerde, valSerde))}
      */
     @Deprecated
     KGroupedStream<K, V> groupByKey(final Serde<K> keySerde,
@@ -1244,7 +1236,7 @@ public interface KStream<K, V> {
      * @param <KR>     the key type of the result {@link KGroupedStream}
      * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
      * @see #groupByKey()
-     * @deprecated use {@code groupBy(KeyValueMapper, Serialized}
+     * @deprecated use {@link #groupBy(KeyValueMapper, Serialized) groupBy(selector, Serialized.with(keySerde, valSerde))}
      */
     @Deprecated
     <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
@@ -1480,7 +1472,7 @@ public interface KStream<K, V> {
      * {@link ValueJoiner}, one for each matched record-pair with the same key and within the joining window intervals
      * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
      * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
-     * @deprecated use {@link #join(KStream, ValueJoiner, JoinWindows, Joined)}
+     * @deprecated use {@link #join(KStream, ValueJoiner, JoinWindows, Joined) join(otherStream, joiner, windows, Joined.with(keySerde, thisValueSerde, otherValueSerde))}
      */
     @Deprecated
     <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
@@ -1734,7 +1726,7 @@ public interface KStream<K, V> {
      * this {@code KStream} and within the joining window intervals
      * @see #join(KStream, ValueJoiner, JoinWindows, Joined)
      * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
-     * @deprecated use {@link #leftJoin(KStream, ValueJoiner, JoinWindows, Joined}
+     * @deprecated use {@link #leftJoin(KStream, ValueJoiner, JoinWindows, Joined) leftJoin(otherStream, joiner, windows, Joined.with(keySerde, thisValSerde, otherValueSerde))}
      */
     @Deprecated
     <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
@@ -1988,7 +1980,7 @@ public interface KStream<K, V> {
      * both {@code KStream}s and within the joining window intervals
      * @see #join(KStream, ValueJoiner, JoinWindows, Joined)
      * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
-     * @deprecated use {@link #outerJoin(KStream, ValueJoiner, JoinWindows, Joined)}
+     * @deprecated use {@link #outerJoin(KStream, ValueJoiner, JoinWindows, Joined) outerJoin(otherStream, joiner, windows, Joined.with(keySerde, thisValueSerde, otherValueSerde))}
      */
     @Deprecated
     <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
@@ -2222,7 +2214,7 @@ public interface KStream<K, V> {
      * {@link ValueJoiner}, one for each matched record-pair with the same key
      * @see #leftJoin(KTable, ValueJoiner, Joined)
      * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
-     * @deprecated use {@link #join(KTable, ValueJoiner, Joined)}
+     * @deprecated use {@link #join(KTable, ValueJoiner, Joined) join(table, joiner, Joined.with(keySerde, valSerde, null))}
      */
     @Deprecated
     <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
@@ -2461,7 +2453,9 @@ public interface KStream<K, V> {
      * {@link ValueJoiner}, one output for each input {@code KStream} record
      * @see #join(KTable, ValueJoiner, Serde, Serde)
      * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
+     * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Joined) leftJoin(table, joiner, Joined.with(keySerde, valSerde, null))}
      */
+    @Deprecated
     <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                      final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                                      final Serde<K> keySerde,

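(The remaining KStream deprecations above all follow one pattern: serde, partitioner, and join-serde arguments move
into the new config objects Produced, Serialized, and Joined. A compact sketch under assumed names and types --
stream is a KStream<String, Long>, otherStream a KStream<String, String>, joiner a matching ValueJoiner,
partitioner a StreamPartitioner, and the topic names are placeholders:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.*;

    // through / to: serdes and an optional partitioner move into Produced
    stream.through("repartition-topic", Produced.with(Serdes.String(), Serdes.Long(), partitioner));
    stream.to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

    // groupByKey / groupBy: serdes move into Serialized
    final KGroupedStream<String, Long> grouped =
        stream.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()));

    // windowed stream-stream join: serdes move into Joined
    final KStream<String, String> joined =
        stream.join(otherStream, joiner, JoinWindows.of(5 * 60 * 1000L),
                    Joined.with(Serdes.String(), Serdes.Long(), Serdes.String()));
)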
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index ab666ba..d4642da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -479,10 +479,11 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param offsetReset       the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                          offsets are available
-     * @param topic             the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, String)} ()}.
+     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+     *                           offsets are available
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of
+     * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, String) table(AutoOffsetReset, String)}.
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
@@ -791,7 +792,8 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * @param valSerde           value serde used to send key-value pairs,
      *                           if not specified the default value serde defined in the configuration will be used
      * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)} ()} ()}.
+     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of
+     * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, Serde, Serde, String) table(AutoOffsetReset, Serde, Serde, String)}
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
@@ -876,7 +878,8 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * @param valSerde           value serde used to send key-value pairs,
      *                           if not specified the default value serde defined in the configuration will be used
      * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)} ()} ()}.
+     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of
+     * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, Serde, Serde, String) table(AutoOffsetReset, Serde, Serde, String)}
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,