You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/07 16:44:50 UTC

[kafka] branch trunk updated: MINOR: Remove deprecated KTable#writeAs, print, foreach, to, through (#4910)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 42771eb  MINOR: Remove deprecated KTable#writeAs, print, foreach, to, through (#4910)
42771eb is described below

commit 42771eb37d6abd0a34220d30773e559680bdf9b0
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Mon May 7 09:44:45 2018 -0700

    MINOR: Remove deprecated KTable#writeAs, print, foreach, to, through (#4910)
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../org/apache/kafka/streams/kstream/KTable.java   | 640 ---------------------
 .../streams/kstream/internals/KTableImpl.java      | 258 ---------
 .../KStreamAggregationDedupIntegrationTest.java    |   6 +-
 .../KStreamAggregationIntegrationTest.java         |   9 +-
 .../kstream/internals/KGroupedTableImplTest.java   |   2 +-
 .../kstream/internals/KTableFilterTest.java        |   1 +
 .../kstream/internals/KTableForeachTest.java       | 111 ----
 .../streams/kstream/internals/KTableImplTest.java  | 109 ++--
 .../kstream/internals/KTableMapValuesTest.java     |  88 +--
 9 files changed, 95 insertions(+), 1129 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 55555a5..1aaad1e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
@@ -24,9 +23,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
@@ -594,219 +591,6 @@ public interface KTable<K, V> {
                                  final Serde<VR> valueSerde,
                                  final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
 
-
-    /**
-     * Print the updated records of this {@code KTable} to {@code System.out}.
-     * This function will use the generated name of the parent processor node to label the key/value pairs printed to
-     * the console.
-     * <p>
-     * The default serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
-     * {@code toString()} on the deserialized object.
-     * <p>
-     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
-     * {@link Integer} etc. to get meaningful information.
-     * <p>
-     * Note that {@code print()} is not applied to the internal state store and only called for each new {@code KTable}
-     * updated record.
-     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
-     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a {@link KStream} using {@link #toStream()} and then use
-     * {@link KStream#print(Printed) print(Printed.toSysOut())} on the result.
-     */
-    @Deprecated
-    void print();
-
-    /**
-     * Print the updated records of this {@code KTable} to {@code System.out}.
-     * This function will use the given name to label the key/value pairs printed to the console.
-     * <p>
-     * The default serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
-     * {@code toString()} on the deserialized object.
-     * <p>
-     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
-     * {@link Integer} etc. to get meaningful information.
-     * <p>
-     * Note that {@code print()} is not applied to the internal state store and only called for each new {@code KTable}
-     * updated record.
-     *
-     * @param label the name used to label the key/value pairs printed to the console
-     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
-     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a {@link KStream} using {@link #toStream()} and then use
-     * {@link KStream#print(Printed) print(Printed.toSysOut().withLabel(label))} on the result.
-     */
-    @Deprecated
-    void print(final String label);
-
-    /**
-     * Print the updated records of this {@code KTable} to {@code System.out}.
-     * This function will use the generated name of the parent processor node to label the key/value pairs printed to
-     * the console.
-     * <p>
-     * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
-     * {@code toString()} on the deserialized object.
-     * <p>
-     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
-     * {@link Integer} etc. to get meaningful information.
-     * <p>
-     * Note that {@code print()} is not applied to the internal state store and only called for each new {@code KTable}
-     * updated record.
-     *
-     * @param keySerde key serde used to deserialize key if type is {@code byte[]},
-     * @param valSerde value serde used to deserialize value if type is {@code byte[]}
-     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
-     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a {@link KStream} using {@link #toStream()} and then use
-     * {@link KStream#print(Printed) print(Printed.toSysOut().withKeyValueMapper(...))} on the result.
-     */
-    @Deprecated
-    void print(final Serde<K> keySerde,
-               final Serde<V> valSerde);
-
-    /**
-     * Print the updated records of this {@code KTable} to {@code System.out}.
-     * This function will use the given name to label the key/value pairs printed to the console.
-     * <p>
-     * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
-     * {@code toString()} on the deserialized object.
-     * <p>
-     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
-     * {@link Integer} etc. to get meaningful information.
-     * <p>
-     * Note that {@code print()} is not applied to the internal state store and only called for each new {@code KTable}
-     * updated record.
-     *
-     * @param keySerde   key serde used to deserialize key if type is {@code byte[]},
-     * @param valSerde   value serde used to deserialize value if type is {@code byte[]},
-     * @param label the name used to label the key/value pairs printed to the console
-     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
-     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a {@link KStream} using {@link #toStream()} and then use
-     * {@link KStream#print(Printed) print(Printed.toSysOut().withLabel(label).withKeyValueMapper(...))} on the result.
-     */
-    @Deprecated
-    void print(final Serde<K> keySerde,
-               final Serde<V> valSerde,
-               final String label);
-
-    /**
-     * Write the updated records of this {@code KTable} to a file at the given path.
-     * This function will use the generated name of the parent processor node to label the key/value pairs printed to
-     * the file.
-     * <p>
-     * The default serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
-     * {@code toString()} on the deserialized object.
-     * <p>
-     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
-     * {@link Integer} etc. to get meaningful information.
-     * <p>
-     * Note that {@code writeAsText()} is not applied to the internal state store and only called for each new
-     * {@code KTable} updated record.
-     *
-     * @param filePath name of file to write to
-     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
-     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a {@link KStream} using {@link #toStream()} and then use
-     * {@link KStream#print(Printed) print(Printed.toFile(filePath))} on the result.
-     */
-    @Deprecated
-    void writeAsText(final String filePath);
-
-    /**
-     * Write the updated records of this {@code KTable} to a file at the given path.
-     * This function will use the given name to label the key/value printed to the file.
-     * <p>
-     * The default serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
-     * {@code toString()} on the deserialized object.
-     * <p>
-     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
-     * {@link Integer} etc. to get meaningful information.
-     * <p>
-     * Note that {@code writeAsText()} is not applied to the internal state store and only called for each new
-     * {@code KTable} updated record.
-     *
-     * @param filePath   name of file to write to
-     * @param label the name used to label the key/value pairs printed to the file
-     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
-     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a {@link KStream} using {@link #toStream()} and then use
-     * {@link KStream#print(Printed) print(Printed.toFile(filePath).withLabel(label))} on the result.
-     */
-    @Deprecated
-    void writeAsText(final String filePath,
-                     final String label);
-
-    /**
-     * Write the updated records of this {@code KTable} to a file at the given path.
-     * This function will use the generated name of the parent processor node to label the key/value pairs printed to
-     * the file.
-     * <p>
-     * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
-     * {@code toString()} on the deserialized object.
-     * <p>
-     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
-     * {@link Integer} etc. to get meaningful information.
-     * <p>
-     * Note that {@code writeAsText()} is not applied to the internal state store and only called for each new
-     * {@code KTable} updated record.
-     *
-     * @param filePath name of file to write to
-     * @param keySerde key serde used to deserialize key if type is {@code byte[]},
-     * @param valSerde value serde used to deserialize value if type is {@code byte[]}
-     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
-     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a {@link KStream} using {@link #toStream()} and then use
-     * {@link KStream#print(Printed) print(Printed.toFile(filePath).withKeyValueMapper(...))} on the result.
-     */
-    @Deprecated
-    void  writeAsText(final String filePath,
-                      final Serde<K> keySerde,
-                      final Serde<V> valSerde);
-
-    /**
-     * Write the updated records of this {@code KTable} to a file at the given path.
-     * This function will use the given name to label the key/value printed to the file.
-     * <p>
-     * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
-     * {@code toString()} on the deserialized object.
-     * <p>
-     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
-     * {@link Integer} etc. to get meaningful information.
-     * <p>
-     * Note that {@code writeAsText()} is not applied to the internal state store and only called for each new
-     * {@code KTable} updated record.
-     *
-     * @param filePath name of file to write to
-     * @param label the name used to label the key/value pairs printed to the file
-     * @param keySerde key serde used to deserialize key if type is {@code byte[]},
-     * @param valSerde value serde used to deserialize value if type is {@code byte[]}
-     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
-     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a {@link KStream} using {@link #toStream()} and then use
-     * {@link KStream#print(Printed) print(Printed.toFile(filePath).withLabel(label).withKeyValueMapper(...))} on the result.
-     */
-    @Deprecated
-    void writeAsText(final String filePath,
-                     final String label,
-                     final Serde<K> keySerde,
-                     final Serde<V> valSerde);
-
-    /**
-     * Perform an action on each updated record of this {@code KTable}.
-     * Note that this is a terminal operation that returns void.
-     * <p>
-     * Note that {@code foreach()} is not applied to the internal state store and only called for each new
-     * {@code KTable} updated record.
-     *
-     * @param action an action to perform on each record
-     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
-     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
-     * convert to a {@link KStream} using {@link #toStream()} and then use
-     * {@link KStream#foreach(ForeachAction) foreach(action)} on the result.
-     */
-    @Deprecated
-    void foreach(final ForeachAction<? super K, ? super V> action);
-
     /**
      * Convert this changelog stream to a {@link KStream}.
      * <p>
@@ -845,430 +629,6 @@ public interface KTable<K, V> {
     <KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
 
     /**
-     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
-     * serializers and deserializers and producer's {@link DefaultPartitioner}.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     * <p>
-     * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
-     * <p>
-     * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link KStreamBuilder#table(String, String)})
-     * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
-     *
-     * @param topic     the topic name
-     * @param queryableStoreName the state store name used for the result {@code KTable}; valid characters are ASCII
-     *                  alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KTable#through(String)}
-     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)} and
-     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
-     * to read back as a {@code KTable}
-     */
-    @Deprecated
-    KTable<K, V> through(final String topic,
-                         final String queryableStoreName);
-
-    /**
-     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
-     * serializers and deserializers and producer's {@link DefaultPartitioner}.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     * <p>
-     * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
-     * <p>
-     * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link KStreamBuilder#table(String, String)})
-     * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
-     *
-     * @param topic     the topic name
-     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
-     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)} and
-     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier))}
-     * to read back as a {@code KTable}
-     */
-    @Deprecated
-    KTable<K, V> through(final String topic,
-                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
-
-    /**
-     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
-     * serializers and deserializers and producer's {@link DefaultPartitioner}.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     * <p>
-     * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
-     * <p>
-     * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
-     * {@link KStreamBuilder#table(String)})
-     *
-     * @param topic     the topic name
-     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)} and
-     * {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable}
-     */
-    @Deprecated
-    KTable<K, V> through(final String topic);
-
-    /**
-     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
-     * serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of
-     * records to partitions.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     * <p>
-     * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
-     * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
-     * <p>
-     * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
-     * {@link KStreamBuilder#table(String)})
-     *
-     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
-     *                    if not specified producer's {@link DefaultPartitioner} will be used
-     * @param topic       the topic name
-     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by
-     * {@link KStream#to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))} and
-     * {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable}
-     */
-    @Deprecated
-    KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
-                         final String topic);
-
-    /**
-     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
-     * serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of
-     * records to partitions.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     * <p>
-     * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
-     * <p>
-     * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link KStreamBuilder#table(String, String)})
-     *
-     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
-     *                    if not specified producer's {@link DefaultPartitioner} will be used
-     * @param topic       the topic name
-     * @param queryableStoreName   the state store name used for the result {@code KTable}.
-     *                             If {@code null} this is the equivalent of {@link KTable#through(StreamPartitioner, String)}
-     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by
-     * {@link KStream#to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))} and
-     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
-     * to read back as a {@code KTable}
-     */
-    @Deprecated
-    KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
-                         final String topic,
-                         final String queryableStoreName);
-
-    /**
-     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
-     * serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of
-     * records to partitions.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     * <p>
-     * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
-     * <p>
-     * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link KStreamBuilder#table(String, String)})
-     *
-     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
-     *                    if not specified producer's {@link DefaultPartitioner} will be used
-     * @param topic       the topic name
-     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
-     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by
-     * {@link KStream#to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))} and
-     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier))}
-     * to read back as a {@code KTable}
-     */
-    @Deprecated
-    KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
-                         final String topic,
-                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
-
-    /**
-     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     * <p>
-     * If {@code keySerde} provides a {@link WindowedSerializer} for the key {@link WindowedStreamPartitioner} is
-     * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
-     * <p>
-     * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
-     * <p>
-     * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link KStreamBuilder#table(String, String)})
-     *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default key serde defined in the configuration will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default value serde defined in the configuration will be used
-     * @param topic     the topic name
-     * @param queryableStoreName the state store name used for the result {@code KTable}.
-     *                           If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, String)}
-     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by
-     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))} and
-     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
-     * to read back as a {@code KTable}
-     */
-    @Deprecated
-    KTable<K, V> through(final Serde<K> keySerde,
-                         final Serde<V> valSerde,
-                         final String topic,
-                         final String queryableStoreName);
-
-    /**
-     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     * <p>
-     * If {@code keySerde} provides a {@link WindowedSerializer} for the key {@link WindowedStreamPartitioner} is
-     * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
-     * <p>
-     * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
-     * <p>
-     * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link StreamsBuilder#table(String, Materialized)})
-     *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default key serde defined in the configuration will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default value serde defined in the configuration will be used
-     * @param topic     the topic name
-     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
-     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by
-     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))} and
-     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier))}
-     * to read back as a {@code KTable}
-     */
-    @Deprecated
-    KTable<K, V> through(final Serde<K> keySerde,
-                         final Serde<V> valSerde,
-                         final String topic,
-                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
-
-    /**
-     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     * <p>
-     * If {@code keySerde} provides a {@link WindowedSerializer} for the key {@link WindowedStreamPartitioner} is
-     * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
-     * <p>
-     * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
-     * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
-     * <p>
-     * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
-     * {@link KStreamBuilder#table(String)})
-     *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default key serde defined in the configuration will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default value serde defined in the configuration will be used
-     * @param topic     the topic name
-     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by
-     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))}
-     * and {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable}
-     */
-    @Deprecated
-    KTable<K, V> through(final Serde<K> keySerde,
-                         final Serde<V> valSerde,
-                         final String topic);
-
-    /**
-     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
-     * {@link StreamPartitioner} to determine the distribution of records to partitions.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     * <p>
-     * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
-     * #to(keySerde, valueSerde, partitioner, someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
-     * <p>
-     * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link KStreamBuilder#table(String, String)})
-     *
-     * @param keySerde    key serde used to send key-value pairs,
-     *                    if not specified the default key serde defined in the configuration will be used
-     * @param valSerde    value serde used to send key-value pairs,
-     *                    if not specified the default value serde defined in the configuration will be used
-     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
-     *                    if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
-     *                    {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will
-     *                    be used
-     * @param topic      the topic name
-     * @param queryableStoreName  the state store name used for the result {@code KTable}.
-     *                            If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, StreamPartitioner, String)}
-     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by
-     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))} and
-     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
-     * to read back as a {@code KTable}
-     */
-    @Deprecated
-    KTable<K, V> through(final Serde<K> keySerde,
-                         final Serde<V> valSerde,
-                         final StreamPartitioner<? super K, ? super V> partitioner,
-                         final String topic,
-                         final String queryableStoreName);
-
-    /**
-     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
-     * {@link StreamPartitioner} to determine the distribution of records to partitions.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     * <p>
-     * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
-     * #to(keySerde, valueSerde, partitioner, someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
-     * <p>
-     * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link KStreamBuilder#table(String, String)})
-     *
-     * @param keySerde    key serde used to send key-value pairs,
-     *                    if not specified the default key serde defined in the configuration will be used
-     * @param valSerde    value serde used to send key-value pairs,
-     *                    if not specified the default value serde defined in the configuration will be used
-     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
-     *                    if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
-     *                    {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will
-     *                    be used
-     * @param topic      the topic name
-     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
-     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by
-     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))} and
-     * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier))}
-     * to read back as a {@code KTable}
-     */
-    @Deprecated
-    KTable<K, V> through(final Serde<K> keySerde,
-                         final Serde<V> valSerde,
-                         final StreamPartitioner<? super K, ? super V> partitioner,
-                         final String topic,
-                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
-
-    /**
-     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
-     * {@link StreamPartitioner} to determine the distribution of records to partitions.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     * <p>
-     * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
-     * #to(keySerde, valueSerde, partitioner, someTopicName)} and
-     * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
-     * <p>
-     * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
-     * {@link KStreamBuilder#table(String)})
-     *
-     * @param keySerde    key serde used to send key-value pairs,
-     *                    if not specified the default key serde defined in the configuration will be used
-     * @param valSerde    value serde used to send key-value pairs,
-     *                    if not specified the default value serde defined in the configuration will be used
-     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
-     *                    if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
-     *                    {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will
-     *                    be used
-     * @param topic      the topic name
-     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
-     * @deprecated use {@link #toStream()} followed by
-     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))} and
-     * {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable}
-     */
-    @Deprecated
-    KTable<K, V> through(final Serde<K> keySerde,
-                         final Serde<V> valSerde,
-                         final StreamPartitioner<? super K, ? super V> partitioner,
-                         final String topic);
-
-    /**
-     * Materialize this changelog stream to a topic using default serializers and deserializers and producer's
-     * {@link DefaultPartitioner}.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     *
-     * @param topic the topic name
-     * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)}
-     */
-    @Deprecated
-    void to(final String topic);
-
-    /**
-     * Materialize this changelog stream to a topic using default serializers and deserializers and a customizable
-     * {@link StreamPartitioner} to determine the distribution of records to partitions.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     *
-     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
-     *                    if not specified producer's {@link DefaultPartitioner} will be used
-     * @param topic       the topic name
-     * @deprecated use {@link #toStream()} followed by
-     * {@link KStream#to(String, Produced) to(topic, Produced.withStreamPartitioner(partitioner))}
-     */
-    @Deprecated
-    void to(final StreamPartitioner<? super K, ? super V> partitioner,
-            final String topic);
-
-    /**
-     * Materialize this changelog stream to a topic.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     * <p>
-     * If {@code keySerde} provides a {@link WindowedSerializer} for the key {@link WindowedStreamPartitioner} is
-     * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
-     *
-     * @param keySerde key serde used to send key-value pairs,
-     *                 if not specified the default key serde defined in the configuration will be used
-     * @param valSerde value serde used to send key-value pairs,
-     *                 if not specified the default value serde defined in the configuration will be used
-     * @param topic    the topic name
-     * @deprecated use {@link #toStream()} followed by
-     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))}
-     */
-    @Deprecated
-    void to(final Serde<K> keySerde,
-            final Serde<V> valSerde,
-            final String topic);
-
-    /**
-     * Materialize this changelog stream to a topic using a customizable {@link StreamPartitioner} to determine the
-     * distribution of records to partitions.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     *
-     * @param keySerde    key serde used to send key-value pairs,
-     *                    if not specified the default key serde defined in the configuration will be used
-     * @param valSerde    value serde used to send key-value pairs,
-     *                    if not specified the default value serde defined in the configuration will be used
-     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
-     *                    if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
-     *                    {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will
-     *                    be used
-     * @param topic      the topic name
-     * @deprecated use {@link #toStream()} followed by
-     * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))}
-     */
-    @Deprecated
-    void to(final Serde<K> keySerde,
-            final Serde<V> valSerde,
-            final StreamPartitioner<? super K, ? super V> partitioner,
-            final String topic);
-
-    /**
      * Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper} and default serializers
      * and deserializers.
      * Each {@link KeyValue} pair of this {@code KTable} is mapped to a new {@link KeyValue} pair by applying the
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 11b8c51..785f73a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -18,10 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.TopologyException;
-import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -32,14 +29,10 @@ import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.util.Objects;
 import java.util.Set;
 
@@ -59,8 +52,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     private static final String FILTER_NAME = "KTABLE-FILTER-";
 
-    private static final String FOREACH_NAME = "KTABLE-FOREACH-";
-
     private static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
 
     private static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
@@ -69,16 +60,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     private static final String MERGE_NAME = "KTABLE-MERGE-";
 
-    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
-
     private static final String SELECT_NAME = "KTABLE-SELECT-";
 
     private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
 
     private final ProcessorSupplier<?, ?> processorSupplier;
 
-    private final KeyValueMapper<K, V, String> defaultKeyValueMapper;
-
     private final String queryableStoreName;
     private final boolean isQueryable;
 
@@ -98,12 +85,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         this.keySerde = null;
         this.valSerde = null;
         this.isQueryable = isQueryable;
-        this.defaultKeyValueMapper = new KeyValueMapper<K, V, String>() {
-            @Override
-            public String apply(K key, V value) {
-                return String.format("%s, %s", key, value);
-            }
-        };
     }
 
     public KTableImpl(final InternalStreamsBuilder builder,
@@ -120,12 +101,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         this.keySerde = keySerde;
         this.valSerde = valSerde;
         this.isQueryable = isQueryable;
-        this.defaultKeyValueMapper = new KeyValueMapper<K, V, String>() {
-            @Override
-            public String apply(K key, V value) {
-                return String.format("%s, %s", key, value);
-            }
-        };
     }
 
     @Override
@@ -318,239 +293,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doMapValues(withKey(mapper), valueSerde, storeSupplier);
     }
 
-    @SuppressWarnings("deprecation")
-    @Override
-    public void print() {
-        print(null, null, this.name);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public void print(final String label) {
-        print(null, null, label);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public void print(final Serde<K> keySerde,
-                      final Serde<V> valSerde) {
-        print(keySerde, valSerde, this.name);
-    }
-
-    @SuppressWarnings({"unchecked", "deprecation"})
-    @Override
-    public void print(final Serde<K> keySerde,
-                      final Serde<V> valSerde,
-                      final String label) {
-        Objects.requireNonNull(label, "label can't be null");
-        final String name = builder.newProcessorName(PRINTING_NAME);
-        builder.internalTopologyBuilder.addProcessor(
-            name,
-            new KStreamPrint<>(new PrintForeachAction<>(System.out, defaultKeyValueMapper, label)),
-            this.name);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public void writeAsText(final String filePath) {
-        writeAsText(filePath, this.name, null, null);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public void writeAsText(final String filePath,
-                            final String label) {
-        writeAsText(filePath, label, null, null);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public void writeAsText(final String filePath,
-                            final Serde<K> keySerde,
-                            final Serde<V> valSerde) {
-        writeAsText(filePath, this.name, keySerde, valSerde);
-    }
-
-    /**
-     * @throws TopologyException if file is not found
-     */
-    @SuppressWarnings({"unchecked", "deprecation"})
-    @Override
-    public void writeAsText(final String filePath,
-                            final String label,
-                            final Serde<K> keySerde,
-                            final Serde<V> valSerde) {
-        Objects.requireNonNull(filePath, "filePath can't be null");
-        Objects.requireNonNull(label, "label can't be null");
-        if (filePath.trim().isEmpty()) {
-            throw new TopologyException("filePath can't be an empty string");
-        }
-        final String name = builder.newProcessorName(PRINTING_NAME);
-        try {
-            builder.internalTopologyBuilder.addProcessor(
-                name,
-                new KStreamPrint<>(new PrintForeachAction<>(new FileOutputStream(filePath), defaultKeyValueMapper, label)),
-                this.name);
-        } catch (final FileNotFoundException e) {
-            throw new TopologyException(String.format("Unable to write stream to file at [%s] %s", filePath, e.getMessage()));
-        }
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public void foreach(final ForeachAction<? super K, ? super V> action) {
-        Objects.requireNonNull(action, "action can't be null");
-        String name = builder.newProcessorName(FOREACH_NAME);
-        KStreamPeek<K, Change<V>> processorSupplier = new KStreamPeek<>(new ForeachAction<K, Change<V>>() {
-            @Override
-            public void apply(K key, Change<V> value) {
-                action.apply(key, value.newValue);
-            }
-        }, false);
-        builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public KTable<K, V> through(final Serde<K> keySerde,
-                                final Serde<V> valSerde,
-                                final StreamPartitioner<? super K, ? super V> partitioner,
-                                final String topic,
-                                final String queryableStoreName) {
-        to(keySerde, valSerde, partitioner, topic);
-
-        return builder.table(topic,
-                             new ConsumedInternal<>(keySerde, valSerde, new FailOnInvalidTimestamp(), null),
-                             new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(keySerde, valSerde),
-                                     builder,
-                                     KTableImpl.TOSTREAM_NAME));
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public KTable<K, V> through(final Serde<K> keySerde,
-                                final Serde<V> valSerde,
-                                final StreamPartitioner<? super K, ? super V> partitioner,
-                                final String topic,
-                                final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
-        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
-        to(keySerde, valSerde, partitioner, topic);
-
-        final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(Consumed.with(keySerde, valSerde, new FailOnInvalidTimestamp(), null));
-        return builder.table(topic, consumed, storeSupplier);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public KTable<K, V> through(final Serde<K> keySerde,
-                                final Serde<V> valSerde,
-                                final StreamPartitioner<? super K, ? super V> partitioner,
-                                final String topic) {
-        return through(keySerde, valSerde, partitioner, topic, (String) null);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public KTable<K, V> through(final Serde<K> keySerde,
-                                final Serde<V> valSerde,
-                                final String topic,
-                                final String queryableStoreName) {
-        return through(keySerde, valSerde, null, topic, queryableStoreName);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public KTable<K, V> through(final Serde<K> keySerde,
-                                final Serde<V> valSerde,
-                                final String topic,
-                                final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
-        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
-        return through(keySerde, valSerde, null, topic, storeSupplier);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public KTable<K, V> through(final Serde<K> keySerde,
-                                final Serde<V> valSerde,
-                                final String topic) {
-        return through(keySerde, valSerde, null, topic, (String) null);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
-                                final String topic,
-                                final String queryableStoreName) {
-        return through(null, null, partitioner, topic, queryableStoreName);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
-                                final String topic,
-                                final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
-        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
-        return through(null, null, partitioner, topic, storeSupplier);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
-                                final String topic) {
-        return through(null, null, partitioner, topic, (String) null);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public KTable<K, V> through(final String topic,
-                                final String queryableStoreName) {
-        return through(null, null, null, topic, queryableStoreName);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public KTable<K, V> through(final String topic,
-                                final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
-        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
-        return through(null, null, null, topic, storeSupplier);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public KTable<K, V> through(final String topic) {
-        return through(null, null, null, topic, (String) null);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public void to(final String topic) {
-        to(null, null, null, topic);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public void to(final StreamPartitioner<? super K, ? super V> partitioner,
-                   final String topic) {
-        to(null, null, partitioner, topic);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public void to(final Serde<K> keySerde,
-                   final Serde<V> valSerde,
-                   final String topic) {
-        this.toStream().to(keySerde, valSerde, null, topic);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public void to(final Serde<K> keySerde,
-                   final Serde<V> valSerde,
-                   final StreamPartitioner<? super K, ? super V> partitioner,
-                   final String topic) {
-        this.toStream().to(keySerde, valSerde, partitioner, topic);
-    }
-
     @Override
     public KStream<K, V> toStream() {
         String name = builder.newProcessorName(TOSTREAM_NAME);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 44e139a..51bbb95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -126,9 +126,9 @@ public class KStreamAggregationDedupIntegrationTest {
     @Test
     public void shouldReduce() throws Exception {
         produceMessages(System.currentTimeMillis());
-        groupedStream
-            .reduce(reducer, "reduce-by-key")
-            .to(Serdes.String(), Serdes.String(), outputTopic);
+        groupedStream.reduce(reducer, "reduce-by-key")
+                .toStream()
+                .to(Serdes.String(), Serdes.String(), outputTopic);
 
         startStreams();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 52b9ee8..2efe9f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -168,6 +168,7 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(mockTime.milliseconds());
         groupedStream
             .reduce(reducer, "reduce-by-key")
+            .toStream()
             .to(Serdes.String(), Serdes.String(), outputTopic);
 
         startStreams();
@@ -294,6 +295,7 @@ public class KStreamAggregationIntegrationTest {
             aggregator,
             Serdes.Integer(),
             "aggregate-by-selected-key")
+            .toStream()
             .to(Serdes.String(), Serdes.Integer(), outputTopic);
 
         startStreams();
@@ -445,7 +447,8 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(mockTime.milliseconds());
 
         groupedStream.count("count-by-key")
-            .to(Serdes.String(), Serdes.Long(), outputTopic);
+                .toStream()
+                .to(Serdes.String(), Serdes.Long(), outputTopic);
 
         shouldCountHelper();
     }
@@ -456,7 +459,8 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(mockTime.milliseconds());
 
         groupedStream.count()
-            .to(Serdes.String(), Serdes.Long(), outputTopic);
+                .toStream()
+                .to(Serdes.String(), Serdes.Long(), outputTopic);
 
         shouldCountHelper();
     }
@@ -667,6 +671,7 @@ public class KStreamAggregationIntegrationTest {
                         return value1 + ":" + value2;
                     }
                 }, SessionWindows.with(sessionGap).until(maintainMillis), userSessionsStore)
+                .toStream()
                 .foreach(new ForeachAction<Windowed<String>, String>() {
                     @Override
                     public void apply(final Windowed<String> key, final String value) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 05d339f..b614732 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -124,7 +124,7 @@ public class KGroupedTableImplTest {
 
     private Map<String, Integer> getReducedResults(final KTable<String, Integer> inputKTable) {
         final Map<String, Integer> reducedResults = new HashMap<>();
-        inputKTable.foreach(new ForeachAction<String, Integer>() {
+        inputKTable.toStream().foreach(new ForeachAction<String, Integer>() {
             @Override
             public void apply(final String key, final Integer value) {
                 reducedResults.put(key, value);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index bde771b..2eecbc1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -506,6 +506,7 @@ public class KTableFilterTest {
             .<Integer, String>table("empty")
             .filter(numberKeyPredicate)
             .filterNot(numberKeyPredicate)
+            .toStream()
             .to("nirvana");
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
deleted file mode 100644
index a6b6c64..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.Consumed;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.test.ConsumerRecordFactory;
-import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Locale;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-
-@Deprecated
-public class KTableForeachTest {
-
-    final private String topicName = "topic";
-    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
-
-    @Test
-    public void testForeach() {
-        // Given
-        List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
-            new KeyValue<>(0, "zero"),
-            new KeyValue<>(1, "one"),
-            new KeyValue<>(2, "two"),
-            new KeyValue<>(3, "three")
-        );
-
-        List<KeyValue<Integer, String>> expectedRecords = Arrays.asList(
-            new KeyValue<>(0, "ZERO"),
-            new KeyValue<>(2, "ONE"),
-            new KeyValue<>(4, "TWO"),
-            new KeyValue<>(6, "THREE")
-        );
-
-        final List<KeyValue<Integer, String>> actualRecords = new ArrayList<>();
-        ForeachAction<Integer, String> action =
-            new ForeachAction<Integer, String>() {
-                @Override
-                public void apply(Integer key, String value) {
-                    actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase(Locale.ROOT)));
-                }
-            };
-
-        // When
-        StreamsBuilder builder = new StreamsBuilder();
-        KTable<Integer, String> table = builder.table(topicName,
-                                                      Consumed.with(Serdes.Integer(), Serdes.String()),
-                                                      Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(topicName)
-                                                              .withKeySerde(Serdes.Integer())
-                                                              .withValueSerde(Serdes.String()));
-        table.foreach(action);
-
-        // Then
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
-            for (KeyValue<Integer, String> record : inputRecords) {
-                driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
-            }
-        }
-
-        assertEquals(expectedRecords.size(), actualRecords.size());
-        for (int i = 0; i < expectedRecords.size(); i++) {
-            KeyValue<Integer, String> expectedRecord = expectedRecords.get(i);
-            KeyValue<Integer, String> actualRecord = actualRecords.get(i);
-            assertEquals(expectedRecord, actualRecord);
-        }
-    }
-
-    @Test
-    public void testTypeVariance() {
-        ForeachAction<Number, Object> consume = new ForeachAction<Number, Object>() {
-            @Override
-            public void apply(Number key, Object value) {}
-        };
-
-        new StreamsBuilder()
-            .<Integer, String>table("emptyTopic")
-            .foreach(consume);
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index ae1e285..fcdd0a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -22,10 +22,10 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
@@ -57,8 +57,10 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableImplTest {
 
-    final private Serde<String> stringSerde = Serdes.String();
+    private final Serde<String> stringSerde = Serdes.String();
     private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
+    private final Produced<String, String> produced = Produced.with(stringSerde, stringSerde);
+
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
@@ -78,14 +80,13 @@ public class KTableImplTest {
 
         String topic1 = "topic1";
         String topic2 = "topic2";
-        String storeName2 = "storeName2";
 
-        KTable<String, String> table1 = builder.table(topic1, consumed);
+        final KTable<String, String> table1 = builder.table(topic1, consumed);
 
-        MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
         table1.toStream().process(supplier);
 
-        KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
+        final KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
             @Override
             public Integer apply(String value) {
                 return new Integer(value);
@@ -94,7 +95,7 @@ public class KTableImplTest {
 
         table2.toStream().process(supplier);
 
-        KTable<String, Integer> table3 = table2.filter(new Predicate<String, Integer>() {
+        final KTable<String, Integer> table3 = table2.filter(new Predicate<String, Integer>() {
             @Override
             public boolean test(String key, Integer value) {
                 return (value % 2) == 0;
@@ -103,7 +104,8 @@ public class KTableImplTest {
 
         table3.toStream().process(supplier);
 
-        KTable<String, String> table4 = table1.through(stringSerde, stringSerde, topic2, storeName2);
+        table1.toStream().to(topic2, produced);
+        final KTable<String, String> table4 = builder.table(topic2, consumed);
 
         table4.toStream().process(supplier);
 
@@ -130,46 +132,46 @@ public class KTableImplTest {
     public void testValueGetter() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        String topic1 = "topic1";
-        String topic2 = "topic2";
-        String storeName2 = "storeName2";
+        final String topic1 = "topic1";
+        final String topic2 = "topic2";
 
-        KTableImpl<String, String, String> table1 =
+        final KTableImpl<String, String, String> table1 =
                 (KTableImpl<String, String, String>) builder.table(topic1, consumed);
-        KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+        final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override
                     public Integer apply(String value) {
                         return new Integer(value);
                     }
                 });
-        KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+        final KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
                 new Predicate<String, Integer>() {
                     @Override
                     public boolean test(String key, Integer value) {
                         return (value % 2) == 0;
                     }
                 });
-        KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
-                table1.through(stringSerde, stringSerde, topic2, storeName2);
 
-        KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
-        KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
-        KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
-        KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+        table1.toStream().to(topic2, produced);
+        final KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) builder.table(topic2, consumed);
+
+        final KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+        final KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+        final KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+        final KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
 
         driver.setUp(builder, stateDir, null, null);
 
         // two state store should be created
         assertEquals(2, driver.allStateStores().size());
 
-        KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+        final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
         getter1.init(driver.context());
-        KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+        final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
         getter2.init(driver.context());
-        KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+        final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
         getter3.init(driver.context());
-        KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+        final KTableValueGetter<String, String> getter4 = getterSupplier4.get();
         getter4.init(driver.context());
 
         driver.process(topic1, "A", "01");
@@ -255,16 +257,16 @@ public class KTableImplTest {
 
     @Test
     public void testStateStoreLazyEval() {
-        String topic1 = "topic1";
-        String topic2 = "topic2";
+        final String topic1 = "topic1";
+        final String topic2 = "topic2";
 
         final StreamsBuilder builder = new StreamsBuilder();
 
-        KTableImpl<String, String, String> table1 =
+        final KTableImpl<String, String, String> table1 =
                 (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         builder.table(topic2, consumed);
 
-        KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
+        final KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override
                     public Integer apply(String value) {
@@ -288,24 +290,24 @@ public class KTableImplTest {
 
     @Test
     public void testStateStore() {
-        String topic1 = "topic1";
-        String topic2 = "topic2";
+        final String topic1 = "topic1";
+        final String topic2 = "topic2";
 
         final StreamsBuilder builder = new StreamsBuilder();
 
-        KTableImpl<String, String, String> table1 =
+        final KTableImpl<String, String, String> table1 =
                 (KTableImpl<String, String, String>) builder.table(topic1, consumed);
-        KTableImpl<String, String, String> table2 =
+        final KTableImpl<String, String, String> table2 =
                 (KTableImpl<String, String, String>) builder.table(topic2, consumed);
 
-        KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
+        final KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override
                     public Integer apply(String value) {
                         return new Integer(value);
                     }
                 });
-        KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
+        final KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
                 new Predicate<String, Integer>() {
                     @Override
                     public boolean test(String key, Integer value) {
@@ -329,12 +331,12 @@ public class KTableImplTest {
 
     @Test
     public void testRepartition() throws NoSuchFieldException, IllegalAccessException {
-        String topic1 = "topic1";
-        String storeName1 = "storeName1";
+        final String topic1 = "topic1";
+        final String storeName1 = "storeName1";
 
         final StreamsBuilder builder = new StreamsBuilder();
 
-        KTableImpl<String, String, String> table1 =
+        final KTableImpl<String, String, String> table1 =
                 (KTableImpl<String, String, String>) builder.table(topic1,
                                                                    consumed,
                                                                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName1)
@@ -378,11 +380,6 @@ public class KTableImplTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullTopicOnTo() {
-        table.to(null);
-    }
-
-    @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullPredicateOnFilter() {
         table.filter(null);
     }
@@ -402,34 +399,6 @@ public class KTableImplTest {
         table.mapValues((ValueMapperWithKey) null);
     }
 
-    @SuppressWarnings("deprecation")
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullFilePathOnWriteAsText() {
-        table.writeAsText(null);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Test(expected = TopologyException.class)
-    public void shouldNotAllowEmptyFilePathOnWriteAsText() {
-        table.writeAsText("\t  \t");
-    }
-
-    @SuppressWarnings("deprecation")
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullActionOnForEach() {
-        table.foreach(null);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldAllowNullTopicInThrough() {
-        table.through((String) null, "store");
-    }
-
-    @Test
-    public void shouldAllowNullStoreInThrough() {
-        table.through("topic", (String) null);
-    }
-
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullSelectorOnGroupBy() {
         table.groupBy(null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 3cd7701..c54efd8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
@@ -44,8 +45,9 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableMapValuesTest {
 
-    final private Serde<String> stringSerde = Serdes.String();
+    private final Serde<String> stringSerde = Serdes.String();
     private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
+    private final Produced<String, String> produced = Produced.with(stringSerde, stringSerde);
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
@@ -70,17 +72,17 @@ public class KTableMapValuesTest {
     public void testKTable() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        String topic1 = "topic1";
+        final String topic1 = "topic1";
 
-        KTable<String, String> table1 = builder.table(topic1, consumed);
-        KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
+        final KTable<String, String> table1 = builder.table(topic1, consumed);
+        final KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
             @Override
             public Integer apply(CharSequence value) {
                 return value.charAt(0) - 48;
             }
         });
 
-        MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
         table2.toStream().process(supplier);
 
         doTestKTable(builder, topic1, supplier);
@@ -90,17 +92,17 @@ public class KTableMapValuesTest {
     public void testQueryableKTable() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        String topic1 = "topic1";
+        final String topic1 = "topic1";
 
-        KTable<String, String> table1 = builder.table(topic1, consumed);
-        KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
+        final KTable<String, String> table1 = builder.table(topic1, consumed);
+        final KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
             @Override
             public Integer apply(CharSequence value) {
                 return value.charAt(0) - 48;
             }
         }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyName").withValueSerde(Serdes.Integer()));
 
-        MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
         table2.toStream().process(supplier);
 
         doTestKTable(builder, topic1, supplier);
@@ -112,19 +114,19 @@ public class KTableMapValuesTest {
                                    final KTableImpl<String, String, Integer> table2,
                                    final KTableImpl<String, Integer, Integer> table3,
                                    final KTableImpl<String, String, String> table4) {
-        KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
-        KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
-        KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
-        KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+        final KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+        final KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+        final KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+        final KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
 
         driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
-        KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+        final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
         getter1.init(driver.context());
-        KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+        final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
         getter2.init(driver.context());
-        KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+        final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
         getter3.init(driver.context());
-        KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+        final KTableValueGetter<String, String> getter4 = getterSupplier4.get();
         getter4.init(driver.context());
 
         driver.process(topic1, "A", "01");
@@ -209,73 +211,71 @@ public class KTableMapValuesTest {
 
     @Test
     public void testValueGetter() {
-        StreamsBuilder builder = new StreamsBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
-        String topic1 = "topic1";
-        String topic2 = "topic2";
-        String storeName2 = "storeName2";
+        final String topic1 = "topic1";
+        final String topic2 = "topic2";
 
-        KTableImpl<String, String, String> table1 =
+        final KTableImpl<String, String, String> table1 =
                 (KTableImpl<String, String, String>) builder.table(topic1, consumed);
-        KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+        final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override
                     public Integer apply(String value) {
                         return new Integer(value);
                     }
                 });
-        KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+        final KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
                 new Predicate<String, Integer>() {
                     @Override
                     public boolean test(String key, Integer value) {
                         return (value % 2) == 0;
                     }
                 });
-        KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
-                table1.through(stringSerde, stringSerde, topic2, storeName2);
+        table1.toStream().to(topic2, produced);
+        final KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) builder.table(topic2, consumed);
 
         doTestValueGetter(builder, topic1, table1, table2, table3, table4);
     }
 
     @Test
     public void testQueryableValueGetter() {
-        StreamsBuilder builder = new StreamsBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
-        String topic1 = "topic1";
-        String topic2 = "topic2";
-        String storeName2 = "storeName2";
+        final String topic1 = "topic1";
+        final String topic2 = "topic2";
 
-        KTableImpl<String, String, String> table1 =
+        final KTableImpl<String, String, String> table1 =
             (KTableImpl<String, String, String>) builder.table(topic1, consumed);
-        KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+        final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
             new ValueMapper<String, Integer>() {
                 @Override
                 public Integer apply(String value) {
                     return new Integer(value);
                 }
             }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyMapName").withValueSerde(Serdes.Integer()));
-        KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+        final KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
             new Predicate<String, Integer>() {
                 @Override
                 public boolean test(String key, Integer value) {
                     return (value % 2) == 0;
                 }
             }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyFilterName").withValueSerde(Serdes.Integer()));
-        KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
-            table1.through(stringSerde, stringSerde, topic2, storeName2);
+        table1.toStream().to(topic2, produced);
+        final KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) builder.table(topic2, consumed);
 
         doTestValueGetter(builder, topic1, table1, table2, table3, table4);
     }
 
     @Test
     public void testNotSendingOldValue() {
-        StreamsBuilder builder = new StreamsBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
-        String topic1 = "topic1";
+        final String topic1 = "topic1";
 
-        KTableImpl<String, String, String> table1 =
+        final KTableImpl<String, String, String> table1 =
                 (KTableImpl<String, String, String>) builder.table(topic1, consumed);
-        KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+        final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override
                     public Integer apply(String value) {
@@ -320,13 +320,13 @@ public class KTableMapValuesTest {
 
     @Test
     public void testSendingOldValue() {
-        StreamsBuilder builder = new StreamsBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
 
-        String topic1 = "topic1";
+        final String topic1 = "topic1";
 
-        KTableImpl<String, String, String> table1 =
+        final KTableImpl<String, String, String> table1 =
                 (KTableImpl<String, String, String>) builder.table(topic1, consumed);
-        KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+        final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override
                     public Integer apply(String value) {
@@ -336,7 +336,7 @@ public class KTableMapValuesTest {
 
         table2.enableSendingOldValues();
 
-        MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
 
         builder.build().addProcessor("proc", supplier, table2.name);
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.