You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/11 18:38:08 UTC

[kafka] branch trunk updated: KAFKA-6813: Remove deprecated APIs in KIP-182, Part III (#4991)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new caca1fd  KAFKA-6813: Remove deprecated APIs in KIP-182, Part III (#4991)
caca1fd is described below

commit caca1fdc90b6ef877385b044b2a3230f8e6e9874
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Fri May 11 11:38:02 2018 -0700

    KAFKA-6813: Remove deprecated APIs in KIP-182, Part III (#4991)
    
    1. Remove TopologyBuilder, TopologyBuilderException, KStreamBuilder.
    
    2. Complete the leftover work of https://issues.apache.org/jira/browse/KAFKA-5660, now that TopologyBuilderException is removed.
    
    3. Add MockStoreBuilder to replace MockStateStoreSupplier; remove all XXStoreSupplier classes except StateStoreSupplier, which is still referenced in the logical streams graph.
    
    4. Minor: rename KStreamsFineGrainedAutoResetIntegrationTest.java to FineGrainedAutoResetIntegrationTest.java.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |   28 -
 .../streams/errors/TopologyBuilderException.java   |   42 -
 .../kafka/streams/kstream/KStreamBuilder.java      | 1269 --------------------
 .../kstream/internals/InternalStreamsBuilder.java  |   21 -
 .../kafka/streams/processor/ProcessorSupplier.java |    4 +-
 .../kafka/streams/processor/StreamPartitioner.java |    7 +-
 .../kafka/streams/processor/TopologyBuilder.java   |  958 ---------------
 .../internals/InternalTopologyBuilder.java         |  132 +-
 .../internals/StreamsPartitionAssignor.java        |   37 +-
 .../state/internals/AbstractStoreBuilder.java      |   12 +-
 .../state/internals/AbstractStoreSupplier.java     |   59 -
 .../internals/RocksDBKeyValueStoreSupplier.java    |   60 -
 .../kafka/streams/TopologyTestDriverWrapper.java   |   37 -
 .../org/apache/kafka/streams/TopologyWrapper.java  |    4 +
 ...va => FineGrainedAutoResetIntegrationTest.java} |   16 +-
 .../integration/RegexSourceIntegrationTest.java    |   40 +-
 .../kafka/streams/kstream/KStreamBuilderTest.java  |  457 -------
 .../streams/kstream/internals/KStreamImplTest.java |   22 +-
 .../streams/processor/TopologyBuilderTest.java     |  752 ------------
 .../CopartitionedTopicsValidatorTest.java          |    6 +-
 .../internals/InternalTopologyBuilderTest.java     |   66 +-
 .../processor/internals/ProcessorTopologyTest.java |  107 +-
 .../processor/internals/StandbyTaskTest.java       |    7 +-
 .../internals/StreamsPartitionAssignorTest.java    |   13 +-
 .../CompositeReadOnlyKeyValueStoreTest.java        |    3 +-
 .../org/apache/kafka/test/KStreamTestDriver.java   |   44 -
 .../apache/kafka/test/MockStateStoreSupplier.java  |   63 -
 .../org/apache/kafka/test/MockStoreBuilder.java}   |   28 +-
 28 files changed, 164 insertions(+), 4130 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index d656181..f36bbac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -597,34 +597,6 @@ public class KafkaStreams {
      * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
      */
     @Deprecated
-    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
-                        final Properties props) {
-        this(builder.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
-    }
-
-    /**
-     * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
-     */
-    @Deprecated
-    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
-                        final StreamsConfig config) {
-        this(builder.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
-    }
-
-    /**
-     * @deprecated use {@link #KafkaStreams(Topology, Properties, KafkaClientSupplier)} instead
-     */
-    @Deprecated
-    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
-                        final StreamsConfig config,
-                        final KafkaClientSupplier clientSupplier) {
-        this(builder.internalTopologyBuilder, config, clientSupplier);
-    }
-
-    /**
-     * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
-     */
-    @Deprecated
     public KafkaStreams(final Topology topology,
                         final StreamsConfig config) {
         this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
deleted file mode 100644
index 385d401..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.errors;
-
-
-/**
- * Indicates a pre-run time error occurred while parsing the {@link org.apache.kafka.streams.processor.TopologyBuilder
- * builder} to construct the {@link org.apache.kafka.streams.processor.internals.ProcessorTopology processor topology}.
- *
- * @deprecated use {@link org.apache.kafka.streams.Topology} instead of {@link org.apache.kafka.streams.processor.TopologyBuilder}
- */
-@Deprecated
-public class TopologyBuilderException extends StreamsException {
-
-    private static final long serialVersionUID = 1L;
-
-    public TopologyBuilderException(final String message) {
-        super("Invalid topology building" + (message == null ? "" : ": " + message));
-    }
-
-    public TopologyBuilderException(final String message, final Throwable throwable) {
-        super("Invalid topology building" + (message == null ? "" : ": " + message), throwable);
-    }
-
-    public TopologyBuilderException(final Throwable throwable) {
-        super(throwable);
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
deleted file mode 100644
index d747ce8..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ /dev/null
@@ -1,1269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.kstream.internals.GlobalKTableImpl;
-import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
-import org.apache.kafka.streams.kstream.internals.KStreamImpl;
-import org.apache.kafka.streams.kstream.internals.KTableImpl;
-import org.apache.kafka.streams.kstream.internals.KTableSource;
-import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.QueryableStoreType;
-import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
-
-import java.util.Collections;
-import java.util.Objects;
-import java.util.regex.Pattern;
-
-/**
- * {@code KStreamBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology.
- *
- * @see org.apache.kafka.streams.processor.TopologyBuilder
- * @see KStream
- * @see KTable
- * @see GlobalKTable
- * @deprecated Use {@link org.apache.kafka.streams.StreamsBuilder StreamsBuilder} instead
- */
-@Deprecated
-public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyBuilder {
-
-    private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(super.internalTopologyBuilder);
-
-    private Topology.AutoOffsetReset translateAutoOffsetReset(final org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset resetPolicy) {
-        if (resetPolicy == null) {
-            return null;
-        }
-        return resetPolicy == org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset.EARLIEST ? Topology.AutoOffsetReset.EARLIEST : Topology.AutoOffsetReset.LATEST;
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topics.
-     * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
-     * deserializers as specified in the {@link StreamsConfig config} are used.
-     * <p>
-     * If multiple topics are specified there is no ordering guarantee for records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the date before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param topics the topic names; must contain at least one topic name
-     * @return a {@link KStream} for the specified topics
-     */
-    public <K, V> KStream<K, V> stream(final String... topics) {
-        return stream(null, null, null, null, topics);
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topics.
-     * The default {@link TimestampExtractor} and default key and value deserializers as specified in the
-     * {@link StreamsConfig config} are used.
-     * <p>
-     * If multiple topics are specified there is no ordering guarantee for records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the date before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics if no valid committed
-     *                    offsets are available
-     * @param topics      the topic names; must contain at least one topic name
-     * @return a {@link KStream} for the specified topics
-     */
-    public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
-                                       final String... topics) {
-        return stream(offsetReset, null, null, null, topics);
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topic pattern.
-     * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
-     * deserializers as specified in the {@link StreamsConfig config} are used.
-     * <p>
-     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
-     * them and there is no ordering guarantee between records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the date before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param topicPattern the pattern to match for topic names
-     * @return a {@link KStream} for topics matching the regex pattern.
-     */
-    public <K, V> KStream<K, V> stream(final Pattern topicPattern) {
-        return stream(null, null,  null, null, topicPattern);
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topic pattern.
-     * The default {@link TimestampExtractor} and default key and value deserializers as specified in the
-     * {@link StreamsConfig config} are used.
-     * <p>
-     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
-     * them and there is no ordering guarantee between records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the date before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param offsetReset  the {@code "auto.offset.reset"} policy to use for the matched topics if no valid committed
-     *                     offsets are available
-     * @param topicPattern the pattern to match for topic names
-     * @return a {@link KStream} for topics matching the regex pattern.
-     */
-    public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern) {
-        return stream(offsetReset, null, null, null, topicPattern);
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topics.
-     * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor} as specified in the
-     * {@link StreamsConfig config} are used.
-     * <p>
-     * If multiple topics are specified there is no ordering guarantee for records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the date before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param keySerde key serde used to read this source {@link KStream},
-     *                 if not specified the default serde defined in the configs will be used
-     * @param valSerde value serde used to read this source {@link KStream},
-     *                 if not specified the default serde defined in the configs will be used
-     * @param topics   the topic names; must contain at least one topic name
-     * @return a {@link KStream} for the specified topics
-     */
-    public <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final String... topics) {
-        return stream(null, null, keySerde, valSerde, topics);
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topics.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
-     * <p>
-     * If multiple topics are specified there is no ordering guarantee for records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the date before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics if no valid committed
-     *                    offsets are available
-     * @param keySerde    key serde used to read this source {@link KStream},
-     *                    if not specified the default serde defined in the configs will be used
-     * @param valSerde    value serde used to read this source {@link KStream},
-     *                    if not specified the default serde defined in the configs will be used
-     * @param topics      the topic names; must contain at least one topic name
-     * @return a {@link KStream} for the specified topics
-     */
-    public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
-                                       final Serde<K> keySerde,
-                                       final Serde<V> valSerde,
-                                       final String... topics) {
-        return stream(offsetReset, null, keySerde, valSerde, topics);
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topics.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
-     * <p>
-     * If multiple topics are specified there is no ordering guarantee for records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the date before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to read this source {@link KStream}, if not specified the default
-     *                           serde defined in the configs will be used
-     * @param valSerde           value serde used to read this source {@link KStream},
-     *                           if not specified the default serde defined in the configs will be used
-     * @param topics             the topic names; must contain at least one topic name
-     * @return a {@link KStream} for the specified topics
-     */
-    public <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor,
-                                       final Serde<K> keySerde,
-                                       final Serde<V> valSerde,
-                                       final String... topics) {
-        return stream(null, timestampExtractor, keySerde, valSerde, topics);
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topics.
-     * <p>
-     * If multiple topics are specified there is no ordering guarantee for records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the date before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topics
-     *                           if no valid committed offsets are available
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to read this source {@link KStream},
-     *                           if not specified the default serde defined in the configs will be used
-     * @param valSerde           value serde used to read this source {@link KStream},
-     *                           if not specified the default serde defined in the configs will be used
-     * @param topics             the topic names; must contain at least one topic name
-     * @return a {@link KStream} for the specified topics
-     */
-    public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
-                                       final TimestampExtractor timestampExtractor,
-                                       final Serde<K> keySerde,
-                                       final Serde<V> valSerde,
-                                       final String... topics) {
-        try {
-            final String name = newName(KStreamImpl.SOURCE_NAME);
-
-            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor,
-                keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
-
-            return new KStreamImpl<>(internalStreamsBuilder, name, Collections.singleton(name), false);
-        } catch (final org.apache.kafka.streams.errors.TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topic pattern.
-     * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
-     * as specified in the {@link StreamsConfig config} are used.
-     * <p>
-     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
-     * them and there is no ordering guarantee between records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the date before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param keySerde     key serde used to read this source {@link KStream},
-     *                     if not specified the default serde defined in the configs will be used
-     * @param valSerde     value serde used to read this source {@link KStream},
-     *                     if not specified the default serde defined in the configs will be used
-     * @param topicPattern the pattern to match for topic names
-     * @return a {@link KStream} for topics matching the regex pattern.
-     */
-    public <K, V> KStream<K, V> stream(final Serde<K> keySerde,
-                                       final Serde<V> valSerde,
-                                       final Pattern topicPattern) {
-        return stream(null, null, keySerde, valSerde, topicPattern);
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topic pattern.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
-     * <p>
-     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
-     * them and there is no ordering guarantee between records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the date before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param offsetReset  the {@code "auto.offset.reset"} policy to use for the matched topics if no valid committed
-     *                     offsets are available
-     * @param keySerde     key serde used to read this source {@link KStream},
-     *                     if not specified the default serde defined in the configs will be used
-     * @param valSerde     value serde used to read this source {@link KStream},
-     *                     if not specified the default serde defined in the configs will be used
-     * @param topicPattern the pattern to match for topic names
-     * @return a {@link KStream} for topics matching the regex pattern.
-     */
-    public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
-                                       final Serde<K> keySerde,
-                                       final Serde<V> valSerde,
-                                       final Pattern topicPattern) {
-        return stream(offsetReset, null, keySerde, valSerde, topicPattern);
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topic pattern.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
-     * <p>
-     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
-     * them and there is no ordering guarantee between records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the date before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to read this source {@link KStream},
-     *                           if not specified the default serde defined in the configs will be used
-     * @param valSerde           value serde used to read this source {@link KStream},
-     *                           if not specified the default serde defined in the configs will be used
-     * @param topicPattern       the pattern to match for topic names
-     * @return a {@link KStream} for topics matching the regex pattern.
-     */
-    public <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor,
-                                       final Serde<K> keySerde,
-                                       final Serde<V> valSerde,
-                                       final Pattern topicPattern) {
-        return stream(null, timestampExtractor, keySerde, valSerde, topicPattern);
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topic pattern.
-     * <p>
-     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
-     * them and there is no ordering guarantee between records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the date before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the matched topics if no valid
-     *                           committed  offsets are available
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to read this source {@link KStream},
-     *                           if not specified the default serde defined in the configs will be used
-     * @param valSerde           value serde used to read this source {@link KStream},
-     *                           if not specified the default serde defined in the configs will be used
-     * @param topicPattern       the pattern to match for topic names
-     * @return a {@link KStream} for topics matching the regex pattern.
-     */
-    public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
-                                       final TimestampExtractor timestampExtractor,
-                                       final Serde<K> keySerde,
-                                       final Serde<V> valSerde,
-                                       final Pattern topicPattern) {
-        try {
-            final String name = newName(KStreamImpl.SOURCE_NAME);
-
-            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor,
-                keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
-
-            return new KStreamImpl<>(internalStreamsBuilder, name, Collections.singleton(name), false);
-        } catch (final org.apache.kafka.streams.errors.TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and
-     * default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(String)} ()}.
-     * @return a {@link KTable} for the specified topic
-     */
-    public <K, V> KTable<K, V> table(final String topic,
-                                     final String queryableStoreName) {
-        return table(null, null,  null, null, topic, queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
-     * {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param topic         the topic name; cannot be {@code null}
-     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
-     * @return a {@link KTable} for the specified topic
-     */
-    public <K, V> KTable<K, V> table(final String topic,
-                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
-        // Forwards to a six-argument StateStoreSupplier overload that is not visible in this excerpt.
-        // NOTE(review): the four nulls presumably stand for offsetReset, timestampExtractor, keySerde and
-        // valSerde, mirroring the String-store-name overload -- confirm against that signature.
-        return table(null, null, null, null, topic, storeSupplier);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
-     * {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that this store may not be queryable through Interactive Queries.
-     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * @param topic     the topic name; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public <K, V> KTable<K, V> table(final String topic) {
-        // The (String) cast selects the String-store-name overload rather than the StateStoreSupplier one.
-        // With a null store name that overload generates an internal store name and passes isQueryable = false.
-        return table(null, null, null, null, topic, (String) null);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                           offsets are available
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of
-     * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, String) table(AutoOffsetReset, String)}.
-     * @return a {@link KTable} for the specified topic
-     */
-    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
-                                     final String topic,
-                                     final String queryableStoreName) {
-        // Forwards to the six-argument overload (offsetReset, timestampExtractor, keySerde, valSerde, topic,
-        // queryableStoreName); null is passed for timestampExtractor, keySerde and valSerde.
-        return table(offsetReset, null, null, null, topic, queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@link TimestampExtractor} and default key and value deserializers
-     * as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                    offsets are available
-     * @param topic       the topic name; cannot be {@code null}
-     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
-     * @return a {@link KTable} for the specified topic
-     */
-    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
-                                     final String topic,
-                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
-        // Forwards to a six-argument StateStoreSupplier overload that is not visible in this excerpt.
-        // NOTE(review): the three nulls presumably stand for timestampExtractor, keySerde and valSerde,
-        // mirroring the String-store-name overload -- confirm against that signature.
-        return table(offsetReset, null, null, null, topic, storeSupplier);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that this store may not be queryable through Interactive Queries.
-     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     *
-     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                    offsets are available
-     * @param topic       the topic name; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
-                                     final String topic) {
-        // The (String) cast selects the String-store-name overload rather than the StateStoreSupplier one.
-        // With a null store name that overload generates an internal store name and passes isQueryable = false.
-        return table(offsetReset, null, null, null, topic, (String) null);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers
-     * as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param storeName          the state store name; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
-                                     final String topic,
-                                     final String storeName) {
-        // Forwards to the six-argument overload (offsetReset, timestampExtractor, keySerde, valSerde, topic,
-        // queryableStoreName); null is passed for offsetReset, keySerde and valSerde.
-        return table(null, timestampExtractor, null, null, topic, storeName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
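-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid
-     *                           committed offsets are available
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param storeName          the state store name; cannot be {@code null}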
-     * @return a {@link KTable} for the specified topic
-     */
-    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
-                                     final TimestampExtractor timestampExtractor,
-                                     final String topic,
-                                     final String storeName) {
-        // Forwards to the six-argument overload (offsetReset, timestampExtractor, keySerde, valSerde, topic,
-        // queryableStoreName); null is passed for keySerde and valSerde.
-        return table(offsetReset, timestampExtractor, null, null, topic, storeName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
-     * as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valSerde           value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link KStreamBuilder#table(Serde, Serde, String)}.
-     * @return a {@link KTable} for the specified topic
-     */
-    public <K, V> KTable<K, V> table(final Serde<K> keySerde,
-                                     final Serde<V> valSerde,
-                                     final String topic,
-                                     final String queryableStoreName) {
-        // Forwards to the six-argument overload (offsetReset, timestampExtractor, keySerde, valSerde, topic,
-        // queryableStoreName); null is passed for offsetReset and timestampExtractor.
-        return table(null, null, keySerde, valSerde, topic, queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param keySerde      key serde used to send key-value pairs,
-     *                      if not specified the default key serde defined in the configuration will be used
-     * @param valSerde      value serde used to send key-value pairs,
-     *                      if not specified the default value serde defined in the configuration will be used
-     * @param topic         the topic name; cannot be {@code null}
-     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
-     * @return a {@link KTable} for the specified topic
-     */
-    public <K, V> KTable<K, V> table(final Serde<K> keySerde,
-                                     final Serde<V> valSerde,
-                                     final String topic,
-                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
-        // Forwards to a six-argument StateStoreSupplier overload that is not visible in this excerpt.
-        // NOTE(review): the two nulls presumably stand for offsetReset and timestampExtractor, mirroring
-        // the String-store-name overload -- confirm against that signature.
-        return table(null, null, keySerde, valSerde, topic, storeSupplier);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that this store may not be queryable through Interactive Queries.
-     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default key serde defined in the configuration will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default value serde defined in the configuration will be used
-     * @param topic     the topic name; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public <K, V> KTable<K, V> table(final Serde<K> keySerde,
-                                     final Serde<V> valSerde,
-                                     final String topic) {
-        // The (String) cast selects the String-store-name overload rather than the StateStoreSupplier one.
-        // With a null store name that overload generates an internal store name and passes isQueryable = false.
-        return table(null, null, keySerde, valSerde, topic, (String) null);
-    }
-
-    /**
-     * Wires a source {@link KTable} into the topology: registers a source node for {@code topic}, a
-     * {@link KTableSource} processor attached to it, and the state store provided by {@code storeSupplier}.
-     *
-     * @param offsetReset        offset reset policy; passed through {@code translateAutoOffsetReset} before use
-     * @param keySerde           key serde; when {@code null}, a {@code null} key deserializer is handed to the source
-     * @param valSerde           value serde; when {@code null}, a {@code null} value deserializer is handed to the source
-     * @param timestampExtractor timestamp extractor, forwarded to the source node as-is
-     * @param topic              the input topic name
-     * @param storeSupplier      supplier of the backing store; its name is dereferenced, so it must not be {@code null}
-     * @param isQueryable        forwarded unchanged to the {@link KTableImpl} constructor
-     * @return the newly created {@link KTableImpl}
-     * @throws org.apache.kafka.streams.errors.TopologyBuilderException wrapping any
-     *         {@link org.apache.kafka.streams.errors.TopologyException} raised while building
-     */
-    private <K, V> KTable<K, V> doTable(final AutoOffsetReset offsetReset,
-                                        final Serde<K> keySerde,
-                                        final Serde<V> valSerde,
-                                        final TimestampExtractor timestampExtractor,
-                                        final String topic,
-                                        final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
-                                        final boolean isQueryable) {
-        try {
-            // Names are requested in a fixed order: the source node first, then the table processor.
-            final String source = newName(KStreamImpl.SOURCE_NAME);
-            final String name = newName(KTableImpl.SOURCE_NAME);
-            final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
-
-            // A missing serde is forwarded as a null deserializer rather than being resolved here.
-            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), source, timestampExtractor,
-                keySerde == null ? null : keySerde.deserializer(),
-                valSerde == null ? null : valSerde.deserializer(),
-                topic);
-            internalTopologyBuilder.addProcessor(name, processorSupplier, source);
-
-            final KTableImpl<K, ?, V> kTable = new KTableImpl<>(internalStreamsBuilder, name, processorSupplier,
-                    keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);
-
-            // The store is registered against the processor node and then associated with the input topic.
-            addStateStore(storeSupplier, name);
-            connectSourceStoreAndTopic(storeSupplier.name(), topic);
-
-            return kTable;
-        } catch (final org.apache.kafka.streams.errors.TopologyException e) {
-            // Rethrown as TopologyBuilderException, with the original exception as its cause.
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                           offsets are available
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valSerde           value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of
-     * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, Serde, Serde, String) table(AutoOffsetReset, Serde, Serde, String)}
-     * @return a {@link KTable} for the specified topic
-     */
-    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
-                                     final Serde<K> keySerde,
-                                     final Serde<V> valSerde,
-                                     final String topic,
-                                     final String queryableStoreName) {
-        // Forwards to the six-argument overload (offsetReset, timestampExtractor, keySerde, valSerde, topic,
-        // queryableStoreName); null is passed for timestampExtractor.
-        return table(offsetReset, null, keySerde, valSerde, topic, queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valSerde           value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param storeName          the state store name; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
-                                     final Serde<K> keySerde,
-                                     final Serde<V> valSerde,
-                                     final String topic,
-                                     final String storeName) {
-        // Forwards to the six-argument overload (offsetReset, timestampExtractor, keySerde, valSerde, topic,
-        // queryableStoreName); null is passed for offsetReset.
-        return table(null, timestampExtractor, keySerde, valSerde, topic, storeName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid
-     *                           committed offsets are available
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valSerde           value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of
-     * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, Serde, Serde, String) table(AutoOffsetReset, Serde, Serde, String)}
-     * @return a {@link KTable} for the specified topic
-     */
-    @SuppressWarnings("unchecked")
-    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
-                                     final TimestampExtractor timestampExtractor,
-                                     final Serde<K> keySerde,
-                                     final Serde<V> valSerde,
-                                     final String topic,
-                                     final String queryableStoreName) {
-        // Whether the caller supplied a store name; this is also what gets passed on as isQueryable.
-        final boolean queryable = queryableStoreName != null;
-
-        // Use the caller's name when given, otherwise generate an internal store name.
-        final String storeName;
-        if (queryable) {
-            storeName = queryableStoreName;
-        } else {
-            storeName = newStoreName(KTableImpl.SOURCE_NAME);
-        }
-
-        // The supplier is held as a raw type, so handing it to doTable is an unchecked conversion
-        // (hence the annotation on this method).
-        final org.apache.kafka.streams.processor.StateStoreSupplier supplier =
-            new RocksDBKeyValueStoreSupplier<>(storeName, keySerde, valSerde, false, Collections.<String, String>emptyMap(), true);
-
-        return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, supplier, queryable);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that the store name may not be queryable through Interactive Queries.
-     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                    offsets are available
-     * @param keySerde    key serde used to send key-value pairs,
-     *                    if not specified the default key serde defined in the configuration will be used
-     * @param valSerde    value serde used to send key-value pairs,
-     *                    if not specified the default value serde defined in the configuration will be used
-     * @param topic       the topic name; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
-                                     final Serde<K> keySerde,
-                                     final Serde<V> valSerde,
-                                     final String topic) {
-        return table(offsetReset, null, keySerde, valSerde, topic, (String) null);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} provided by the given
-     * {@code storeSupplier}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                    offsets are available
-     * @param keySerde    key serde used to send key-value pairs,
-     *                    if not specified the default key serde defined in the configuration will be used
-     * @param valSerde    value serde used to send key-value pairs,
-     *                    if not specified the default value serde defined in the configuration will be used
-     * @param topic       the topic name; cannot be {@code null}
-     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
-     * @return a {@link KTable} for the specified topic
-     */
-    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
-                                     final TimestampExtractor timestampExtractor,
-                                     final Serde<K> keySerde,
-                                     final Serde<V> valSerde,
-                                     final String topic,
-                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
-        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
-        return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, true);
-    }
-
-    /**
-     * Create a {@link GlobalKTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key);
-     * }</pre>
-     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
-     * regardless of the specified value in {@link StreamsConfig}.
-     *
-     * @param topic     the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#globalTable(String)}
-     * @return a {@link GlobalKTable} for the specified topic
-     */
-    public <K, V> GlobalKTable<K, V> globalTable(final String topic,
-                                                 final String queryableStoreName) {
-        return globalTable(null, null, null,  topic, queryableStoreName);
-    }
-
-    /**
-     * Create a {@link GlobalKTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that the store name may not be queryable through Interactive Queries.
-     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
-     * regardless of the specified value in {@link StreamsConfig}.
-     *
-     * @param topic     the topic name; cannot be {@code null}
-     * @return a {@link GlobalKTable} for the specified topic
-     */
-    public <K, V> GlobalKTable<K, V> globalTable(final String topic) {
-        return globalTable(null, null, null, topic, null);
-    }
-
-    /**
-     * Create a {@link GlobalKTable} for the specified topic.
-     * The given {@link TimestampExtractor} and key and value serdes are used; for any of them that is {@code null},
-     * the default specified in the {@link StreamsConfig config} is used instead.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key);
-     * }</pre>
-     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
-     * regardless of the specified value in {@link StreamsConfig}.
-     *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default key serde defined in the configuration will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default value serde defined in the configuration will be used
-     * @param topic     the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#globalTable(Serde, Serde, String)}
-     * @return a {@link GlobalKTable} for the specified topic
-     */
-    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
-                                                 final Serde<V> valSerde,
-                                                 final TimestampExtractor timestampExtractor,
-                                                 final String topic,
-                                                 final String queryableStoreName) {
-        final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
-        return doGlobalTable(keySerde, valSerde, timestampExtractor, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
-                            keySerde,
-                            valSerde,
-                    false,
-                            Collections.<String, String>emptyMap(),
-                    true));
-    }
-
-    /**
-     * Create a {@link GlobalKTable} for the specified topic.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} provided by the given
-     * {@code storeSupplier}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key);
-     * }</pre>
-     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
-     * regardless of the specified value in {@link StreamsConfig}.
-     *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default key serde defined in the configuration will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default value serde defined in the configuration will be used
-     * @param topic     the topic name; cannot be {@code null}
-     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
-     * @return a {@link GlobalKTable} for the specified topic
-     */
-    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
-                                                 final Serde<V> valSerde,
-                                                 final String topic,
-                                                 final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
-    }
-
-    /**
-     * Create a {@link GlobalKTable} for the specified topic.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key);
-     * }</pre>
-     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
-     * regardless of the specified value in {@link StreamsConfig}.
-     *
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valSerde           value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of
-     *                           {@link KStreamBuilder#globalTable(Serde, Serde, String)}
-     * @return a {@link GlobalKTable} for the specified topic
-     */
-    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
-                                                 final Serde<V> valSerde,
-                                                 final String topic,
-                                                 final String queryableStoreName) {
-        return globalTable(keySerde, valSerde, null, topic, queryableStoreName);
-    }
-
-    /**
-     * Create a {@link GlobalKTable} for the specified topic.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key);
-     * }</pre>
-     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
-     * regardless of the specified value in {@link StreamsConfig}.
-     *
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link GlobalKTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valSerde           value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param storeSupplier      user defined state store supplier. Cannot be {@code null}.
-     * @return a {@link GlobalKTable} for the specified topic
-     */
-    @SuppressWarnings("unchecked")
-    private <K, V> GlobalKTable<K, V> doGlobalTable(final Serde<K> keySerde,
-                                                    final Serde<V> valSerde,
-                                                    final TimestampExtractor timestampExtractor,
-                                                    final String topic,
-                                                    final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
-        try {
-            Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
-            final String sourceName = newName(KStreamImpl.SOURCE_NAME);
-            final String processorName = newName(KTableImpl.SOURCE_NAME);
-            final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
-
-            final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
-            final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
-
-            internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
-            return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
-        } catch (final org.apache.kafka.streams.errors.TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-    }
-
-    /**
-     * Create a {@link GlobalKTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that the store name may not be queryable through Interactive Queries.
-     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
-     * regardless of the specified value in {@link StreamsConfig}.
-     *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default key serde defined in the configuration will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default value serde defined in the configuration will be used
-     * @param topic     the topic name; cannot be {@code null}
-     * @return a {@link GlobalKTable} for the specified topic
-     */
-    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
-                                                 final Serde<V> valSerde,
-                                                 final String topic) {
-
-        return globalTable(keySerde, valSerde, null, topic, (String) null);
-    }
-
-    /**
-     * Create a new instance of {@link KStream} by merging the given {@link KStream}s.
-     * <p>
-     * There is no ordering guarantee for records from different {@link KStream}s.
-     *
-     * @param streams the {@link KStream}s to be merged
-     * @return a {@link KStream} containing all records of the given streams
-     */
-    @SuppressWarnings("unchecked")
-    public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
-        Objects.requireNonNull(streams, "streams can't be null");
-        if (streams.length <= 1) {
-            throw new IllegalArgumentException("Number of arguments required needs to be greater than one.");
-        }
-        try {
-            KStream<K, V> mergedStream = streams[0];
-            for (int i = 1; i < streams.length; i++) {
-                mergedStream = mergedStream.merge(streams[i]);
-            }
-            return mergedStream;
-        } catch (final org.apache.kafka.streams.errors.TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-    }
-
-    /**
-     * <strong>This function is for internal usage only and should not be called.</strong>
-     * <p>
-     * Create a unique processor name used for translation into the processor topology.
-     *
-     * @param prefix processor name prefix
-     * @return a new unique name
-     */
-    public String newName(final String prefix) {
-        return internalStreamsBuilder.newProcessorName(prefix);
-    }
-
-    /**
-     * <strong>This function is for internal usage only and should not be called.</strong>
-     * <p>
-     * Create a unique state store name.
-     *
-     * @param prefix processor name prefix
-     * @return a new unique name
-     */
-    public String newStoreName(final String prefix) {
-        return internalStreamsBuilder.newStoreName(prefix);
-    }
-
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index fa47444..480794c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -68,27 +68,6 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         return new KStreamImpl<>(this, name, Collections.singleton(name), false);
     }
 
-    @SuppressWarnings("deprecation")
-    public <K, V> KTable<K, V> table(final String topic,
-                                     final ConsumedInternal<K, V> consumed,
-                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
-        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
-        final String source = newProcessorName(KStreamImpl.SOURCE_NAME);
-        final String name = newProcessorName(KTableImpl.SOURCE_NAME);
-
-        final KTable<K, V> kTable = createKTable(consumed,
-                                                 topic,
-                                                 storeSupplier.name(),
-                                                 true,
-                                                 source,
-                                                 name);
-
-        internalTopologyBuilder.addStateStore(storeSupplier, name);
-        internalTopologyBuilder.connectSourceStoreAndTopic(storeSupplier.name(), topic);
-
-        return kTable;
-    }
-
     @SuppressWarnings("unchecked")
     public <K, V> KTable<K, V> table(final String topic,
                                      final ConsumedInternal<K, V> consumed,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
index 25bbaab..a253638 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
@@ -16,10 +16,12 @@
  */
 package org.apache.kafka.streams.processor;
 
+import org.apache.kafka.streams.Topology;
+
 /**
  * A processor supplier that can create one or more {@link Processor} instances.
  *
- * It is used in {@link TopologyBuilder} for adding new processor operators, whose generated
+ * It is used in {@link Topology} for adding new processor operators, whose generated
  * topology can then be replicated (and thus creating one or more {@link Processor} instances)
  * and distributed to multiple stream threads.
  *
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
index 1e622e5..1fa5e3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.streams.Topology;
 
 /**
  * Determine how records are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
@@ -36,16 +37,16 @@ import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
  * determine to which partition each record should be written.
  * <p>
  * To do this, create a <code>StreamPartitioner</code> implementation, and when you build your topology specify that custom partitioner
- * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink}
+ * when {@link Topology#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink}
  * for that topic.
  * <p>
  * All StreamPartitioner implementations should be stateless and a pure function so they can be shared across topic and sink nodes.
  * 
  * @param <K> the type of keys
  * @param <V> the type of values
- * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
+ * @see Topology#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
  *      org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...)
- * @see TopologyBuilder#addSink(String, String, StreamPartitioner, String...)
+ * @see Topology#addSink(String, String, StreamPartitioner, String...)
  */
 public interface StreamPartitioner<K, V> {
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
deleted file mode 100644
index dab7bd7..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ /dev/null
@@ -1,958 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.errors.TopologyException;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.SinkNode;
-import org.apache.kafka.streams.processor.internals.SourceNode;
-import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.SubscriptionUpdates;
-import org.apache.kafka.streams.state.KeyValueStore;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-/**
- * A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors,
- * and sinks. A {@link SourceNode source} is a node in the graph that consumes one or more Kafka topics and forwards them to
- * its child nodes. A {@link Processor processor} is a node in the graph that receives input records from upstream nodes,
- * processes those records, and optionally forwards new records to one or all of its children. Finally, a {@link SinkNode sink}
- * is a node in the graph that receives records from upstream nodes and writes them to a Kafka topic. This builder allows you
- * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreams}
- * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing records}.
- *
- * @deprecated use {@link Topology} instead
- */
-@SuppressWarnings("unchecked")
-@Deprecated
-public class TopologyBuilder {
-
-    /**
-     * NOTE: this member is not needed by developers working with the processor APIs; it is only used
-     * for internal functionalities.
-     */
-    public final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
-
-    private Topology.AutoOffsetReset translateAutoOffsetReset(final TopologyBuilder.AutoOffsetReset resetPolicy) {
-        if (resetPolicy == null) {
-            return null;
-        }
-        return resetPolicy == TopologyBuilder.AutoOffsetReset.EARLIEST ? Topology.AutoOffsetReset.EARLIEST : Topology.AutoOffsetReset.LATEST;
-    }
-
-    /**
-     * NOTE: this class is not needed by developers working with the processor APIs; it is only used
-     * for internal functionalities.
-     */
-    public static class TopicsInfo {
-        public Set<String> sinkTopics;
-        public Set<String> sourceTopics;
-        public Map<String, InternalTopicConfig> stateChangelogTopics;
-        public Map<String, InternalTopicConfig> repartitionSourceTopics;
-
-        public TopicsInfo(final Set<String> sinkTopics,
-                          final Set<String> sourceTopics,
-                          final Map<String, InternalTopicConfig> repartitionSourceTopics,
-                          final Map<String, InternalTopicConfig> stateChangelogTopics) {
-            this.sinkTopics = sinkTopics;
-            this.sourceTopics = sourceTopics;
-            this.stateChangelogTopics = stateChangelogTopics;
-            this.repartitionSourceTopics = repartitionSourceTopics;
-        }
-
-        @Override
-        public boolean equals(final Object o) {
-            if (o instanceof TopicsInfo) {
-                final TopicsInfo other = (TopicsInfo) o;
-                return other.sourceTopics.equals(sourceTopics) && other.stateChangelogTopics.equals(stateChangelogTopics);
-            } else {
-                return false;
-            }
-        }
-
-        @Override
-        public int hashCode() {
-            final long n = ((long) sourceTopics.hashCode() << 32) | (long) stateChangelogTopics.hashCode();
-            return (int) (n % 0xFFFFFFFFL);
-        }
-
-        @Override
-        public String toString() {
-            return "TopicsInfo{" +
-                    "sinkTopics=" + sinkTopics +
-                    ", sourceTopics=" + sourceTopics +
-                    ", repartitionSourceTopics=" + repartitionSourceTopics +
-                    ", stateChangelogTopics=" + stateChangelogTopics +
-                    '}';
-        }
-    }
-
-    /**
-     * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable}.
-     */
-    public enum AutoOffsetReset {
-        EARLIEST, LATEST
-    }
-
-    /**
-     * Create a new builder.
-     */
-    public TopologyBuilder() {}
-
-    /** This method is not part of the public API and should never be used by a developer. */
-    public synchronized final TopologyBuilder setApplicationId(final String applicationId) {
-        internalTopologyBuilder.setApplicationId(applicationId);
-        return this;
-    }
-
-    /**
-     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
-     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
-     *
-     * @param name the unique name of the source used to reference this node when
-     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param topics the name of one or more Kafka topics that this source is to consume
-     * @return this builder instance so methods can be chained together; never null
-     */
-    public synchronized final TopologyBuilder addSource(final String name,
-                                                        final String... topics) {
-        try {
-            internalTopologyBuilder.addSource(null, name, null, null, null, topics);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
-     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
-     *
-     * @param offsetReset the auto offset reset policy to use for this source if no committed offsets found; acceptable values earliest or latest
-     * @param name the unique name of the source used to reference this node when
-     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param topics the name of one or more Kafka topics that this source is to consume
-     * @return this builder instance so methods can be chained together; never null
-     */
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
-                                                        final String name,
-                                                        final String... topics) {
-        try {
-            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, null, null, topics);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
-     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
-     *
-     * @param timestampExtractor the stateless timestamp extractor used for this source,
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param name               the unique name of the source used to reference this node when
-     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param topics             the name of one or more Kafka topics that this source is to consume
-     * @return this builder instance so methods can be chained together; never null
-     */
-    public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor,
-                                                        final String name,
-                                                        final String... topics) {
-        try {
-            internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
-     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
-     *
-     * @param offsetReset        the auto offset reset policy to use for this source if no committed offsets found;
-     *                           acceptable values earliest or latest
-     * @param timestampExtractor the stateless timestamp extractor used for this source,
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param name               the unique name of the source used to reference this node when
-     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param topics             the name of one or more Kafka topics that this source is to consume
-     * @return this builder instance so methods can be chained together; never null
-     */
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
-                                                        final TimestampExtractor timestampExtractor,
-                                                        final String name,
-                                                        final String... topics) {
-        try {
-            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, null, null, topics);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Add a new source that consumes from topics matching the given pattern
-     * and forwards the records to child processor and/or sink nodes.
-     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
-     *
-     * @param name the unique name of the source used to reference this node when
-     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
-     * @return this builder instance so methods can be chained together; never null
-     */
-    public synchronized final TopologyBuilder addSource(final String name,
-                                                        final Pattern topicPattern) {
-        try {
-            internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Add a new source that consumes from topics matching the given pattern
-     * and forwards the records to child processor and/or sink nodes.
-     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
-     *
-     * @param offsetReset the auto offset reset policy value for this source if no committed offsets found; acceptable values earliest or latest.
-     * @param name the unique name of the source used to reference this node when
-     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
-     * @return this builder instance so methods can be chained together; never null
-     */
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
-                                                        final String name,
-                                                        final Pattern topicPattern) {
-        try {
-            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, null, null, topicPattern);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-
-    /**
-     * Add a new source that consumes from topics matching the given pattern
-     * and forwards the records to child processor and/or sink nodes.
-     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
-     *
-     * @param timestampExtractor the stateless timestamp extractor used for this source,
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param name               the unique name of the source used to reference this node when
-     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
-     * @return this builder instance so methods can be chained together; never null
-     */
-    public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor,
-                                                        final String name,
-                                                        final Pattern topicPattern) {
-        try {
-            internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Add a new source that consumes from topics matching the given pattern
-     * and forwards the records to child processor and/or sink nodes.
-     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
-     *
-     * @param offsetReset        the auto offset reset policy value for this source if no committed offsets found;
-     *                           acceptable values earliest or latest.
-     * @param timestampExtractor the stateless timestamp extractor used for this source,
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param name               the unique name of the source used to reference this node when
-     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
-     * @return this builder instance so methods can be chained together; never null
-     */
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
-                                                        final TimestampExtractor timestampExtractor,
-                                                        final String name,
-                                                        final Pattern topicPattern) {
-        try {
-            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, null, null, topicPattern);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
-     * The source will use the specified key and value deserializers.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
-     *
-     * @param name               the unique name of the source used to reference this node when
-     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
-     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
-     *                           key deserializer defined in the configs will be used
-     * @param valDeserializer    value deserializer used to read this source,
-     *                           if not specified the default value deserializer defined in the configs will be used
-     * @param topics             the name of one or more Kafka topics that this source is to consume
-     * @return this builder instance so methods can be chained together; never null
-     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if processor is already added or if topics have already been registered by another source
-     */
-    public synchronized final TopologyBuilder addSource(final String name,
-                                                        final Deserializer keyDeserializer,
-                                                        final Deserializer valDeserializer,
-                                                        final String... topics) {
-        try {
-            internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topics);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
-     * The source will use the specified key and value deserializers.
-     *
-     * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets found;
-     *                           acceptable values are earliest or latest.
-     * @param name               the unique name of the source used to reference this node when
-     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param timestampExtractor the stateless timestamp extractor used for this source,
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
-     *                           key deserializer defined in the configs will be used
-     * @param valDeserializer    value deserializer used to read this source,
-     *                           if not specified the default value deserializer defined in the configs will be used
-     * @param topics             the name of one or more Kafka topics that this source is to consume
-     * @return this builder instance so methods can be chained together; never null
-     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if processor is already added or if topics have already been registered by another source
-     */
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
-                                                        final String name,
-                                                        final TimestampExtractor timestampExtractor,
-                                                        final Deserializer keyDeserializer,
-                                                        final Deserializer valDeserializer,
-                                                        final String... topics) {
-        try {
-            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keyDeserializer, valDeserializer, topics);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Adds a global {@link StateStore} to the topology. The {@link StateStore} sources its data
-     * from all partitions of the provided input topic. There will be exactly one instance of this
-     * {@link StateStore} per Kafka Streams instance.
-     * <p>
-     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving
-     * from the partitions of the input topic.
-     * <p>
-     * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will
-     * receive all records forwarded from the {@link SourceNode}. This
-     * {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
-     *
-     * @param storeSupplier         user defined state store supplier
-     * @param sourceName            name of the {@link SourceNode} that will be automatically added
-     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
-     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
-     * @param topic                 the topic to source the data from
-     * @param processorName         the name of the {@link ProcessorSupplier}
-     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
-     * @return this builder instance so methods can be chained together; never null
-     */
-    public synchronized TopologyBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
-                                                       final String sourceName,
-                                                       final Deserializer keyDeserializer,
-                                                       final Deserializer valueDeserializer,
-                                                       final String topic,
-                                                       final String processorName,
-                                                       final ProcessorSupplier stateUpdateSupplier) {
-        try {
-            internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Adds a global {@link StateStore} to the topology. The {@link StateStore} sources its data
-     * from all partitions of the provided input topic. There will be exactly one instance of this
-     * {@link StateStore} per Kafka Streams instance.
-     * <p>
-     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving
-     * from the partitions of the input topic.
-     * <p>
-     * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will
-     * receive all records forwarded from the {@link SourceNode}. This
-     * {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
-     *
-     * @param storeSupplier         user defined state store supplier
-     * @param sourceName            name of the {@link SourceNode} that will be automatically added
-     * @param timestampExtractor    the stateless timestamp extractor used for this source,
-     *                              if not specified the default extractor defined in the configs will be used
-     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
-     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
-     * @param topic                 the topic to source the data from
-     * @param processorName         the name of the {@link ProcessorSupplier}
-     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
-     * @return this builder instance so methods can be chained together; never null
-     */
-    public synchronized TopologyBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
-                                                       final String sourceName,
-                                                       final TimestampExtractor timestampExtractor,
-                                                       final Deserializer keyDeserializer,
-                                                       final Deserializer valueDeserializer,
-                                                       final String topic,
-                                                       final String processorName,
-                                                       final ProcessorSupplier stateUpdateSupplier) {
-        try {
-            internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Add a new source that consumes from topics matching the given pattern
-     * and forwards the records to child processor and/or sink nodes.
-     * The source will use the specified key and value deserializers. The provided
-     * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
-     * topics that share the same key-value data format.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
-     *
-     * @param name               the unique name of the source used to reference this node when
-     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
-     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
-     *                           key deserializer defined in the configs will be used
-     * @param valDeserializer    value deserializer used to read this source,
-     *                           if not specified the default value deserializer defined in the configs will be used
-     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
-     * @return this builder instance so methods can be chained together; never null
-     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if processor is already added or if topics have already been registered by name
-     */
-    public synchronized final TopologyBuilder addSource(final String name,
-                                                        final Deserializer keyDeserializer,
-                                                        final Deserializer valDeserializer,
-                                                        final Pattern topicPattern) {
-        try {
-            internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Add a new source that consumes from topics matching the given pattern
-     * and forwards the records to child processor and/or sink nodes.
-     * The source will use the specified key and value deserializers. The provided
-     * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
-     * topics that share the same key-value data format.
-     *
-     * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets found;
-     *                           acceptable values are earliest or latest
-     * @param name               the unique name of the source used to reference this node when
-     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param timestampExtractor the stateless timestamp extractor used for this source,
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
-     *                           key deserializer defined in the configs will be used
-     * @param valDeserializer    value deserializer used to read this source,
-     *                           if not specified the default value deserializer defined in the configs will be used
-     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
-     * @return this builder instance so methods can be chained together; never null
-     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if processor is already added or if topics have already been registered by name
-     */
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
-                                                        final String name,
-                                                        final TimestampExtractor timestampExtractor,
-                                                        final Deserializer keyDeserializer,
-                                                        final Deserializer valDeserializer,
-                                                        final Pattern topicPattern) {
-        try {
-            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keyDeserializer, valDeserializer, topicPattern);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Add a new source that consumes from topics matching the given pattern
-     * and forwards the records to child processor and/or sink nodes.
-     * The source will use the specified key and value deserializers. The provided
-     * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
-     * topics that share the same key-value data format.
-     *
-     * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets found;
-     *                           acceptable values are earliest or latest
-     * @param name               the unique name of the source used to reference this node when
-     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
-     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
-     *                           key deserializer defined in the configs will be used
-     * @param valDeserializer    value deserializer used to read this source,
-     *                           if not specified the default value deserializer defined in the configs will be used
-     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
-     * @return this builder instance so methods can be chained together; never null
-     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if processor is already added or if topics have already been registered by name
-     */
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
-                                                        final String name,
-                                                        final Deserializer keyDeserializer,
-                                                        final Deserializer valDeserializer,
-                                                        final Pattern topicPattern) {
-        try {
-            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, keyDeserializer, valDeserializer, topicPattern);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic.
-     * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
-     *
-     * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should write its records
-     * @param predecessorNames the name of one or more source or processor nodes whose output records this sink should consume
-     * and write to its topic
-     * @return this builder instance so methods can be chained together; never null
-     * @see #addSink(String, String, StreamPartitioner, String...)
-     * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
-     */
-    public synchronized final TopologyBuilder addSink(final String name,
-                                                      final String topic,
-                                                      final String... predecessorNames) {
-        try {
-            internalTopologyBuilder.addSink(name, topic, null, null, null, predecessorNames);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic, using
-     * the supplied partitioner.
-     * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
-     * <p>
-     * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among
-     * the named Kafka topic's partitions. Such control is often useful with topologies that use
-     * {@link #addStateStore(StateStoreSupplier, String...) state stores}
-     * in its processors. In most other cases, however, a partitioner needs not be specified and Kafka will automatically distribute
-     * records among partitions using Kafka's default partitioning logic.
-     *
-     * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should write its records
-     * @param partitioner the function that should be used to determine the partition for each record processed by the sink
-     * @param predecessorNames the name of one or more source or processor nodes whose output records this sink should consume
-     * and write to its topic
-     * @return this builder instance so methods can be chained together; never null
-     * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
-     */
-    public synchronized final TopologyBuilder addSink(final String name,
-                                                      final String topic,
-                                                      final StreamPartitioner partitioner,
-                                                      final String... predecessorNames) {
-        try {
-            internalTopologyBuilder.addSink(name, topic, null, null, partitioner, predecessorNames);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic.
-     * The sink will use the specified key and value serializers.
-     *
-     * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should write its records
-     * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param valSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param predecessorNames the name of one or more source or processor nodes whose output records this sink should consume
-     * and write to its topic
-     * @return this builder instance so methods can be chained together; never null
-     * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, StreamPartitioner, String...)
-     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
-     */
-    public synchronized final TopologyBuilder addSink(final String name,
-                                                      final String topic,
-                                                      final Serializer keySerializer,
-                                                      final Serializer valSerializer,
-                                                      final String... predecessorNames) {
-        try {
-            internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, null, predecessorNames);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic.
-     * The sink will use the specified key and value serializers, and the supplied partitioner.
-     *
-     * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should write its records
-     * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param valSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param partitioner the function that should be used to determine the partition for each record processed by the sink
-     * @param predecessorNames the name of one or more source or processor nodes whose output records this sink should consume
-     * and write to its topic
-     * @return this builder instance so methods can be chained together; never null
-     * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, StreamPartitioner, String...)
-     * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if predecessor is not added yet, or if this processor's name is equal to the predecessor's name
-     */
-    public synchronized final <K, V> TopologyBuilder addSink(final String name,
-                                                             final String topic,
-                                                             final Serializer<K> keySerializer,
-                                                             final Serializer<V> valSerializer,
-                                                             final StreamPartitioner<? super K, ? super V> partitioner,
-                                                             final String... predecessorNames) {
-        try {
-            internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, predecessorNames);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Add a new processor node that receives and processes records output by one or more predecessor source or processor node.
-     * Any new record output by this processor will be forwarded to its child processor or sink nodes.
-     * @param name the unique name of the processor node
-     * @param supplier the supplier used to obtain this node's {@link Processor} instance
-     * @param predecessorNames the name of one or more source or processor nodes whose output records this processor should receive
-     * and process
-     * @return this builder instance so methods can be chained together; never null
-     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if predecessor is not added yet, or if this processor's name is equal to the predecessor's name
-     */
-    public synchronized final TopologyBuilder addProcessor(final String name,
-                                                           final ProcessorSupplier supplier,
-                                                           final String... predecessorNames) {
-        try {
-            internalTopologyBuilder.addProcessor(name, supplier, predecessorNames);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Adds a state store
-     *
-     * @param supplier the supplier used to obtain this state store {@link StateStore} instance
-     * @return this builder instance so methods can be chained together; never null
-     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if state store supplier is already added
-     */
-    public synchronized final TopologyBuilder addStateStore(final StateStoreSupplier supplier,
-                                                            final String... processorNames) {
-        try {
-            internalTopologyBuilder.addStateStore(supplier, processorNames);
-        } catch (final TopologyException e) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-        }
-        return this;
-    }
-
-    /**
-     * Connects the processor and the state stores
-     *
-     * @param processorName the name of the processor
-     * @param stateStoreNames the names of state stores that the processor uses
-     * @return this builder instance so methods can be chained together; never null
-     */
-    public synchronized final TopologyBuilder connectProcessorAndStateStores(final String processorName,
-                                                                             final String... stateStoreNames) {
-        if (stateStoreNames != null && stateStoreNames.length > 0) {
-            try {
-                internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
-            } catch (final TopologyException e) {
-                throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
-            }
-        }
-        return this;
-    }
-
-    /**
-     * This is used only for KStreamBuilder: when adding a KTable from a source topic,
-     * we need to add the topic as the KTable's materialized state store's changelog.
-     *
-     * NOTE this function would not needed by developers working with the processor APIs, but only used
-     * for the high-level DSL parsing functionalities.
-     */
-    protected synchronized final TopologyBuilder connectSourceStoreAndTopic(final String sourceStoreName,
-                                                                            final String topic) {
-        internalTopologyBuilder.connectSourceStoreAndTopic(sourceStoreName, topic);
-        return this;
-    }
-
-    /**
-     * Connects a list of processors.
-     *
-     * NOTE this function would not needed by developers working with the processor APIs, but only used
-     * for the high-level DSL parsing functionalities.
-     *
-     * @param processorNames the name of the processors
-     * @return this builder instance so methods can be chained together; never null
-     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet
-     */
-    public synchronized final TopologyBuilder connectProcessors(final String... processorNames) {
-        internalTopologyBuilder.connectProcessors(processorNames);
-        return this;
-    }
-
-    /**
-     * Adds an internal topic
-     *
-     * NOTE this function would not needed by developers working with the processor APIs, but only used
-     * for the high-level DSL parsing functionalities.
-     *
-     * @param topicName the name of the topic
-     * @return this builder instance so methods can be chained together; never null
-     */
-    public synchronized final TopologyBuilder addInternalTopic(final String topicName) {
-        internalTopologyBuilder.addInternalTopic(topicName);
-        return this;
-    }
-
-    /**
-     * Asserts that the streams of the specified source nodes must be copartitioned.
-     *
-     * NOTE this function would not needed by developers working with the processor APIs, but only used
-     * for the high-level DSL parsing functionalities.
-     *
-     * @param sourceNodes a set of source node names
-     * @return this builder instance so methods can be chained together; never null
-     */
-    public synchronized final TopologyBuilder copartitionSources(final Collection<String> sourceNodes) {
-        internalTopologyBuilder.copartitionSources(sourceNodes);
-        return this;
-    }
-
-    /**
-     * Returns the map of node groups keyed by the topic group id.
-     *
-     * NOTE this function would not needed by developers working with the processor APIs, but only used
-     * for the high-level DSL parsing functionalities.
-     *
-     * @return groups of node names
-     */
-    public synchronized Map<Integer, Set<String>> nodeGroups() {
-        return internalTopologyBuilder.nodeGroups();
-    }
-
-    /**
-     * Build the topology for the specified topic group. This is called automatically when passing this builder into the
-     * {@link org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)} constructor.
-     *
-     * NOTE this function would not needed by developers working with the processor APIs, but only used
-     * for the high-level DSL parsing functionalities.
-     *
-     * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
-     */
-    public synchronized ProcessorTopology build(final Integer topicGroupId) {
-        return internalTopologyBuilder.build(topicGroupId);
-    }
-
-    /**
-     * Builds the topology for any global state stores
-     *
-     * NOTE this function would not needed by developers working with the processor APIs, but only used
-     * for the high-level DSL parsing functionalities.
-     *
-     * @return ProcessorTopology
-     */
-    public synchronized ProcessorTopology buildGlobalStateTopology() {
-        return internalTopologyBuilder.buildGlobalStateTopology();
-    }
-
-    /**
-     * Get any global {@link StateStore}s that are part of the
-     * topology
-     *
-     * NOTE this function would not needed by developers working with the processor APIs, but only used
-     * for the high-level DSL parsing functionalities.
-     *
-     * @return map containing all global {@link StateStore}s
-     */
-    public Map<String, StateStore> globalStateStores() {
-        return internalTopologyBuilder.globalStateStores();
-    }
-
-    /**
-     * Returns the map of topic groups keyed by the group id.
-     * A topic group is a group of topics in the same task.
-     *
-     * NOTE this function would not needed by developers working with the processor APIs, but only used
-     * for the high-level DSL parsing functionalities.
-     *
-     * @return groups of topic names
-     */
-    public synchronized Map<Integer, TopicsInfo> topicGroups() {
-        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroupsWithNewTopicsInfo = internalTopologyBuilder.topicGroups();
-        final Map<Integer, TopicsInfo> topicGroupsWithDeprecatedTopicInfo = new HashMap<>();
-
-        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroupsWithNewTopicsInfo.entrySet()) {
-            final InternalTopologyBuilder.TopicsInfo newTopicsInfo = entry.getValue();
-
-            topicGroupsWithDeprecatedTopicInfo.put(entry.getKey(), new TopicsInfo(
-                newTopicsInfo.sinkTopics,
-                newTopicsInfo.sourceTopics,
-                newTopicsInfo.repartitionSourceTopics,
-                newTopicsInfo.stateChangelogTopics));
-        }
-
-        return topicGroupsWithDeprecatedTopicInfo;
-    }
-
-    /**
-     * Get the Pattern to match all topics requiring to start reading from earliest available offset
-     *
-     * NOTE this function would not needed by developers working with the processor APIs, but only used
-     * for the high-level DSL parsing functionalities.
-     *
-     * @return the Pattern for matching all topics reading from earliest offset, never null
-     */
-    public synchronized Pattern earliestResetTopicsPattern() {
-        return internalTopologyBuilder.earliestResetTopicsPattern();
-    }
-
-    /**
-     * Get the Pattern to match all topics requiring to start reading from latest available offset
-     *
-     * NOTE this function would not needed by developers working with the processor APIs, but only used
-     * for the high-level DSL parsing functionalities.
-     *
-     * @return the Pattern for matching all topics reading from latest offset, never null
-     */
-    public synchronized Pattern latestResetTopicsPattern() {
-        return internalTopologyBuilder.latestResetTopicsPattern();
-    }
-
-    /**
-     * NOTE this function would not needed by developers working with the processor APIs, but only used
-     * for the high-level DSL parsing functionalities.
-     *
-     * @return a mapping from state store name to a Set of source Topics.
-     */
-    public Map<String, List<String>> stateStoreNameToSourceTopics() {
-        return internalTopologyBuilder.stateStoreNameToSourceTopics();
-    }
-
-    /**
-     * Returns the copartition groups.
-     * A copartition group is a group of source topics that are required to be copartitioned.
-     *
-     * NOTE this function would not needed by developers working with the processor APIs, but only used
-     * for the high-level DSL parsing functionalities.
-     *
-     * @return groups of topic names
-     */
-    public synchronized Collection<Set<String>> copartitionGroups() {
-        return internalTopologyBuilder.copartitionGroups();
-    }
-
-    /**
-     * NOTE this function would not needed by developers working with the processor APIs, but only used
-     * for the high-level DSL parsing functionalities.
-     */
-    public SubscriptionUpdates subscriptionUpdates() {
-        SubscriptionUpdates clonedSubscriptionUpdates = new SubscriptionUpdates();
-        clonedSubscriptionUpdates.updateTopics(internalTopologyBuilder.subscriptionUpdates().getUpdates());
-        return clonedSubscriptionUpdates;
-    }
-
-    /**
-     * NOTE this function would not needed by developers working with the processor APIs, but only used
-     * for the high-level DSL parsing functionalities.
-     */
-    public synchronized Pattern sourceTopicPattern() {
-        return internalTopologyBuilder.sourceTopicPattern();
-    }
-
-    /**
-     * NOTE this function would not needed by developers working with the processor APIs, but only used
-     * for the high-level DSL parsing functionalities.
-     */
-    public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates,
-                                                 final String threadId) {
-        internalTopologyBuilder.updateSubscribedTopics(new HashSet<>(subscriptionUpdates.getUpdates()), "stream-thread [" + threadId + "] ");
-    }
-
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index bfe8cda..70437e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
-import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -175,35 +174,6 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private static class StateStoreSupplierFactory extends AbstractStateStoreFactory {
-        @SuppressWarnings("deprecation")
-        private final org.apache.kafka.streams.processor.StateStoreSupplier supplier;
-
-        @SuppressWarnings("deprecation")
-        StateStoreSupplierFactory(final org.apache.kafka.streams.processor.StateStoreSupplier<?> supplier) {
-            super(supplier.name(),
-                  supplier.loggingEnabled(),
-                  supplier instanceof WindowStoreSupplier,
-                  supplier.logConfig());
-            this.supplier = supplier;
-
-        }
-
-        @Override
-        public StateStore build() {
-            return supplier.get();
-        }
-
-        @SuppressWarnings("deprecation")
-        @Override
-        public long retentionPeriod() {
-            if (!isWindowStore()) {
-                throw new IllegalStateException("retentionPeriod is not supported when not a window store");
-            }
-            return ((WindowStoreSupplier) supplier).retentionPeriod();
-        }
-    }
-
     private static class StoreBuilderFactory extends AbstractStateStoreFactory {
         private final StoreBuilder builder;
 
@@ -432,7 +402,19 @@ public class InternalTopologyBuilder {
 
         for (final String sourceTopicName : sourceTopicNames) {
             if (topicPattern.matcher(sourceTopicName).matches()) {
-                throw new TopologyException("Pattern  " + topicPattern + " will match a topic that has already been registered by another source.");
+                throw new TopologyException("Pattern " + topicPattern + " will match a topic that has already been registered by another source.");
+            }
+        }
+
+        for (final Pattern otherPattern : earliestResetPatterns) {
+            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
+                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
+            }
+        }
+
+        for (final Pattern otherPattern : latestResetPatterns) {
+            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
+                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
             }
         }
 
@@ -498,24 +480,6 @@ public class InternalTopologyBuilder {
         nodeGrouper.unite(name, predecessorNames);
     }
 
-    @SuppressWarnings("deprecation")
-    public final void addStateStore(final org.apache.kafka.streams.processor.StateStoreSupplier supplier,
-                                    final String... processorNames) {
-        Objects.requireNonNull(supplier, "supplier can't be null");
-        if (stateFactories.containsKey(supplier.name())) {
-            throw new TopologyException("StateStore " + supplier.name() + " is already added.");
-        }
-
-        stateFactories.put(supplier.name(), new StateStoreSupplierFactory(supplier));
-
-        if (processorNames != null) {
-            for (final String processorName : processorNames) {
-                Objects.requireNonNull(processorName, "processor name must not be null");
-                connectProcessorAndStateStore(processorName, supplier.name());
-            }
-        }
-    }
-
     public final void addStateStore(final StoreBuilder storeBuilder,
                                     final String... processorNames) {
         Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
@@ -533,36 +497,6 @@ public class InternalTopologyBuilder {
         }
     }
 
-    @SuppressWarnings("deprecation")
-    public final void addGlobalStore(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
-                                     final String sourceName,
-                                     final TimestampExtractor timestampExtractor,
-                                     final Deserializer keyDeserializer,
-                                     final Deserializer valueDeserializer,
-                                     final String topic,
-                                     final String processorName,
-                                     final ProcessorSupplier stateUpdateSupplier) {
-        Objects.requireNonNull(storeSupplier, "store supplier must not be null");
-        final String name = storeSupplier.name();
-        validateGlobalStoreArguments(sourceName,
-                                     topic,
-                                     processorName,
-                                     stateUpdateSupplier,
-                                     name,
-                                     storeSupplier.loggingEnabled());
-        validateTopicNotAlreadyRegistered(topic);
-        addGlobalStore(sourceName,
-                       timestampExtractor,
-                       keyDeserializer,
-                       valueDeserializer,
-                       topic,
-                       processorName,
-                       stateUpdateSupplier,
-                       name,
-                       storeSupplier.get());
-    }
-
-
     public final void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
                                      final String sourceName,
                                      final TimestampExtractor timestampExtractor,
@@ -1145,47 +1079,18 @@ public class InternalTopologyBuilder {
     }
 
     public synchronized Pattern earliestResetTopicsPattern() {
-        return resetTopicsPattern(earliestResetTopics, earliestResetPatterns, latestResetTopics, latestResetPatterns);
+        return resetTopicsPattern(earliestResetTopics, earliestResetPatterns);
     }
 
     public synchronized Pattern latestResetTopicsPattern() {
-        return resetTopicsPattern(latestResetTopics, latestResetPatterns, earliestResetTopics, earliestResetPatterns);
+        return resetTopicsPattern(latestResetTopics, latestResetPatterns);
     }
 
     private Pattern resetTopicsPattern(final Set<String> resetTopics,
-                                       final Set<Pattern> resetPatterns,
-                                       final Set<String> otherResetTopics,
-                                       final Set<Pattern> otherResetPatterns) {
+                                       final Set<Pattern> resetPatterns) {
         final List<String> topics = maybeDecorateInternalSourceTopics(resetTopics);
-        final Pattern pattern = buildPatternForOffsetResetTopics(topics, resetPatterns);
-
-        ensureNoRegexOverlap(pattern, otherResetPatterns, otherResetTopics);
-
-        return pattern;
-    }
-
-    // TODO: we should check regex overlap at topology construction time and then throw TopologyException
-    //       instead of at runtime. See KAFKA-5660
-    private void ensureNoRegexOverlap(final Pattern builtPattern,
-                                      final Set<Pattern> otherPatterns,
-                                      final Set<String> otherTopics) {
-        for (final Pattern otherPattern : otherPatterns) {
-            if (builtPattern.pattern().contains(otherPattern.pattern())) {
-                throw new TopologyException(
-                    String.format("Found overlapping regex [%s] against [%s] for a KStream with auto offset resets",
-                        otherPattern.pattern(),
-                        builtPattern.pattern()));
-            }
-        }
 
-        for (final String otherTopic : otherTopics) {
-            if (builtPattern.matcher(otherTopic).matches()) {
-                throw new TopologyException(
-                    String.format("Found overlapping regex [%s] matching topic [%s] for a KStream with auto offset resets",
-                        builtPattern.pattern(),
-                        otherTopic));
-            }
-        }
+        return buildPatternForOffsetResetTopics(topics, resetPatterns);
     }
 
     private static Pattern buildPatternForOffsetResetTopics(final Collection<String> sourceTopics,
@@ -1885,6 +1790,9 @@ public class InternalTopologyBuilder {
         updateSubscriptions(subscriptionUpdates, logPrefix);
     }
 
+
+    // following functions are for test only
+
     public synchronized Set<String> getSourceTopicNames() {
         return sourceTopicNames;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 1f00c04..d7a9b33 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -755,38 +755,6 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         }
     }
 
-    /**
-     * Used to capture subscribed topic via Patterns discovered during the
-     * partition assignment process.
-     *
-     * // TODO: this is a duplicate of the InternalTopologyBuilder#SubscriptionUpdates
-     *          and is maintained only for compatibility of the deprecated TopologyBuilder API
-     */
-    public static class SubscriptionUpdates {
-
-        private final Set<String> updatedTopicSubscriptions = new HashSet<>();
-
-        public void updateTopics(final Collection<String> topicNames) {
-            updatedTopicSubscriptions.clear();
-            updatedTopicSubscriptions.addAll(topicNames);
-        }
-
-        public Collection<String> getUpdates() {
-            return Collections.unmodifiableSet(updatedTopicSubscriptions);
-        }
-
-        public boolean hasUpdates() {
-            return !updatedTopicSubscriptions.isEmpty();
-        }
-
-        @Override
-        public String toString() {
-            return "SubscriptionUpdates{" +
-                    "updatedTopicSubscriptions=" + updatedTopicSubscriptions +
-                    '}';
-        }
-    }
-
     static class CopartitionedTopicsValidator {
         private final String logPrefix;
 
@@ -794,7 +762,6 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
             this.logPrefix = logPrefix;
         }
 
-        @SuppressWarnings("deprecation")
         void validate(final Set<String> copartitionGroup,
                       final Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
                       final Cluster metadata) {
@@ -805,7 +772,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                     final Integer partitions = metadata.partitionCountForTopic(topic);
 
                     if (partitions == null) {
-                        throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic not found: %s", logPrefix, topic));
+                        throw new org.apache.kafka.streams.errors.TopologyException(String.format("%sTopic not found: %s", logPrefix, topic));
                     }
 
                     if (numPartitions == UNKNOWN) {
@@ -813,7 +780,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                     } else if (numPartitions != partitions) {
                         final String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
                         Arrays.sort(topics);
-                        throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
+                        throw new org.apache.kafka.streams.errors.TopologyException(String.format("%sTopics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
                     }
                 } else if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) {
                     numPartitions = NOT_AVAILABLE;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
index 39b9d03..fdcd2e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
@@ -25,19 +25,19 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
-abstract class AbstractStoreBuilder<K, V, T extends StateStore> implements StoreBuilder<T> {
-    private final String name;
+abstract public class AbstractStoreBuilder<K, V, T extends StateStore> implements StoreBuilder<T> {
     private Map<String, String> logConfig = new HashMap<>();
+    protected final String name;
     final Serde<K> keySerde;
     final Serde<V> valueSerde;
     final Time time;
     boolean enableCaching;
     boolean enableLogging = true;
 
-    AbstractStoreBuilder(final String name,
-                         final Serde<K> keySerde,
-                         final Serde<V> valueSerde,
-                         final Time time) {
+    public AbstractStoreBuilder(final String name,
+                                final Serde<K> keySerde,
+                                final Serde<V> valueSerde,
+                                final Time time) {
         Objects.requireNonNull(name, "name can't be null");
         Objects.requireNonNull(time, "time can't be null");
         this.name = name;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
deleted file mode 100644
index 10d0fe2..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStore;
-
-import java.util.Map;
-
-@Deprecated
-abstract class AbstractStoreSupplier<K, V, T extends StateStore> implements org.apache.kafka.streams.processor.StateStoreSupplier<T> {
-    protected final String name;
-    protected final Serde<K> keySerde;
-    protected final Serde<V> valueSerde;
-    protected final Time time;
-    protected final boolean logged;
-    protected final Map<String, String> logConfig;
-
-    AbstractStoreSupplier(final String name,
-                          final Serde<K> keySerde,
-                          final Serde<V> valueSerde,
-                          final Time time,
-                          final boolean logged,
-                          final Map<String, String> logConfig) {
-        this.time = time;
-        this.name = name;
-        this.valueSerde = valueSerde;
-        this.keySerde = keySerde;
-        this.logged = logged;
-        this.logConfig = logConfig;
-    }
-
-    public String name() {
-        return name;
-    }
-
-    public Map<String, String> logConfig() {
-        return logConfig;
-    }
-
-    public boolean loggingEnabled() {
-        return logged;
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
deleted file mode 100644
index 3bc56c2..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.KeyValueStore;
-
-import java.util.Map;
-
-/**
- * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- */
-@Deprecated
-public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
-
-    private final KeyValueStoreBuilder<K, V> builder;
-
-    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean cached) {
-        this(name, keySerde, valueSerde, Time.SYSTEM, logged, logConfig, cached);
-    }
-
-    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig, boolean cached) {
-        super(name, keySerde, valueSerde, time, logged, logConfig);
-        builder = new KeyValueStoreBuilder<>(new RocksDbKeyValueBytesStoreSupplier(name),
-                                             keySerde,
-                                             valueSerde,
-                                             time);
-        if (cached) {
-            builder.withCachingEnabled();
-        }
-        // logged by default so we only need to worry about when it is disabled.
-        if (!logged) {
-            builder.withLoggingDisabled();
-        }
-    }
-
-    public KeyValueStore get() {
-        return builder.build();
-    }
-
-
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
deleted file mode 100644
index fa976a8..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams;
-
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-
-import java.util.Properties;
-
-/**
- *  This class allows the instantiation of a {@link TopologyTestDriver} using a
- *  {@link InternalTopologyBuilder} by exposing a protected constructor.
- *
- *  It should be used only for testing, and should be removed once the deprecated
- *  classes {@link org.apache.kafka.streams.kstream.KStreamBuilder} and
- *  {@link org.apache.kafka.streams.processor.TopologyBuilder} are removed.
- */
-public class TopologyTestDriverWrapper extends TopologyTestDriver {
-
-    public TopologyTestDriverWrapper(final InternalTopologyBuilder builder,
-                                     final Properties config) {
-        super(builder, config);
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
index f106766..60f0e6f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
@@ -24,6 +24,10 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
  */
 public class TopologyWrapper extends Topology {
 
+    static public InternalTopologyBuilder getInternalTopologyBuilder(final Topology topology) {
+        return topology.internalTopologyBuilder;
+    }
+
     public InternalTopologyBuilder getInternalBuilder() {
         return internalTopologyBuilder;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
similarity index 95%
rename from streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
rename to streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index 0f7df6b..a9f6fa8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -64,7 +63,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 @Category({IntegrationTest.class})
-public class KStreamsFineGrainedAutoResetIntegrationTest {
+public class FineGrainedAutoResetIntegrationTest {
 
     private static final int NUM_BROKERS = 1;
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
@@ -246,16 +245,13 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
     }
 
     @Test
-    public void shouldThrowExceptionOverlappingPattern() throws  Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
+    public void shouldThrowExceptionOverlappingPattern() {
+        final StreamsBuilder builder = new StreamsBuilder();
         //NOTE this would realistically get caught when building topology, the test is for completeness
-        builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
-        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_1"));
+        builder.stream(Pattern.compile("topic-[A-D]_1"), Consumed.with(Topology.AutoOffsetReset.EARLIEST));
 
-        // TODO: we should check regex overlap at topology construction time and then throw TopologyException
-        //       instead of at runtime. See KAFKA-5660
         try {
-            builder.earliestResetTopicsPattern();
+            builder.stream(Pattern.compile("topic-[A-D]_1"), Consumed.with(Topology.AutoOffsetReset.LATEST));
             fail("Should have thrown TopologyException");
         } catch (final TopologyException expected) {
             // do nothing
@@ -263,7 +259,7 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
     }
 
     @Test
-    public void shouldThrowExceptionOverlappingTopic() throws  Exception {
+    public void shouldThrowExceptionOverlappingTopic() {
         final StreamsBuilder builder = new StreamsBuilder();
         //NOTE this would realistically get caught when building topology, the test is for completeness
         builder.stream(Pattern.compile("topic-[A-D]_1"), Consumed.with(Topology.AutoOffsetReset.EARLIEST));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index e5160e1..770a3ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -37,9 +37,9 @@ import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStoreBuilder;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -58,11 +58,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
 
 /**
  * End-to-end integration test based on using regex and named topics for creating sources, using
@@ -229,7 +230,7 @@ public class RegexSourceIntegrationTest {
     public void shouldAddStateStoreToRegexDefinedSource() throws InterruptedException {
 
         final ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
-        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("testStateStore"), Serdes.String(), Serdes.String());
+        final StoreBuilder storeBuilder = new MockStoreBuilder("testStateStore", false);
         final long thirtySecondTimeout = 30 * 1000;
 
         final TopologyWrapper topology = new TopologyWrapper();
@@ -258,7 +259,6 @@ public class RegexSourceIntegrationTest {
         }
     }
 
-
     @Test
     public void testShouldReadFromRegexAndNamedTopics() throws Exception {
 
@@ -375,30 +375,31 @@ public class RegexSourceIntegrationTest {
 
     }
 
-    // TODO should be updated to expected = TopologyBuilderException after KAFKA-3708
-    @Test(expected = AssertionError.class)
+    @Test
     public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
-
-        final String fooMessage = "fooMessage";
         final String fMessage = "fMessage";
-
-
+        final String fooMessage = "fooMessage";
         final Serde<String> stringSerde = Serdes.String();
-
         final StreamsBuilder builder = new StreamsBuilder();
 
-
-        // overlapping patterns here, no messages should be sent as TopologyBuilderException
+        // overlapping patterns here, no messages should be sent as TopologyException
         // will be thrown when the processor topology is built.
-
         final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("foo.*"));
         final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("f.*"));
 
-
         pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
         pattern2Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
 
+        final AtomicBoolean expectError = new AtomicBoolean(false);
+
         streams = new KafkaStreams(builder.build(), streamsConfiguration);
+        streams.setStateListener(new KafkaStreams.StateListener() {
+            @Override
+            public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
+                if (newState == KafkaStreams.State.ERROR)
+                    expectError.set(true);
+            }
+        });
         streams.start();
 
         final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
@@ -407,9 +408,14 @@ public class RegexSourceIntegrationTest {
         IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig, mockTime);
 
         final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
+        try {
+            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000);
+            throw new IllegalStateException("This should not happen: an assertion error should have been thrown before this.");
+        } catch (final AssertionError e) {
+            // this is fine
+        }
 
-        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000);
-        fail("Should not get here");
+        assertThat(expectError.get(), is(true));
     }
 
     private static class TheConsumerRebalanceListener implements ConsumerRebalanceListener {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
deleted file mode 100644
index 27f0833..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.TopologyTestDriverWrapper;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
-import org.apache.kafka.streams.kstream.internals.KStreamImpl;
-import org.apache.kafka.streams.kstream.internals.KTableImpl;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.SourceNode;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.test.ConsumerRecordFactory;
-import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockTimestampExtractor;
-import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-@SuppressWarnings("deprecation")
-public class KStreamBuilderTest {
-
-    private static final String APP_ID = "app-id";
-
-    private final KStreamBuilder builder = new KStreamBuilder();
-    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private TopologyTestDriverWrapper driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        builder.setApplicationId(APP_ID);
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-builder-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
-    @Test(expected = TopologyBuilderException.class)
-    public void testFrom() {
-        builder.stream("topic-1", "topic-2");
-
-        builder.addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3");
-    }
-
-    @Test
-    public void testNewName() {
-        assertEquals("X-0000000000", builder.newName("X-"));
-        assertEquals("Y-0000000001", builder.newName("Y-"));
-        assertEquals("Z-0000000002", builder.newName("Z-"));
-
-        final KStreamBuilder newBuilder = new KStreamBuilder();
-
-        assertEquals("X-0000000000", newBuilder.newName("X-"));
-        assertEquals("Y-0000000001", newBuilder.newName("Y-"));
-        assertEquals("Z-0000000002", newBuilder.newName("Z-"));
-    }
-
-
-    @Test
-    public void shouldProcessFromSinkTopic() {
-        final KStream<String, String> source = builder.stream("topic-source");
-        source.to("topic-sink");
-
-        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
-
-        source.process(processorSupplier);
-
-        driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, props);
-        driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
-
-        // no exception was thrown
-        assertEquals(Utils.mkList("A:aa"), processorSupplier.theCapturedProcessor().processed);
-    }
-
-    @Test
-    public void shouldProcessViaThroughTopic() {
-        final KStream<String, String> source = builder.stream("topic-source");
-        final KStream<String, String> through = source.through("topic-sink");
-
-        final MockProcessorSupplier<String, String> sourceProcessorSupplier = new MockProcessorSupplier<>();
-        final MockProcessorSupplier<String, String> throughProcessorSupplier = new MockProcessorSupplier<>();
-
-        source.process(sourceProcessorSupplier);
-        through.process(throughProcessorSupplier);
-
-        driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, props);
-        driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
-
-        assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.theCapturedProcessor().processed);
-        assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.theCapturedProcessor().processed);
-    }
-
-    @Test
-    public void testNewStoreName() {
-        assertEquals("X-STATE-STORE-0000000000", builder.newStoreName("X-"));
-        assertEquals("Y-STATE-STORE-0000000001", builder.newStoreName("Y-"));
-        assertEquals("Z-STATE-STORE-0000000002", builder.newStoreName("Z-"));
-
-        KStreamBuilder newBuilder = new KStreamBuilder();
-
-        assertEquals("X-STATE-STORE-0000000000", newBuilder.newStoreName("X-"));
-        assertEquals("Y-STATE-STORE-0000000001", newBuilder.newStoreName("Y-"));
-        assertEquals("Z-STATE-STORE-0000000002", newBuilder.newStoreName("Z-"));
-    }
-
-    @Test
-    public void testMerge() {
-        final String topic1 = "topic-1";
-        final String topic2 = "topic-2";
-
-        final KStream<String, String> source1 = builder.stream(topic1);
-        final KStream<String, String> source2 = builder.stream(topic2);
-        final KStream<String, String> merged = builder.merge(source1, source2);
-
-        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
-        merged.process(processorSupplier);
-
-        driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, props);
-
-        driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
-        driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
-        driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
-        driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
-
-        assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed);
-    }
-
-    @Test
-    public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() {
-        final String topic1 = "topic-1";
-        final String topic2 = "topic-2";
-        final String topic3 = "topic-3";
-        final KStream<String, String> source1 = builder.stream(topic1);
-        final KStream<String, String> source2 = builder.stream(topic2);
-        final KStream<String, String> source3 = builder.stream(topic3);
-        final KStream<String, String> processedSource1 =
-                source1.mapValues(new ValueMapper<String, String>() {
-                    @Override
-                    public String apply(final String value) {
-                        return value;
-                    }
-                }).filter(new Predicate<String, String>() {
-                    @Override
-                    public boolean test(final String key, final String value) {
-                        return true;
-                    }
-                });
-        final KStream<String, String> processedSource2 = source2.filter(new Predicate<String, String>() {
-            @Override
-            public boolean test(final String key, final String value) {
-                return true;
-            }
-        });
-
-        final KStream<String, String> merged = processedSource1.merge(processedSource2).merge(source3);
-        merged.groupByKey().count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("my-table"));
-        final Map<String, List<String>> actual = builder.stateStoreNameToSourceTopics();
-        assertEquals(Utils.mkList("topic-1", "topic-2", "topic-3"), actual.get("my-table"));
-    }
-
-    @Test(expected = TopologyBuilderException.class)
-    public void shouldThrowExceptionWhenNoTopicPresent() {
-        builder.stream();
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowExceptionWhenTopicNamesAreNull() {
-        builder.stream(Serdes.String(), Serdes.String(), null, null);
-    }
-
-    @Test
-    public void shouldStillMaterializeSourceKTableIfStateNameNotSpecified() {
-        KTable table1 = builder.table("topic1", "table1");
-        KTable table2 = builder.table("topic2", (String) null);
-
-        final ProcessorTopology topology = builder.build(null);
-
-        assertEquals(2, topology.stateStores().size());
-        assertEquals("table1", topology.stateStores().get(0).name());
-
-        final String internalStoreName = topology.stateStores().get(1).name();
-        assertTrue(internalStoreName.contains(KTableImpl.STATE_STORE_NAME));
-        assertEquals(2, topology.storeToChangelogTopic().size());
-        assertEquals("topic1", topology.storeToChangelogTopic().get("table1"));
-        assertEquals("topic2", topology.storeToChangelogTopic().get(internalStoreName));
-        assertEquals(table1.queryableStoreName(), "table1");
-        assertNull(table2.queryableStoreName());
-    }
-
-    @Test
-    public void shouldBuildSimpleGlobalTableTopology() {
-        builder.globalTable("table", "globalTable");
-
-        final ProcessorTopology topology = builder.buildGlobalStateTopology();
-        final List<StateStore> stateStores = topology.globalStateStores();
-
-        assertEquals(1, stateStores.size());
-        assertEquals("globalTable", stateStores.get(0).name());
-    }
-
-    private void doBuildGlobalTopologyWithAllGlobalTables() {
-        final ProcessorTopology topology = builder.buildGlobalStateTopology();
-
-        final List<StateStore> stateStores = topology.globalStateStores();
-        final Set<String> sourceTopics = topology.sourceTopics();
-
-        assertEquals(Utils.mkSet("table", "table2"), sourceTopics);
-        assertEquals(2, stateStores.size());
-    }
-
-    @Test
-    public void shouldBuildGlobalTopologyWithAllGlobalTables() {
-        builder.globalTable("table", "globalTable");
-        builder.globalTable("table2", "globalTable2");
-
-        doBuildGlobalTopologyWithAllGlobalTables();
-    }
-
-    @Test
-    public void shouldBuildGlobalTopologyWithAllGlobalTablesWithInternalStoreName() {
-        builder.globalTable("table");
-        builder.globalTable("table2");
-
-        doBuildGlobalTopologyWithAllGlobalTables();
-    }
-
-    @Test
-    public void shouldAddGlobalTablesToEachGroup() {
-        final String one = "globalTable";
-        final String two = "globalTable2";
-        final GlobalKTable<String, String> globalTable = builder.globalTable("table", one);
-        final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", two);
-
-        builder.table("not-global", "not-global");
-
-        final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() {
-            @Override
-            public String apply(final String key, final String value) {
-                return value;
-            }
-        };
-
-        final KStream<String, String> stream = builder.stream("t1");
-        stream.leftJoin(globalTable, kvMapper, MockValueJoiner.TOSTRING_JOINER);
-        final KStream<String, String> stream2 = builder.stream("t2");
-        stream2.leftJoin(globalTable2, kvMapper, MockValueJoiner.TOSTRING_JOINER);
-
-        final Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
-        for (Integer groupId : nodeGroups.keySet()) {
-            final ProcessorTopology topology = builder.build(groupId);
-            final List<StateStore> stateStores = topology.globalStateStores();
-            final Set<String> names = new HashSet<>();
-            for (StateStore stateStore : stateStores) {
-                names.add(stateStore.name());
-            }
-
-            assertEquals(2, stateStores.size());
-            assertTrue(names.contains(one));
-            assertTrue(names.contains(two));
-        }
-    }
-
-    @Test
-    public void shouldMapStateStoresToCorrectSourceTopics() {
-        final KStream<String, String> playEvents = builder.stream("events");
-
-        final KTable<String, String> table = builder.table("table-topic", "table-store");
-        assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
-
-        final KStream<String, String> mapped = playEvents.map(MockMapper.<String, String>selectValueKeyValueMapper());
-        mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count"));
-        assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
-        assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count"));
-    }
-
-    @Test
-    public void shouldAddTopicToEarliestAutoOffsetResetList() {
-        final String topicName = "topic-1";
-
-        builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, topicName);
-
-        assertTrue(builder.earliestResetTopicsPattern().matcher(topicName).matches());
-        assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches());
-    }
-
-    @Test
-    public void shouldAddTopicToLatestAutoOffsetResetList() {
-        final String topicName = "topic-1";
-
-        builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, topicName);
-
-        assertTrue(builder.latestResetTopicsPattern().matcher(topicName).matches());
-        assertFalse(builder.earliestResetTopicsPattern().matcher(topicName).matches());
-    }
-
-    @Test
-    public void shouldAddTableToEarliestAutoOffsetResetList() {
-        final String topicName = "topic-1";
-        final String storeName = "test-store";
-
-        builder.table(TopologyBuilder.AutoOffsetReset.EARLIEST, topicName, storeName);
-
-        assertTrue(builder.earliestResetTopicsPattern().matcher(topicName).matches());
-        assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches());
-    }
-
-    @Test
-    public void shouldAddTableToLatestAutoOffsetResetList() {
-        final String topicName = "topic-1";
-        final String storeName = "test-store";
-
-        builder.table(TopologyBuilder.AutoOffsetReset.LATEST, topicName, storeName);
-
-        assertTrue(builder.latestResetTopicsPattern().matcher(topicName).matches());
-        assertFalse(builder.earliestResetTopicsPattern().matcher(topicName).matches());
-    }
-
-    @Test
-    public void shouldNotAddTableToOffsetResetLists() {
-        final String topicName = "topic-1";
-        final String storeName = "test-store";
-        final Serde<String> stringSerde = Serdes.String();
-
-        builder.table(stringSerde, stringSerde, topicName, storeName);
-
-        assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches());
-        assertFalse(builder.earliestResetTopicsPattern().matcher(topicName).matches());
-    }
-
-    @Test
-    public void shouldNotAddRegexTopicsToOffsetResetLists() {
-        final Pattern topicPattern = Pattern.compile("topic-\\d");
-        final String topic = "topic-5";
-
-        builder.stream(topicPattern);
-
-        assertFalse(builder.latestResetTopicsPattern().matcher(topic).matches());
-        assertFalse(builder.earliestResetTopicsPattern().matcher(topic).matches());
-
-    }
-
-    @Test
-    public void shouldAddRegexTopicToEarliestAutoOffsetResetList() {
-        final Pattern topicPattern = Pattern.compile("topic-\\d+");
-        final String topicTwo = "topic-500000";
-
-        builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, topicPattern);
-
-        assertTrue(builder.earliestResetTopicsPattern().matcher(topicTwo).matches());
-        assertFalse(builder.latestResetTopicsPattern().matcher(topicTwo).matches());
-    }
-
-    @Test
-    public void shouldAddRegexTopicToLatestAutoOffsetResetList() {
-        final Pattern topicPattern = Pattern.compile("topic-\\d+");
-        final String topicTwo = "topic-1000000";
-
-        builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, topicPattern);
-
-        assertTrue(builder.latestResetTopicsPattern().matcher(topicTwo).matches());
-        assertFalse(builder.earliestResetTopicsPattern().matcher(topicTwo).matches());
-    }
-
-    @Test
-    public void kStreamTimestampExtractorShouldBeNull() {
-        builder.stream("topic");
-        final ProcessorTopology processorTopology = builder.build(null);
-        assertNull(processorTopology.source("topic").getTimestampExtractor());
-    }
-
-    @Test
-    public void shouldAddTimestampExtractorToStreamWithKeyValSerdePerSource() {
-        builder.stream(new MockTimestampExtractor(), null, null, "topic");
-        final ProcessorTopology processorTopology = builder.build(null);
-        for (final SourceNode sourceNode: processorTopology.sources()) {
-            assertThat(sourceNode.getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
-        }
-    }
-
-    @Test
-    public void shouldAddTimestampExtractorToStreamWithOffsetResetPerSource() {
-        builder.stream(null, new MockTimestampExtractor(), null, null, "topic");
-        final ProcessorTopology processorTopology = builder.build(null);
-        assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
-    }
-
-    @Test
-    public void shouldAddTimestampExtractorToTablePerSource() {
-        builder.table("topic", "store");
-        final ProcessorTopology processorTopology = builder.build(null);
-        assertNull(processorTopology.source("topic").getTimestampExtractor());
-    }
-
-    @Test
-    public void kTableTimestampExtractorShouldBeNull() {
-        builder.table("topic", "store");
-        final ProcessorTopology processorTopology = builder.build(null);
-        assertNull(processorTopology.source("topic").getTimestampExtractor());
-    }
-
-    @Test
-    public void shouldAddTimestampExtractorToTableWithKeyValSerdePerSource() {
-        builder.table(null, new MockTimestampExtractor(), null, null, "topic", "store");
-        final ProcessorTopology processorTopology = builder.build(null);
-        assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
-    }
-}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index c6ee70f..463afb8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -24,15 +24,14 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.Printed;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -223,12 +222,11 @@ public class KStreamImplTest {
     }
 
     @Test
-    // TODO: this test should be refactored when we removed KStreamBuilder so that the created Topology contains internal topics as well
     public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
-        final KStreamBuilder builder = new KStreamBuilder();
-        KStream<String, String> kStream = builder.stream(Serdes.String(), Serdes.String(), "topic-1");
-        ValueJoiner<String, String, String> valueJoiner = MockValueJoiner.instance(":");
-        long windowSize = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, String> kStream = builder.stream("topic-1", stringConsumed);
+        final ValueJoiner<String, String, String> valueJoiner = MockValueJoiner.instance(":");
+        final long windowSize = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
         final KStream<String, String> stream = kStream
                         .map(new KeyValueMapper<String, String, KeyValue<? extends String, ? extends String>>() {
                             @Override
@@ -244,10 +242,11 @@ public class KStreamImplTest {
                                 Serdes.String()))
                 .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
 
-        ProcessorTopology processorTopology = builder.setApplicationId("X").build(null);
-        SourceNode originalSourceNode = processorTopology.source("topic-1");
+        final ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build();
+
+        final SourceNode originalSourceNode = topology.source("topic-1");
 
-        for (SourceNode sourceNode: processorTopology.sources()) {
+        for (SourceNode sourceNode: topology.sources()) {
             if (sourceNode.name().equals(originalSourceNode.name())) {
                 assertEquals(sourceNode.getTimestampExtractor(), null);
             } else {
@@ -427,10 +426,9 @@ public class KStreamImplTest {
                         null);
     }
 
-    @SuppressWarnings("unchecked")
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnPrintIfPrintedIsNull() {
-        testStream.print((Printed) null);
+        testStream.print(null);
     }
 
     @Test(expected = NullPointerException.class)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
deleted file mode 100644
index 93b233b..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ /dev/null
@@ -1,752 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.TopologyTestDriverWrapper;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
-import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
-import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
-import org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
-import org.apache.kafka.test.MockTimestampExtractor;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import static org.apache.kafka.common.utils.Utils.mkList;
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-@SuppressWarnings("deprecation")
-public class TopologyBuilderTest {
-
-    @Test
-    public void shouldAddSourceWithOffsetReset() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        final String earliestTopic = "earliestTopic";
-        final String latestTopic = "latestTopic";
-
-        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", earliestTopic);
-        builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", latestTopic);
-
-        assertTrue(builder.earliestResetTopicsPattern().matcher(earliestTopic).matches());
-        assertTrue(builder.latestResetTopicsPattern().matcher(latestTopic).matches());
-
-    }
-
-    @Test
-    public void shouldAddSourcePatternWithOffsetReset() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        final String earliestTopicPattern = "earliest.*Topic";
-        final String latestTopicPattern = "latest.*Topic";
-
-        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", Pattern.compile(earliestTopicPattern));
-        builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", Pattern.compile(latestTopicPattern));
-
-        assertTrue(builder.earliestResetTopicsPattern().matcher("earliestTestTopic").matches());
-        assertTrue(builder.latestResetTopicsPattern().matcher("latestTestTopic").matches());
-    }
-
-    @Test
-    public void shouldAddSourceWithoutOffsetReset() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        final Serde<String> stringSerde = Serdes.String();
-        final Pattern expectedPattern = Pattern.compile("test-topic");
-
-        builder.addSource("source", stringSerde.deserializer(), stringSerde.deserializer(), "test-topic");
-
-        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
-        assertEquals(builder.earliestResetTopicsPattern().pattern(), "");
-        assertEquals(builder.latestResetTopicsPattern().pattern(), "");
-    }
-
-    @Test
-    public void shouldAddPatternSourceWithoutOffsetReset() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        final Serde<String> stringSerde = Serdes.String();
-        final Pattern expectedPattern = Pattern.compile("test-.*");
-
-        builder.addSource("source", stringSerde.deserializer(), stringSerde.deserializer(), Pattern.compile("test-.*"));
-
-        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
-        assertEquals(builder.earliestResetTopicsPattern().pattern(), "");
-        assertEquals(builder.latestResetTopicsPattern().pattern(), "");
-    }
-
-    @Test
-    public void shouldNotAllowOffsetResetSourceWithoutTopics() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        final Serde<String> stringSerde = Serdes.String();
-
-        try {
-            builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer());
-            fail("Should throw TopologyBuilderException with no topics");
-        } catch (TopologyBuilderException tpe) {
-            //no-op
-        }
-    }
-
-    @Test
-    public void shouldNotAllowOffsetResetSourceWithDuplicateSourceName() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        final Serde<String> stringSerde = Serdes.String();
-
-        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
-        try {
-            builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
-            fail("Should throw TopologyBuilderException for duplicate source name");
-        } catch (TopologyBuilderException tpe) {
-            //no-op
-        }
-    }
-
-
-
-    @Test(expected = TopologyBuilderException.class)
-    public void testAddSourceWithSameName() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source", "topic-1");
-        builder.addSource("source", "topic-2");
-    }
-
-    @Test(expected = TopologyBuilderException.class)
-    public void testAddSourceWithSameTopic() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source", "topic-1");
-        builder.addSource("source-2", "topic-1");
-    }
-
-    @Test(expected = TopologyBuilderException.class)
-    public void testAddProcessorWithSameName() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source", "topic-1");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-    }
-
-    @Test(expected = TopologyBuilderException.class)
-    public void testAddProcessorWithWrongParent() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-    }
-
-    @Test(expected = TopologyBuilderException.class)
-    public void testAddProcessorWithSelfParent() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addProcessor("processor", new MockProcessorSupplier(), "processor");
-    }
-
-    @Test(expected = TopologyBuilderException.class)
-    public void testAddSinkWithSameName() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source", "topic-1");
-        builder.addSink("sink", "topic-2", "source");
-        builder.addSink("sink", "topic-3", "source");
-    }
-
-    @Test(expected = TopologyBuilderException.class)
-    public void testAddSinkWithWrongParent() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSink("sink", "topic-2", "source");
-    }
-
-    @Test(expected = TopologyBuilderException.class)
-    public void testAddSinkWithSelfParent() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSink("sink", "topic-2", "sink");
-    }
-
-    @Test
-    public void testAddSinkConnectedWithParent() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source", "source-topic");
-        builder.addSink("sink", "dest-topic", "source");
-
-        Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
-        Set<String> nodeGroup = nodeGroups.get(0);
-
-        assertTrue(nodeGroup.contains("sink"));
-        assertTrue(nodeGroup.contains("source"));
-
-    }
-
-    @Test
-    public void testAddSinkConnectedWithMultipleParent() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source", "source-topic");
-        builder.addSource("sourceII", "source-topicII");
-        builder.addSink("sink", "dest-topic", "source", "sourceII");
-
-        Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
-        Set<String> nodeGroup = nodeGroups.get(0);
-
-        assertTrue(nodeGroup.contains("sink"));
-        assertTrue(nodeGroup.contains("source"));
-        assertTrue(nodeGroup.contains("sourceII"));
-
-    }
-
-    @Test
-    public void testSourceTopics() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.setApplicationId("X");
-        builder.addSource("source-1", "topic-1");
-        builder.addSource("source-2", "topic-2");
-        builder.addSource("source-3", "topic-3");
-        builder.addInternalTopic("topic-3");
-
-        Pattern expectedPattern = Pattern.compile("X-topic-3|topic-1|topic-2");
-
-        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
-    }
-
-    @Test
-    public void testPatternSourceTopic() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        Pattern expectedPattern = Pattern.compile("topic-\\d");
-        builder.addSource("source-1", expectedPattern);
-        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
-    }
-
-    @Test
-    public void testAddMoreThanOnePatternSourceNode() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        Pattern expectedPattern = Pattern.compile("topics[A-Z]|.*-\\d");
-        builder.addSource("source-1", Pattern.compile("topics[A-Z]"));
-        builder.addSource("source-2", Pattern.compile(".*-\\d"));
-        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
-    }
-
-    @Test
-    public void testSubscribeTopicNameAndPattern() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        Pattern expectedPattern = Pattern.compile("topic-bar|topic-foo|.*-\\d");
-        builder.addSource("source-1", "topic-foo", "topic-bar");
-        builder.addSource("source-2", Pattern.compile(".*-\\d"));
-        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
-    }
-
-    @Test(expected = TopologyBuilderException.class)
-    public void testPatternMatchesAlreadyProvidedTopicSource() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source-1", "foo");
-        builder.addSource("source-2", Pattern.compile("f.*"));
-    }
-
-    @Test(expected = TopologyBuilderException.class)
-    public void testNamedTopicMatchesAlreadyProvidedPattern() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source-1", Pattern.compile("f.*"));
-        builder.addSource("source-2", "foo");
-    }
-
-    @Test(expected = TopologyBuilderException.class)
-    public void testAddStateStoreWithNonExistingProcessor() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor");
-    }
-
-    @Test(expected = TopologyBuilderException.class)
-    public void testAddStateStoreWithSource() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source-1", "topic-1");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
-    }
-
-    @Test(expected = TopologyBuilderException.class)
-    public void testAddStateStoreWithSink() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSink("sink-1", "topic-1");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
-    }
-
-    @Test(expected = TopologyBuilderException.class)
-    public void testAddStateStoreWithDuplicates() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addStateStore(new MockStateStoreSupplier("store", false));
-        builder.addStateStore(new MockStateStoreSupplier("store", false));
-    }
-
-    @Test
-    public void testAddStateStore() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
-        builder.addStateStore(supplier);
-        builder.setApplicationId("X");
-        builder.addSource("source-1", "topic-1");
-        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
-
-        assertEquals(0, builder.build(null).stateStores().size());
-
-        builder.connectProcessorAndStateStores("processor-1", "store-1");
-
-        List<StateStore> suppliers = builder.build(null).stateStores();
-        assertEquals(1, suppliers.size());
-        assertEquals(supplier.name(), suppliers.get(0).name());
-    }
-
-    @Test
-    public void testTopicGroups() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.setApplicationId("X");
-        builder.addInternalTopic("topic-1x");
-        builder.addSource("source-1", "topic-1", "topic-1x");
-        builder.addSource("source-2", "topic-2");
-        builder.addSource("source-3", "topic-3");
-        builder.addSource("source-4", "topic-4");
-        builder.addSource("source-5", "topic-5");
-
-        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
-
-        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
-        builder.copartitionSources(mkList("source-1", "source-2"));
-
-        builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
-
-        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
-
-        Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
-        expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
-        expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
-
-        assertEquals(3, topicGroups.size());
-        assertEquals(expectedTopicGroups, topicGroups);
-
-        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
-
-        assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
-    }
-
-    @Test
-    public void testTopicGroupsByStateStore() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.setApplicationId("X");
-        builder.addSource("source-1", "topic-1", "topic-1x");
-        builder.addSource("source-2", "topic-2");
-        builder.addSource("source-3", "topic-3");
-        builder.addSource("source-4", "topic-4");
-        builder.addSource("source-5", "topic-5");
-
-        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
-        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
-        builder.addStateStore(new MockStateStoreSupplier("store-1", false), "processor-1", "processor-2");
-
-        builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
-        builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
-        builder.addStateStore(new MockStateStoreSupplier("store-2", false), "processor-3", "processor-4");
-
-        builder.addProcessor("processor-5", new MockProcessorSupplier(), "source-5");
-        StateStoreSupplier supplier = new MockStateStoreSupplier("store-3", false);
-        builder.addStateStore(supplier);
-        builder.connectProcessorAndStateStores("processor-5", "store-3");
-
-        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
-
-        Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
-        final String store1 = ProcessorStateManager.storeChangelogTopic("X", "store-1");
-        final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2");
-        final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3");
-        expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"),
-                                                  Collections.<String, InternalTopicConfig>emptyMap(),
-                                                  Collections.singletonMap(store1, (InternalTopicConfig)  new UnwindowedChangelogTopicConfig(store1, Collections.<String, String>emptyMap()))));
-        expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"),
-                                                  Collections.<String, InternalTopicConfig>emptyMap(),
-                                                  Collections.singletonMap(store2, (InternalTopicConfig)  new UnwindowedChangelogTopicConfig(store2, Collections.<String, String>emptyMap()))));
-        expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"),
-                                                  Collections.<String, InternalTopicConfig>emptyMap(),
-                                                  Collections.singletonMap(store3, (InternalTopicConfig)  new UnwindowedChangelogTopicConfig(store3, Collections.<String, String>emptyMap()))));
-
-        assertEquals(3, topicGroups.size());
-        assertEquals(expectedTopicGroups, topicGroups);
-    }
-
-    @Test
-    public void testBuild() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source-1", "topic-1", "topic-1x");
-        builder.addSource("source-2", "topic-2");
-        builder.addSource("source-3", "topic-3");
-        builder.addSource("source-4", "topic-4");
-        builder.addSource("source-5", "topic-5");
-
-        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
-        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
-        builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
-
-        builder.setApplicationId("X");
-        ProcessorTopology topology0 = builder.build(0);
-        ProcessorTopology topology1 = builder.build(1);
-        ProcessorTopology topology2 = builder.build(2);
-
-        assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors()));
-        assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors()));
-        assertEquals(mkSet("source-5"), nodeNames(topology2.processors()));
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullNameWhenAddingSink() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSink(null, "topic");
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullTopicWhenAddingSink() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSink("name", null);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullNameWhenAddingProcessor() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addProcessor(null, new ProcessorSupplier() {
-            @Override
-            public Processor get() {
-                return null;
-            }
-        });
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullProcessorSupplier() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addProcessor("name", null);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullNameWhenAddingSource() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource(null, Pattern.compile(".*"));
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.connectProcessorAndStateStores(null, "store");
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAddNullInternalTopic() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addInternalTopic(null);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldNotSetApplicationIdToNull() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.setApplicationId(null);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAddNullStateStoreSupplier() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addStateStore(null);
-    }
-
-    private Set<String> nodeNames(Collection<ProcessorNode> nodes) {
-        Set<String> nodeNames = new HashSet<>();
-        for (ProcessorNode node : nodes) {
-            nodeNames.add(node.name());
-        }
-        return nodeNames;
-    }
-
-    @Test
-    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source", "topic");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
-        final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
-        assertEquals(1, stateStoreNameToSourceTopic.size());
-        assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store"));
-    }
-
-    @Test
-    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source", "topic");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
-        final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
-        assertEquals(1, stateStoreNameToSourceTopic.size());
-        assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store"));
-    }
-
-    @Test
-    public void shouldCorrectlyMapStateStoreToInternalTopics() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.setApplicationId("appId");
-        builder.addInternalTopic("internal-topic");
-        builder.addSource("source", "internal-topic");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
-        final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
-        assertEquals(1, stateStoreNameToSourceTopic.size());
-        assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("store"));
-    }
-
-    @Test
-    public void shouldAddInternalTopicConfigForNonWindowStores() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.setApplicationId("appId");
-        builder.addSource("source", "topic");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("store", true), "processor");
-        final Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
-        final TopicsInfo topicsInfo = topicGroups.values().iterator().next();
-        final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
-        final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
-        assertEquals(1, properties.size());
-        assertEquals("appId-store-changelog", topicConfig.name());
-    }
-
-    @Test
-    public void shouldAddInternalTopicConfigForRepartitionTopics() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.setApplicationId("appId");
-        builder.addInternalTopic("foo");
-        builder.addSource("source", "foo");
-        final TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
-        final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
-        final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
-        assertEquals(5, properties.size());
-        assertEquals(String.valueOf(Long.MAX_VALUE), properties.get(TopicConfig.RETENTION_MS_CONFIG));
-        assertEquals("appId-foo", topicConfig.name());
-    }
-
-    @Test
-    public void shouldThrowOnUnassignedStateStoreAccess() {
-        final String sourceNodeName = "source";
-        final String goodNodeName = "goodGuy";
-        final String badNodeName = "badGuy";
-
-        final Properties config = new Properties();
-        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
-        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource(sourceNodeName, "topic")
-                .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
-                .addStateStore(new MockStateStoreSupplier(LocalMockProcessorSupplier.STORE_NAME, false), goodNodeName)
-                .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
-        try {
-            final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, config);
-            driver.pipeInput(new ConsumerRecord<>("topic", 0, 0L, new byte[] {}, new byte[] {}));
-            fail("Should have thrown StreamsException");
-        } catch (final StreamsException e) {
-            final String error = e.toString();
-            final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName;
-
-            assertThat(error, equalTo(expectedMessage));
-        }
-    }
-
-    private static class LocalMockProcessorSupplier implements ProcessorSupplier {
-        final static String STORE_NAME = "store";
-
-        @Override
-        public Processor get() {
-            return new Processor() {
-                @Override
-                public void init(ProcessorContext context) {
-                    context.getStateStore(STORE_NAME);
-                }
-
-                @Override
-                public void process(Object key, Object value) {
-                }
-
-                @Override
-                public void close() {
-                }
-            };
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source-1", "topic-foo");
-        builder.addSource("source-2", Pattern.compile("topic-[A-C]"));
-        builder.addSource("source-3", Pattern.compile("topic-\\d"));
-
-        StreamsPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamsPartitionAssignor.SubscriptionUpdates();
-        Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
-        updatedTopicsField.setAccessible(true);
-
-        Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
-
-        updatedTopics.add("topic-B");
-        updatedTopics.add("topic-3");
-        updatedTopics.add("topic-A");
-
-        builder.updateSubscriptions(subscriptionUpdates, null);
-        builder.setApplicationId("test-id");
-
-        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
-        assertTrue(topicGroups.get(0).sourceTopics.contains("topic-foo"));
-        assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A"));
-        assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B"));
-        assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3"));
-
-    }
-
-    @Test
-    public void shouldAddTimestampExtractorPerSource() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource(new MockTimestampExtractor(), "source", "topic");
-        final ProcessorTopology processorTopology = builder.build(null);
-        assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
-    }
-
-    @Test
-    public void shouldAddTimestampExtractorWithOffsetResetPerSource() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource(null, new MockTimestampExtractor(), "source", "topic");
-        final ProcessorTopology processorTopology = builder.build(null);
-        assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
-    }
-
-    @Test
-    public void shouldAddTimestampExtractorWithPatternPerSource() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        final Pattern pattern = Pattern.compile("t.*");
-        builder.addSource(new MockTimestampExtractor(), "source", pattern);
-        final ProcessorTopology processorTopology = builder.build(null);
-        assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
-    }
-
-    @Test
-    public void shouldAddTimestampExtractorWithOffsetResetAndPatternPerSource() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        final Pattern pattern = Pattern.compile("t.*");
-        builder.addSource(null, new MockTimestampExtractor(), "source", pattern);
-        final ProcessorTopology processorTopology = builder.build(null);
-        assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
-    }
-
-    @Test
-    public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesPerSource() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource(null, "source", new MockTimestampExtractor(), null, null, "topic");
-        final ProcessorTopology processorTopology = builder.build(null);
-        assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
-    }
-
-    @Test
-    public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesAndPatternPerSource() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        final Pattern pattern = Pattern.compile("t.*");
-        builder.addSource(null, "source", new MockTimestampExtractor(), null, null, pattern);
-        final ProcessorTopology processorTopology = builder.build(null);
-        assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
-
-        final TopologyBuilder topologyBuilder = new TopologyBuilder()
-                .addSource("ingest", Pattern.compile("topic-\\d+"))
-                .addProcessor("my-processor", new MockProcessorSupplier(), "ingest")
-                .addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor");
-
-        final StreamsPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamsPartitionAssignor.SubscriptionUpdates();
-        final Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
-        updatedTopicsField.setAccessible(true);
-
-        final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
-
-        updatedTopics.add("topic-2");
-        updatedTopics.add("topic-3");
-        updatedTopics.add("topic-A");
-
-        topologyBuilder.updateSubscriptions(subscriptionUpdates, "test-thread");
-        topologyBuilder.setApplicationId("test-app");
-
-        Map<String, List<String>> stateStoreAndTopics = topologyBuilder.stateStoreNameToSourceTopics();
-        List<String> topics = stateStoreAndTopics.get("testStateStore");
-
-        assertTrue("Expected to contain two topics", topics.size() == 2);
-
-        assertTrue(topics.contains("topic-2"));
-        assertTrue(topics.contains("topic-3"));
-        assertFalse(topics.contains("topic-A"));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test(expected = TopologyBuilderException.class)
-    public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
-        final String sameNameForSourceAndProcessor = "sameName";
-        new TopologyBuilder()
-            .addGlobalStore(new MockStateStoreSupplier("anyName", false, false),
-                sameNameForSourceAndProcessor,
-                null,
-                null,
-                "anyTopicName",
-                sameNameForSourceAndProcessor,
-                new MockProcessorSupplier());
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
index bbc59fa..b8221d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -46,14 +46,14 @@ public class CopartitionedTopicsValidatorTest {
         partitions.put(new TopicPartition("second", 1), new PartitionInfo("second", 1, null, null, null));
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void shouldThrowTopologyBuilderExceptionIfNoPartitionsFoundForCoPartitionedTopic() {
         validator.validate(Collections.singleton("topic"),
                            Collections.<String, StreamsPartitionAssignor.InternalTopicMetadata>emptyMap(),
                            cluster);
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test(expected = TopologyException.class)
     public void shouldThrowTopologyBuilderExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch() {
         partitions.remove(new TopicPartition("second", 0));
         validator.validate(Utils.mkSet("first", "second"),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index c73593e..d8e66b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -19,22 +19,18 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.TopologyTestDriverWrapper;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyDescription;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStoreBuilder;
 import org.apache.kafka.test.MockTimestampExtractor;
-import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
@@ -45,13 +41,11 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 
 import static org.junit.Assert.assertEquals;
@@ -64,7 +58,7 @@ public class InternalTopologyBuilderTest {
 
     private final Serde<String> stringSerde = Serdes.String();
     private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
-    private final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"), Serdes.ByteArray(), Serdes.ByteArray());
+    private final StoreBuilder storeBuilder = new MockStoreBuilder("store", false);
 
     @Test
     public void shouldAddSourceWithOffsetReset() {
@@ -355,14 +349,14 @@ public class InternalTopologyBuilderTest {
 
         builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
         builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
-        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-1"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-1", "processor-2");
+        builder.addStateStore(new MockStoreBuilder("store-1", false), "processor-1", "processor-2");
 
         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
         builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
-        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-2"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-3", "processor-4");
+        builder.addStateStore(new MockStoreBuilder("store-2", false), "processor-3", "processor-4");
 
         builder.addProcessor("processor-5", new MockProcessorSupplier(), "source-5");
-        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-3"), Serdes.ByteArray(), Serdes.ByteArray()));
+        builder.addStateStore(new MockStoreBuilder("store-3", false));
         builder.connectProcessorAndStateStores("processor-5", "store-3");
 
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
@@ -553,56 +547,6 @@ public class InternalTopologyBuilderTest {
         assertTrue(topicConfig instanceof RepartitionTopicConfig);
     }
 
-    @Test
-    public void shouldThrowOnUnassignedStateStoreAccess() {
-        final String sourceNodeName = "source";
-        final String goodNodeName = "goodGuy";
-        final String badNodeName = "badGuy";
-
-        final Properties config = new Properties();
-        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
-        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-
-        builder.addSource(null, sourceNodeName, null, null, null, "topic");
-        builder.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
-        builder.addStateStore(
-            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(LocalMockProcessorSupplier.STORE_NAME), Serdes.String(), Serdes.String()),
-            goodNodeName);
-        builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
-        
-        try {
-            new TopologyTestDriverWrapper(builder, config);
-            fail("Should have throw StreamsException");
-        } catch (final StreamsException e) {
-            final String error = e.toString();
-            final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName;
-            
-            assertThat(error, equalTo(expectedMessage));
-        }
-    }
-
-    private static class LocalMockProcessorSupplier implements ProcessorSupplier {
-        final static String STORE_NAME = "store";
-
-        @Override
-        public Processor get() {
-            return new Processor() {
-                @Override
-                public void init(final ProcessorContext context) {
-                    context.getStateStore(STORE_NAME);
-                }
-
-                @Override
-                public void process(final Object key, final Object value) { }
-
-                @Override
-                public void close() {
-                }
-            };
-        }
-    }
-
     @SuppressWarnings("unchecked")
     @Test
     public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 9f2b242..e247647 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -24,7 +24,8 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -67,7 +68,7 @@ public class ProcessorTopologyTest {
     private final MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER, 0L);
 
-    private TopologyTestDriverWrapper driver;
+    private TopologyTestDriver driver;
     private final Properties props = new Properties();
 
     @Before
@@ -122,7 +123,7 @@ public class ProcessorTopologyTest {
     @Test
     public void testDrivingSimpleTopology() {
         int partition = 10;
-        driver = new TopologyTestDriverWrapper(createSimpleTopology(partition), props);
+        driver = new TopologyTestDriver(createSimpleTopology(partition), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
         assertNoOutputRecord(OUTPUT_TOPIC_2);
@@ -143,7 +144,7 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingMultiplexingTopology() {
-        driver = new TopologyTestDriverWrapper(createMultiplexingTopology(), props);
+        driver = new TopologyTestDriver(createMultiplexingTopology(), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
@@ -165,7 +166,7 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingMultiplexByNameTopology() {
-        driver = new TopologyTestDriverWrapper(createMultiplexByNameTopology(), props);
+        driver = new TopologyTestDriver(createMultiplexByNameTopology(), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
@@ -188,7 +189,7 @@ public class ProcessorTopologyTest {
     @Test
     public void testDrivingStatefulTopology() {
         String storeName = "entries";
-        driver = new TopologyTestDriverWrapper(createStatefulTopology(storeName), props);
+        driver = new TopologyTestDriver(createStatefulTopology(storeName), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
@@ -211,7 +212,7 @@ public class ProcessorTopologyTest {
         topology.addGlobalStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()).withLoggingDisabled(),
                 global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor(storeName)));
 
-        driver = new TopologyTestDriverWrapper(topology.getInternalBuilder(), props);
+        driver = new TopologyTestDriver(topology, props);
         final KeyValueStore<String, String> globalStore = driver.getKeyValueStore(storeName);
         driver.pipeInput(recordFactory.create(topic, "key1", "value1"));
         driver.pipeInput(recordFactory.create(topic, "key2", "value2"));
@@ -222,7 +223,7 @@ public class ProcessorTopologyTest {
     @Test
     public void testDrivingSimpleMultiSourceTopology() {
         final int partition = 10;
-        driver = new TopologyTestDriverWrapper(createSimpleMultiSourceTopology(partition), props);
+        driver = new TopologyTestDriver(createSimpleMultiSourceTopology(partition), props);
 
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
@@ -235,7 +236,7 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingForwardToSourceTopology() {
-        driver = new TopologyTestDriverWrapper(createForwardToSourceTopology(), props);
+        driver = new TopologyTestDriver(createForwardToSourceTopology(), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
@@ -246,7 +247,7 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingInternalRepartitioningTopology() {
-        driver = new TopologyTestDriverWrapper(createInternalRepartitioningTopology(), props);
+        driver = new TopologyTestDriver(createInternalRepartitioningTopology(), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
@@ -257,7 +258,7 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingInternalRepartitioningForwardingTimestampTopology() {
-        driver = new TopologyTestDriverWrapper(createInternalRepartitioningWithValueTimestampTopology(), props);
+        driver = new TopologyTestDriver(createInternalRepartitioningWithValueTimestampTopology(), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1@1000"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2@2000"));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3@3000"));
@@ -316,7 +317,7 @@ public class ProcessorTopologyTest {
     @Test
     public void shouldConsiderTimeStamps() {
         final int partition = 10;
-        driver = new TopologyTestDriverWrapper(createSimpleTopology(partition), props);
+        driver = new TopologyTestDriver(createSimpleTopology(partition), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L));
@@ -328,7 +329,7 @@ public class ProcessorTopologyTest {
     @Test
     public void shouldConsiderModifiedTimeStamps() {
         final int partition = 10;
-        driver = new TopologyTestDriverWrapper(createTimestampTopology(partition), props);
+        driver = new TopologyTestDriver(createTimestampTopology(partition), props);
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
         driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L));
@@ -376,95 +377,87 @@ public class ProcessorTopologyTest {
         };
     }
 
-    private InternalTopologyBuilder createSimpleTopology(final int partition) {
-        return ((TopologyWrapper) topology
+    private Topology createSimpleTopology(final int partition) {
+        return topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new ForwardingProcessor()), "source")
-            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor"))
-            .getInternalBuilder();
+            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
     }
 
-    private InternalTopologyBuilder createTimestampTopology(final int partition) {
-        return ((TopologyWrapper) topology
+    private Topology createTimestampTopology(final int partition) {
+        return topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new TimestampProcessor()), "source")
-            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor"))
-            .getInternalBuilder();
+            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
     }
 
-    private InternalTopologyBuilder createMultiplexingTopology() {
-        return ((TopologyWrapper) topology
+    private Topology createMultiplexingTopology() {
+        return topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
             .addSink("sink1", OUTPUT_TOPIC_1, "processor")
-            .addSink("sink2", OUTPUT_TOPIC_2, "processor"))
-            .getInternalBuilder();
+            .addSink("sink2", OUTPUT_TOPIC_2, "processor");
     }
 
-    private InternalTopologyBuilder createMultiplexByNameTopology() {
-        return ((TopologyWrapper) topology
+    private Topology createMultiplexByNameTopology() {
+        return topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source")
             .addSink("sink0", OUTPUT_TOPIC_1, "processor")
-            .addSink("sink1", OUTPUT_TOPIC_2, "processor"))
-                .getInternalBuilder();
+            .addSink("sink1", OUTPUT_TOPIC_2, "processor");
     }
 
-    private InternalTopologyBuilder createStatefulTopology(final String storeName) {
-        return ((TopologyWrapper) topology
+    private Topology createStatefulTopology(final String storeName) {
+        return topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
             .addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()), "processor")
-            .addSink("counts", OUTPUT_TOPIC_1, "processor"))
-            .getInternalBuilder();
+            .addSink("counts", OUTPUT_TOPIC_1, "processor");
     }
 
-    private InternalTopologyBuilder createInternalRepartitioningTopology() {
-        final InternalTopologyBuilder internalTopologyBuilder = ((TopologyWrapper) topology
-            .addSource("source", INPUT_TOPIC_1)
+    private Topology createInternalRepartitioningTopology() {
+        topology.addSource("source", INPUT_TOPIC_1)
             .addSink("sink0", THROUGH_TOPIC_1, "source")
             .addSource("source1", THROUGH_TOPIC_1)
-            .addSink("sink1", OUTPUT_TOPIC_1, "source1"))
-            .getInternalBuilder();
+            .addSink("sink1", OUTPUT_TOPIC_1, "source1");
 
+        // use wrapper to get the internal topology builder to add internal topic
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
         internalTopologyBuilder.addInternalTopic(THROUGH_TOPIC_1);
 
-        return internalTopologyBuilder;
+        return topology;
     }
 
-    private InternalTopologyBuilder createInternalRepartitioningWithValueTimestampTopology() {
-        final InternalTopologyBuilder internalTopologyBuilder = ((TopologyWrapper) topology
-            .addSource("source", INPUT_TOPIC_1)
-            .addProcessor("processor", define(new ValueTimestampProcessor()), "source")
-            .addSink("sink0", THROUGH_TOPIC_1, "processor")
-            .addSource("source1", THROUGH_TOPIC_1)
-            .addSink("sink1", OUTPUT_TOPIC_1, "source1"))
-            .getInternalBuilder();
+    private Topology createInternalRepartitioningWithValueTimestampTopology() {
+        topology.addSource("source", INPUT_TOPIC_1)
+                .addProcessor("processor", define(new ValueTimestampProcessor()), "source")
+                .addSink("sink0", THROUGH_TOPIC_1, "processor")
+                .addSource("source1", THROUGH_TOPIC_1)
+                .addSink("sink1", OUTPUT_TOPIC_1, "source1");
 
+        // use wrapper to get the internal topology builder to add internal topic
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
         internalTopologyBuilder.addInternalTopic(THROUGH_TOPIC_1);
 
-        return internalTopologyBuilder;
+        return topology;
     }
 
-    private InternalTopologyBuilder createForwardToSourceTopology() {
-        return ((TopologyWrapper) topology.addSource("source-1", INPUT_TOPIC_1)
+    private Topology createForwardToSourceTopology() {
+        return topology.addSource("source-1", INPUT_TOPIC_1)
                 .addSink("sink-1", OUTPUT_TOPIC_1, "source-1")
                 .addSource("source-2", OUTPUT_TOPIC_1)
-                .addSink("sink-2", OUTPUT_TOPIC_2, "source-2"))
-                .getInternalBuilder();
+                .addSink("sink-2", OUTPUT_TOPIC_2, "source-2");
     }
 
-    private InternalTopologyBuilder createSimpleMultiSourceTopology(int partition) {
-        return ((TopologyWrapper) topology.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+    private Topology createSimpleMultiSourceTopology(int partition) {
+        return topology.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
                 .addProcessor("processor-1", define(new ForwardingProcessor()), "source-1")
                 .addSink("sink-1", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor-1")
                 .addSource("source-2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
                 .addProcessor("processor-2", define(new ForwardingProcessor()), "source-2")
-                .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2"))
-                .getInternalBuilder();
+                .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2");
     }
 
-
     /**
      * A processor that simply forwards all messages to all children.
      */
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 9090012..486b35e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -36,13 +36,12 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockRestoreConsumer;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.MockStateStore;
-import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.MockStoreBuilder;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -90,7 +89,7 @@ public class StandbyTaskTest {
 
     private final Set<TopicPartition> topicPartitions = Collections.emptySet();
     private final ProcessorTopology topology = ProcessorTopology.withLocalStores(
-            Utils.mkList(new MockStateStoreSupplier(storeName1, false).get(), new MockStateStoreSupplier(storeName2, true).get()),
+            Utils.mkList(new MockStoreBuilder(storeName1, false).build(), new MockStoreBuilder(storeName2, true).build()),
             new HashMap<String, String>() {
                 {
                     put(storeName1, storeChangelogTopicName1);
@@ -100,7 +99,7 @@ public class StandbyTaskTest {
     private final TopicPartition globalTopicPartition = new TopicPartition(globalStoreName, 0);
     private final Set<TopicPartition> ktablePartitions = Utils.mkSet(globalTopicPartition);
     private final ProcessorTopology ktableTopology = ProcessorTopology.withLocalStores(
-            Collections.<StateStore>singletonList(new MockStateStoreSupplier(globalTopicPartition.topic(), true, false).get()),
+            Collections.singletonList(new MockStoreBuilder(globalTopicPartition.topic(), true).withLoggingDisabled().build()),
             new HashMap<String, String>() {
                 {
                     put(globalStoreName, globalTopicPartition.topic());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 0048e73..9812158 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
@@ -41,10 +40,10 @@ import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStoreBuilder;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.Test;
@@ -329,10 +328,10 @@ public class StreamsPartitionAssignorTest {
     public void testAssignWithPartialTopology() {
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
-        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store1"), Serdes.ByteArray(), Serdes.ByteArray()), "processor1");
+        builder.addStateStore(new MockStoreBuilder("store1", false), "processor1");
         builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
-        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store2"), Serdes.ByteArray(), Serdes.ByteArray()), "processor2");
+        builder.addStateStore(new MockStoreBuilder("store2", false), "processor2");
         List<String> topics = Utils.mkList("topic1", "topic2");
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
@@ -470,11 +469,11 @@ public class StreamsPartitionAssignorTest {
         builder.addSource(null, "source2", null, null, null, "topic2");
 
         builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
-        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store1"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-1");
+        builder.addStateStore(new MockStoreBuilder("store1", false), "processor-1");
 
         builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
-        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store2"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-2");
-        builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store3"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-2");
+        builder.addStateStore(new MockStoreBuilder("store2", false), "processor-2");
+        builder.addStateStore(new MockStoreBuilder("store3", false), "processor-2");
 
         List<String> topics = Utils.mkList("topic1", "topic2");
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index a061ff2..317ffc9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -49,9 +49,8 @@ public class CompositeReadOnlyKeyValueStoreTest {
     private final String storeNameA = "my-storeA";
     private StateStoreProviderStub stubProviderTwo;
     private KeyValueStore<String, String> stubOneUnderlying;
+    private KeyValueStore<String, String> otherUnderlyingStore;
     private CompositeReadOnlyKeyValueStore<String, String> theStore;
-    private KeyValueStore<String, String>
-        otherUnderlyingStore;
 
     @Before
     public void before() {
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index bcb9856..033b68d 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -59,49 +58,6 @@ public class KStreamTestDriver extends ExternalResource {
     private ProcessorTopology globalTopology;
     private final LogContext logContext = new LogContext("testCache ");
 
-    @Deprecated
-    public void setUp(final KStreamBuilder builder) {
-        setUp(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
-    }
-
-    @Deprecated
-    public void setUp(final KStreamBuilder builder, final File stateDir) {
-        setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
-    }
-
-    @Deprecated
-    public void setUp(final KStreamBuilder builder, final File stateDir, final long cacheSize) {
-        setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize);
-    }
-
-    @Deprecated
-    public void setUp(final KStreamBuilder builder,
-                      final File stateDir,
-                      final Serde<?> keySerde,
-                      final Serde<?> valSerde) {
-        setUp(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES);
-    }
-
-    @Deprecated
-    public void setUp(final KStreamBuilder builder,
-                      final File stateDir,
-                      final Serde<?> keySerde,
-                      final Serde<?> valSerde,
-                      final long cacheSize) {
-        builder.setApplicationId("TestDriver");
-        topology = builder.build(null);
-        globalTopology = builder.buildGlobalStateTopology();
-        final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics()));
-        context = new InternalMockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
-        // init global topology first as it will add stores to the
-        // store map that are required for joins etc.
-        if (globalTopology != null) {
-            initTopology(globalTopology, globalTopology.globalStateStores());
-        }
-        initTopology(topology, topology.stateStores());
-    }
-
     public void setUp(final StreamsBuilder builder) {
         setUp(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
deleted file mode 100644
index 042bb63..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.test;
-
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-import java.util.Collections;
-import java.util.Map;
-
-@Deprecated
-public class MockStateStoreSupplier implements StateStoreSupplier {
-    private String name;
-    private boolean persistent;
-    private boolean loggingEnabled;
-
-    public MockStateStoreSupplier(final String name,
-                                  final boolean persistent) {
-        this(name, persistent, true);
-    }
-
-    public MockStateStoreSupplier(final String name,
-                                  final boolean persistent,
-                                  final boolean loggingEnabled) {
-        this.name = name;
-        this.persistent = persistent;
-        this.loggingEnabled = loggingEnabled;
-    }
-
-    @Override
-    public String name() {
-        return name;
-    }
-
-    @Override
-    public StateStore get() {
-        return new MockStateStore(name, persistent);
-    }
-
-    @Override
-    public Map<String, String> logConfig() {
-        return Collections.emptyMap();
-    }
-
-    @Override
-    public boolean loggingEnabled() {
-        return loggingEnabled;
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStoreBuilder.java
similarity index 57%
rename from streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
rename to streams/src/test/java/org/apache/kafka/test/MockStoreBuilder.java
index 3495352..41aa239 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStoreBuilder.java
@@ -14,18 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.state.internals;
+package org.apache.kafka.test;
 
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.internals.AbstractStoreBuilder;
 
-/**
- * A windowed state store supplier that extends the {@link org.apache.kafka.streams.processor.StateStoreSupplier} interface.
- *
- * @param <T> State store type
- */
-@Deprecated
-public interface WindowStoreSupplier<T extends StateStore> extends org.apache.kafka.streams.processor.StateStoreSupplier<T> {
+public class MockStoreBuilder extends AbstractStoreBuilder<Integer, byte[], StateStore> {
+
+    private final boolean persistent;
 
-    // window retention period in milli-second
-    long retentionPeriod();
+    public MockStoreBuilder(final String storeName, final boolean persistent) {
+        super(storeName, Serdes.Integer(), Serdes.ByteArray(), new MockTime());
+
+        this.persistent = persistent;
+    }
+
+    @Override
+    public StateStore build() {
+        return new MockStateStore(name, persistent);
+    }
 }
+

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.