Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/31 22:29:10 UTC

[7/8] kafka git commit: KAFKA-5671: Add StreamsBuilder and Deprecate KStreamBuilder

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
new file mode 100644
index 0000000..3ce1521
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -0,0 +1,1211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KGroupedTable;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.SourceNode;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+
+import java.util.regex.Pattern;
+
+/**
+ * {@code StreamsBuilder} provides the high-level Kafka Streams DSL to specify a Kafka Streams topology.
+ *
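+ * For example, a minimal sketch of building and running a topology (topic names and the
+ * configuration {@code props} are illustrative):
+ * <pre>{@code
+ * StreamsBuilder builder = new StreamsBuilder();
+ * builder.<String, String>stream("input-topic").to("output-topic");
+ * KafkaStreams streams = new KafkaStreams(builder.build(), props);
+ * streams.start();
+ * }</pre>
+ *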
+ * @see Topology
+ * @see KStream
+ * @see KTable
+ * @see GlobalKTable
+ */
+public class StreamsBuilder {
+
+    /** The actual topology that is constructed by this StreamsBuilder. */
+    private final Topology topology = new Topology();
+    /** The topology's internal builder. */
+    private final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
+
+    private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
+
+    /**
+     * Create a {@link KStream} from the specified topics.
+     * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
+     * deserializers as specified in the {@link StreamsConfig config} are used.
+     * <p>
+     * If multiple topics are specified there is no ordering guarantee for records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case, it is the user's responsibility to repartition the data before any key-based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
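+     * <p>
+     * A minimal sketch (the topic name and generic types are illustrative):
+     * <pre>{@code
+     * StreamsBuilder builder = new StreamsBuilder();
+     * KStream<String, String> words = builder.stream("words-topic");
+     * }</pre>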
+     *
+     * @param topics the topic names; must contain at least one topic name
+     * @return a {@link KStream} for the specified topics
+     */
+    public synchronized <K, V> KStream<K, V> stream(final String... topics) {
+        return internalStreamsBuilder.stream(null, null, null, null, topics);
+    }
+
+    /**
+     * Create a {@link KStream} from the specified topics.
+     * The default {@link TimestampExtractor} and default key and value deserializers as specified in the
+     * {@link StreamsConfig config} are used.
+     * <p>
+     * If multiple topics are specified there is no ordering guarantee for records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case, it is the user's responsibility to repartition the data before any key-based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
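+     * <p>
+     * For example, to fall back to the earliest available offsets (topic name and types are
+     * illustrative):
+     * <pre>{@code
+     * KStream<String, String> words = builder.stream(Topology.AutoOffsetReset.EARLIEST, "words-topic");
+     * }</pre>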
+     *
+     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics if no valid committed
+     *                    offsets are available
+     * @param topics      the topic names; must contain at least one topic name
+     * @return a {@link KStream} for the specified topics
+     */
+    public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
+                                                    final String... topics) {
+        return internalStreamsBuilder.stream(offsetReset, null, null, null, topics);
+    }
+
+    /**
+     * Create a {@link KStream} from the specified topic pattern.
+     * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
+     * deserializers as specified in the {@link StreamsConfig config} are used.
+     * <p>
+     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
+     * them and there is no ordering guarantee between records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case, it is the user's responsibility to repartition the data before any key-based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
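+     * <p>
+     * For example, to subscribe to every topic whose name starts with {@code "metrics-"} (the
+     * pattern and types are illustrative):
+     * <pre>{@code
+     * KStream<String, String> metrics = builder.stream(Pattern.compile("metrics-.*"));
+     * }</pre>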
+     *
+     * @param topicPattern the pattern to match for topic names
+     * @return a {@link KStream} for topics matching the regex pattern.
+     */
+    public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern) {
+        return internalStreamsBuilder.stream(null, null, null, null, topicPattern);
+    }
+
+    /**
+     * Create a {@link KStream} from the specified topic pattern.
+     * The default {@link TimestampExtractor} and default key and value deserializers as specified in the
+     * {@link StreamsConfig config} are used.
+     * <p>
+     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
+     * them and there is no ordering guarantee between records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case, it is the user's responsibility to repartition the data before any key-based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
+     *
+     * @param offsetReset  the {@code "auto.offset.reset"} policy to use for the matched topics if no valid committed
+     *                     offsets are available
+     * @param topicPattern the pattern to match for topic names
+     * @return a {@link KStream} for topics matching the regex pattern.
+     */
+    public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
+                                                    final Pattern topicPattern) {
+        return internalStreamsBuilder.stream(offsetReset, null, null, null, topicPattern);
+    }
+
+    /**
+     * Create a {@link KStream} from the specified topics.
+     * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor} as specified in the
+     * {@link StreamsConfig config} are used.
+     * <p>
+     * If multiple topics are specified there is no ordering guarantee for records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case, it is the user's responsibility to repartition the data before any key-based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
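+     * <p>
+     * A minimal sketch using the {@code Serdes} factory class (topic name and types are
+     * illustrative):
+     * <pre>{@code
+     * KStream<String, Long> counts = builder.stream(Serdes.String(), Serdes.Long(), "counts-topic");
+     * }</pre>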
+     *
+     * @param keySerde   key serde used to read this source {@link KStream},
+     *                   if not specified the default serde defined in the configs will be used
+     * @param valueSerde value serde used to read this source {@link KStream},
+     *                   if not specified the default serde defined in the configs will be used
+     * @param topics     the topic names; must contain at least one topic name
+     * @return a {@link KStream} for the specified topics
+     */
+    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde,
+                                                    final Serde<V> valueSerde,
+                                                    final String... topics) {
+        return internalStreamsBuilder.stream(null, null, keySerde, valueSerde, topics);
+    }
+
+    /**
+     * Create a {@link KStream} from the specified topics.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * If multiple topics are specified there is no ordering guarantee for records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case, it is the user's responsibility to repartition the data before any key-based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
+     *
+     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics if no valid committed
+     *                    offsets are available
+     * @param keySerde    key serde used to read this source {@link KStream},
+     *                    if not specified the default serde defined in the configs will be used
+     * @param valueSerde  value serde used to read this source {@link KStream},
+     *                    if not specified the default serde defined in the configs will be used
+     * @param topics      the topic names; must contain at least one topic name
+     * @return a {@link KStream} for the specified topics
+     */
+    public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
+                                                    final Serde<K> keySerde,
+                                                    final Serde<V> valueSerde,
+                                                    final String... topics) {
+        return internalStreamsBuilder.stream(offsetReset, null, keySerde, valueSerde, topics);
+    }
+
+    /**
+     * Create a {@link KStream} from the specified topics.
+     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * If multiple topics are specified there is no ordering guarantee for records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case, it is the user's responsibility to repartition the data before any key-based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
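+     * <p>
+     * For example, using the built-in {@code WallclockTimestampExtractor} (topic name and types
+     * are illustrative):
+     * <pre>{@code
+     * KStream<String, Long> counts =
+     *     builder.stream(new WallclockTimestampExtractor(), Serdes.String(), Serdes.Long(), "counts-topic");
+     * }</pre>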
+     *
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keySerde           key serde used to read this source {@link KStream}, if not specified the default
+     *                           serde defined in the configs will be used
+     * @param valueSerde         value serde used to read this source {@link KStream},
+     *                           if not specified the default serde defined in the configs will be used
+     * @param topics             the topic names; must contain at least one topic name
+     * @return a {@link KStream} for the specified topics
+     */
+    public synchronized <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor,
+                                                    final Serde<K> keySerde,
+                                                    final Serde<V> valueSerde,
+                                                    final String... topics) {
+        return internalStreamsBuilder.stream(null, timestampExtractor, keySerde, valueSerde, topics);
+    }
+
+    /**
+     * Create a {@link KStream} from the specified topics.
+     * <p>
+     * If multiple topics are specified there is no ordering guarantee for records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case, it is the user's responsibility to repartition the data before any key-based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
+     *
+     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topics
+     *                           if no valid committed offsets are available
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keySerde           key serde used to read this source {@link KStream},
+     *                           if not specified the default serde defined in the configs will be used
+     * @param valueSerde         value serde used to read this source {@link KStream},
+     *                           if not specified the default serde defined in the configs will be used
+     * @param topics             the topic names; must contain at least one topic name
+     * @return a {@link KStream} for the specified topics
+     */
+    public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
+                                                    final TimestampExtractor timestampExtractor,
+                                                    final Serde<K> keySerde,
+                                                    final Serde<V> valueSerde,
+                                                    final String... topics) {
+        return internalStreamsBuilder.stream(offsetReset, timestampExtractor, keySerde, valueSerde, topics);
+    }
+
+    /**
+     * Create a {@link KStream} from the specified topic pattern.
+     * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
+     * as specified in the {@link StreamsConfig config} are used.
+     * <p>
+     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
+     * them and there is no ordering guarantee between records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case, it is the user's responsibility to repartition the data before any key-based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
+     *
+     * @param keySerde     key serde used to read this source {@link KStream},
+     *                     if not specified the default serde defined in the configs will be used
+     * @param valueSerde   value serde used to read this source {@link KStream},
+     *                     if not specified the default serde defined in the configs will be used
+     * @param topicPattern the pattern to match for topic names
+     * @return a {@link KStream} for topics matching the regex pattern.
+     */
+    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde,
+                                                    final Serde<V> valueSerde,
+                                                    final Pattern topicPattern) {
+        return internalStreamsBuilder.stream(null, null, keySerde, valueSerde, topicPattern);
+    }
+
+    /**
+     * Create a {@link KStream} from the specified topic pattern.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
+     * them and there is no ordering guarantee between records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case, it is the user's responsibility to repartition the data before any key-based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
+     *
+     * @param offsetReset  the {@code "auto.offset.reset"} policy to use for the matched topics if no valid committed
+     *                     offsets are available
+     * @param keySerde     key serde used to read this source {@link KStream},
+     *                     if not specified the default serde defined in the configs will be used
+     * @param valueSerde   value serde used to read this source {@link KStream},
+     *                     if not specified the default serde defined in the configs will be used
+     * @param topicPattern the pattern to match for topic names
+     * @return a {@link KStream} for topics matching the regex pattern.
+     */
+    public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
+                                                    final Serde<K> keySerde,
+                                                    final Serde<V> valueSerde,
+                                                    final Pattern topicPattern) {
+        return internalStreamsBuilder.stream(offsetReset, null, keySerde, valueSerde, topicPattern);
+    }
+
+    /**
+     * Create a {@link KStream} from the specified topic pattern.
+     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
+     * them and there is no ordering guarantee between records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case, it is the user's responsibility to repartition the data before any key-based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
+     *
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keySerde           key serde used to read this source {@link KStream},
+     *                           if not specified the default serde defined in the configs will be used
+     * @param valueSerde         value serde used to read this source {@link KStream},
+     *                           if not specified the default serde defined in the configs will be used
+     * @param topicPattern       the pattern to match for topic names
+     * @return a {@link KStream} for topics matching the regex pattern.
+     */
+    public synchronized <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor,
+                                                    final Serde<K> keySerde,
+                                                    final Serde<V> valueSerde,
+                                                    final Pattern topicPattern) {
+        return internalStreamsBuilder.stream(null, timestampExtractor, keySerde, valueSerde, topicPattern);
+    }
+
+    /**
+     * Create a {@link KStream} from the specified topic pattern.
+     * <p>
+     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
+     * them and there is no ordering guarantee between records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case, it is the user's responsibility to repartition the data before any key-based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
+     *
+     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the matched topics if no valid
+     *                           committed offsets are available
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keySerde           key serde used to read this source {@link KStream},
+     *                           if not specified the default serde defined in the configs will be used
+     * @param valueSerde         value serde used to read this source {@link KStream},
+     *                           if not specified the default serde defined in the configs will be used
+     * @param topicPattern       the pattern to match for topic names
+     * @return a {@link KStream} for topics matching the regex pattern.
+     */
+    public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
+                                                    final TimestampExtractor timestampExtractor,
+                                                    final Serde<K> keySerde,
+                                                    final Serde<V> valueSerde,
+                                                    final Pattern topicPattern) {
+        return internalStreamsBuilder.stream(offsetReset, timestampExtractor, keySerde, valueSerde, topicPattern);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and
+     * default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topic must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
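+     * <p>
+     * A minimal creation sketch (topic and store names are illustrative):
+     * <pre>{@code
+     * StreamsBuilder builder = new StreamsBuilder();
+     * KTable<String, Long> users = builder.table("users-topic", "users-store");
+     * }</pre>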
+     *
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link #table(String)}
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final String topic,
+                                                  final String queryableStoreName) {
+        return internalStreamsBuilder.table(null, null, null, null, topic, queryableStoreName);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
+     * {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
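+     * <p>
+     * A minimal sketch, assuming a supplier built via the {@code Stores} fluent factory (names and
+     * types are illustrative):
+     * <pre>{@code
+     * StateStoreSupplier<KeyValueStore> supplier = Stores.create("users-store")
+     *     .withKeys(Serdes.String())
+     *     .withValues(Serdes.Long())
+     *     .persistent()
+     *     .build();
+     * KTable<String, Long> users = builder.table("users-topic", supplier);
+     * }</pre>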
+     *
+     * @param topic         the topic name; cannot be {@code null}
+     * @param storeSupplier user defined state store supplier; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final String topic,
+                                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        return internalStreamsBuilder.table(null, null, null, null, topic, storeSupplier);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
+     * {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+     * store name. Note that the store name may not be queryable through Interactive Queries.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     *
+     * @param topic the topic name; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final String topic) {
+        return internalStreamsBuilder.table(null, null, null, null, topic, (String) null);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+     *                           offsets are available
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; if {@code null} this is the equivalent of
+     *                           {@link #table(Topology.AutoOffsetReset, String)}
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+                                                  final String topic,
+                                                  final String queryableStoreName) {
+        return internalStreamsBuilder.table(offsetReset, null, null, null, topic, queryableStoreName);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default {@link TimestampExtractor} and default key and value deserializers
+     * as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topic must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param offsetReset   the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+     *                      offsets are available
+     * @param topic         the topic name; cannot be {@code null}
+     * @param storeSupplier user defined state store supplier; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+                                                  final String topic,
+                                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        return internalStreamsBuilder.table(offsetReset, null, null, null, topic, storeSupplier);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+     * store name. Note that the store name may not be queryable through Interactive Queries.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+     *                    offsets are available
+     * @param topic       the topic name; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+                                                  final String topic) {
+        return internalStreamsBuilder.table(offsetReset, null, null, null, topic, (String) null);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers
+     * as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topic must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code storeName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
+                                                  final String topic,
+                                                  final String queryableStoreName) {
+        return internalStreamsBuilder.table(null, timestampExtractor, null, null, topic, queryableStoreName);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topic must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code storeName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+     *                           offsets are available
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+                                                  final TimestampExtractor timestampExtractor,
+                                                  final String topic,
+                                                  final String queryableStoreName) {
+        return internalStreamsBuilder.table(offsetReset, timestampExtractor, null, null, topic, queryableStoreName);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
+     * as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topic must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
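+     * <p>
+     * A minimal sketch (topic and store names are illustrative):
+     * <pre>{@code
+     * KTable<String, Long> users =
+     *     builder.table(Serdes.String(), Serdes.Long(), "users-topic", "users-store");
+     * }</pre>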
+     *
+     * @param keySerde           key serde used to send key-value pairs,
+     *                           if not specified the default key serde defined in the configuration will be used
+     * @param valueSerde         value serde used to send key-value pairs,
+     *                           if not specified the default value serde defined in the configuration will be used
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde,
+                                                  final Serde<V> valueSerde,
+                                                  final String topic,
+                                                  final String queryableStoreName) {
+        return internalStreamsBuilder.table(null, null, keySerde, valueSerde, topic, queryableStoreName);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
+     * as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topic must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param keySerde      key serde used to send key-value pairs,
+     *                      if not specified the default key serde defined in the configuration will be used
+     * @param valueSerde    value serde used to send key-value pairs,
+     *                      if not specified the default value serde defined in the configuration will be used
+     * @param topic         the topic name; cannot be {@code null}
+     * @param storeSupplier user defined state store supplier; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde,
+                                                  final Serde<V> valueSerde,
+                                                  final String topic,
+                                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        return internalStreamsBuilder.table(null, null, keySerde, valueSerde, topic, storeSupplier);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+     * store name. Note that the store name may not be queryable through Interactive Queries.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * @param keySerde   key serde used to send key-value pairs,
+     *                   if not specified the default key serde defined in the configuration will be used
+     * @param valueSerde value serde used to send key-value pairs,
+     *                   if not specified the default value serde defined in the configuration will be used
+     * @param topic      the topic name; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde,
+                                                  final Serde<V> valueSerde,
+                                                  final String topic) {
+        return internalStreamsBuilder.table(null, null, keySerde, valueSerde, topic, (String) null);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid
+     *                           committed offsets are available
+     * @param keySerde           key serde used to send key-value pairs,
+     *                           if not specified the default key serde defined in the configuration will be used
+     * @param valueSerde         value serde used to send key-value pairs,
+     *                           if not specified the default value serde defined in the configuration will be used
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+                                                  final Serde<K> keySerde,
+                                                  final Serde<V> valueSerde,
+                                                  final String topic,
+                                                  final String queryableStoreName) {
+        return internalStreamsBuilder.table(offsetReset, null, keySerde, valueSerde, topic, queryableStoreName);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topic must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code storeName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keySerde           key serde used to send key-value pairs,
+     *                           if not specified the default key serde defined in the configuration will be used
+     * @param valueSerde         value serde used to send key-value pairs,
+     *                           if not specified the default value serde defined in the configuration will be used
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
+                                                  final Serde<K> keySerde,
+                                                  final Serde<V> valueSerde,
+                                                  final String topic,
+                                                  final String queryableStoreName) {
+        return internalStreamsBuilder.table(null, timestampExtractor, keySerde, valueSerde, topic, queryableStoreName);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topic must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code storeName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid
+     *                           committed offsets are available
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keySerde           key serde used to send key-value pairs,
+     *                           if not specified the default key serde defined in the configuration will be used
+     * @param valueSerde         value serde used to send key-value pairs,
+     *                           if not specified the default value serde defined in the configuration will be used
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+                                                  final TimestampExtractor timestampExtractor,
+                                                  final Serde<K> keySerde,
+                                                  final Serde<V> valueSerde,
+                                                  final String topic,
+                                                  final String queryableStoreName) {
+        return internalStreamsBuilder.table(offsetReset, timestampExtractor, keySerde, valueSerde, topic, queryableStoreName);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+     * store name. Note that the store name may not be queryable through Interactive Queries.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+     *                    offsets are available
+     * @param keySerde    key serde used to send key-value pairs,
+     *                    if not specified the default key serde defined in the configuration will be used
+     * @param valueSerde  value serde used to send key-value pairs,
+     *                    if not specified the default value serde defined in the configuration will be used
+     * @param topic       the topic name; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+                                                  final Serde<K> keySerde,
+                                                  final Serde<V> valueSerde,
+                                                  final String topic) {
+        return internalStreamsBuilder.table(offsetReset, null, keySerde, valueSerde, topic, (String) null);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topic must be partitioned by key.
+     * If this is not the case, the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is partitioned over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel-running instance of your Kafka Streams application.
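+     * <p>
+     * A user-defined store supplier can be created, for example, via the {@code Stores} factory (a sketch;
+     * all names are placeholders, and the factory is assumed to return a {@code StateStoreSupplier<KeyValueStore>}):
+     * <pre>{@code
+     * StateStoreSupplier<KeyValueStore> storeSupplier = Stores.create("queryable-store-name")
+     *     .withKeys(Serdes.String())
+     *     .withValues(Serdes.Long())
+     *     .persistent()
+     *     .build();
+     * KTable<String, Long> table = builder.table(
+     *     Topology.AutoOffsetReset.EARLIEST,
+     *     new WallclockTimestampExtractor(),
+     *     Serdes.String(),
+     *     Serdes.Long(),
+     *     "topic-name",
+     *     storeSupplier);
+     * }</pre>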
+     *
+     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+     *                           offsets are available
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keySerde           key serde used to read key-value pairs,
+     *                           if not specified the default key serde defined in the configuration will be used
+     * @param valueSerde         value serde used to read key-value pairs,
+     *                           if not specified the default value serde defined in the configuration will be used
+     * @param topic              the topic name; cannot be {@code null}
+     * @param storeSupplier      user-defined state store supplier; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+                                                  final TimestampExtractor timestampExtractor,
+                                                  final Serde<K> keySerde,
+                                                  final Serde<V> valueSerde,
+                                                  final String topic,
+                                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        return internalStreamsBuilder.table(offsetReset, timestampExtractor, keySerde, valueSerde, topic, storeSupplier);
+    }
+
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key);
+     * }</pre>
+     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
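+     * <p>
+     * For example (the topic and store names are placeholders):
+     * <pre>{@code
+     * StreamsBuilder builder = new StreamsBuilder();
+     * GlobalKTable<String, Long> table = builder.globalTable("topic-name", "queryable-store-name");
+     * }</pre>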
+     *
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; if {@code null} an internal store name will be generated automatically
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
+                                                              final String queryableStoreName) {
+        return internalStreamsBuilder.globalTable(null, null, null, topic, queryableStoreName);
+    }
+
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
+     * store name. Note that the store name may not be queryable through Interactive Queries.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
+     *
+     * @param topic the topic name; cannot be {@code null}
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic) {
+        return internalStreamsBuilder.globalTable(null, null, null, topic, null);
+    }
+
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * If any of the serdes or the timestamp extractor is not specified (i.e., {@code null}), the corresponding
+     * default from the {@link StreamsConfig config} is used.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key);
+     * }</pre>
+     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
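+     * <p>
+     * For example (a sketch; topic and store names are placeholders, and any stateless extractor could be used):
+     * <pre>{@code
+     * GlobalKTable<String, Long> table = builder.globalTable(
+     *     Serdes.String(),
+     *     Serdes.Long(),
+     *     new WallclockTimestampExtractor(),
+     *     "topic-name",
+     *     "queryable-store-name");
+     * }</pre>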
+     *
+     * @param keySerde           key serde used to read key-value pairs,
+     *                           if not specified the default key serde defined in the configuration will be used
+     * @param valueSerde         value serde used to read key-value pairs,
+     *                           if not specified the default value serde defined in the configuration will be used
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; if {@code null} an internal store name will be generated automatically
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+                                                              final Serde<V> valueSerde,
+                                                              final TimestampExtractor timestampExtractor,
+                                                              final String topic,
+                                                              final String queryableStoreName) {
+        return internalStreamsBuilder.globalTable(keySerde, valueSerde, timestampExtractor, topic, queryableStoreName);
+    }
+
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} provided by the
+     * given {@code storeSupplier}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeSupplier.name(), QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key);
+     * }</pre>
+     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
+     *
+     * @param keySerde      key serde used to read key-value pairs,
+     *                      if not specified the default key serde defined in the configuration will be used
+     * @param valueSerde    value serde used to read key-value pairs,
+     *                      if not specified the default value serde defined in the configuration will be used
+     * @param topic         the topic name; cannot be {@code null}
+     * @param storeSupplier user-defined state store supplier; cannot be {@code null}
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+                                                              final Serde<V> valueSerde,
+                                                              final String topic,
+                                                              final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        return internalStreamsBuilder.globalTable(keySerde, valueSerde, topic, storeSupplier);
+    }
+
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key);
+     * }</pre>
+     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
+     *
+     * @param keySerde           key serde used to read key-value pairs,
+     *                           if not specified the default key serde defined in the configuration will be used
+     * @param valueSerde         value serde used to read key-value pairs,
+     *                           if not specified the default value serde defined in the configuration will be used
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; if {@code null} an internal store name will be generated automatically
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+                                                              final Serde<V> valueSerde,
+                                                              final String topic,
+                                                              final String queryableStoreName) {
+        return internalStreamsBuilder.globalTable(keySerde, valueSerde, null, topic, queryableStoreName);
+    }
+
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
+     * store name. Note that the store name may not be queryable through Interactive Queries.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
+     *
+     * @param keySerde   key serde used to read key-value pairs,
+     *                   if not specified the default key serde defined in the configuration will be used
+     * @param valueSerde value serde used to read key-value pairs,
+     *                   if not specified the default value serde defined in the configuration will be used
+     * @param topic      the topic name; cannot be {@code null}
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+                                                              final Serde<V> valueSerde,
+                                                              final String topic) {
+        return internalStreamsBuilder.globalTable(keySerde, valueSerde, null, topic, null);
+    }
+
+    /**
+     * Adds a state store to the underlying {@link Topology}.
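+     * <p>
+     * For example (a sketch; the topic name and the processor supplier are placeholders):
+     * <pre>{@code
+     * StateStoreSupplier storeSupplier = ...; // e.g., created via the Stores factory
+     * builder.addStateStore(storeSupplier);
+     * KStream<String, Long> stream = builder.stream("topic-name");
+     * stream.process(new MyProcessorSupplier(), storeSupplier.name()); // the processor can now access the store
+     * }</pre>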
+     *
+     * @param supplier the supplier used to obtain the {@link StateStore} instance
+     * @param processorNames the names of the processors that should be able to access the provided store
+     * @return itself
+     * @throws TopologyException if the state store supplier is already added
+     */
+    public synchronized StreamsBuilder addStateStore(final StateStoreSupplier supplier,
+                                                     final String... processorNames) {
+        internalStreamsBuilder.addStateStore(supplier, processorNames);
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
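+     * <p>
+     * For example (a sketch; all names are placeholders and the update processor is a hypothetical user class):
+     * <pre>{@code
+     * StateStoreSupplier<KeyValueStore> storeSupplier = ...; // e.g., created via the Stores factory
+     * builder.addGlobalStore(
+     *     storeSupplier,
+     *     "global-store-source",
+     *     new StringDeserializer(),
+     *     new LongDeserializer(),
+     *     "topic-name",
+     *     "global-store-processor",
+     *     new MyStoreUpdateSupplier()); // ProcessorSupplier whose Processor writes each record into the store
+     * }</pre>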
+     *
+     * @param storeSupplier         user-defined state store supplier
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorNode} that maintains the store
+     * @param stateUpdateSupplier   the {@link ProcessorSupplier} that provides the processor maintaining the store
+     * @return itself
+     * @throws TopologyException if the processor or store is already registered
+     */
+    public synchronized StreamsBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                                      final String sourceName,
+                                                      final Deserializer keyDeserializer,
+                                                      final Deserializer valueDeserializer,
+                                                      final String topic,
+                                                      final String processorName,
+                                                      final ProcessorSupplier stateUpdateSupplier) {
+        internalStreamsBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer,
+            valueDeserializer, topic, processorName, stateUpdateSupplier);
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeSupplier         user-defined state store supplier
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for this source,
+     *                              if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorNode} that maintains the store
+     * @param stateUpdateSupplier   the {@link ProcessorSupplier} that provides the processor maintaining the store
+     * @return itself
+     * @throws TopologyException if the processor or store is already registered
+     */
+    public synchronized StreamsBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                                      final String sourceName,
+                                                      final TimestampExtractor timestampExtractor,
+                                                      final Deserializer keyDeserializer,
+                                                      final Deserializer valueDeserializer,
+                                                      final String topic,
+                                                      final String processorName,
+                                                      final ProcessorSupplier stateUpdateSupplier) {
+        internalStreamsBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
+            valueDeserializer, topic, processorName, stateUpdateSupplier);
+        return this;
+    }
+
+    /**
+     * Create a new instance of {@link KStream} by merging the given {@link KStream}s.
+     * <p>
+     * There is no ordering guarantee for records from different {@link KStream}s.
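+     * <p>
+     * For example (topic names are placeholders):
+     * <pre>{@code
+     * KStream<String, Long> stream1 = builder.stream("topic-1");
+     * KStream<String, Long> stream2 = builder.stream("topic-2");
+     * KStream<String, Long> merged = builder.merge(stream1, stream2);
+     * }</pre>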
+     *
+     * @param streams the {@link KStream}s to be merged
+     * @return a {@link KStream} containing all records of the given streams
+     */
+    public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
+        return internalStreamsBuilder.merge(streams);
+    }
+
+    /**
+     * Returns the {@link Topology} that represents the specified processing logic.
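+     * <p>
+     * For example (a sketch; {@code props} stands for the mandatory {@link StreamsConfig} settings):
+     * <pre>{@code
+     * StreamsBuilder builder = new StreamsBuilder();
+     * // ... define the processing topology via the DSL ...
+     * KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(props));
+     * streams.start();
+     * }</pre>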
+     *
+     * @return the {@link Topology} that represents the specified processing logic
+     */
+    public synchronized Topology build() {
+        return topology;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index c2e1077..000076e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -78,7 +78,7 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
  * Furthermore, it is not allowed to enable {@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG "enable.auto.commit"} that
  * is disabled by Kafka Streams by default.
  *
- * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.processor.TopologyBuilder, StreamsConfig)
+ * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.Topology, StreamsConfig)
  * @see ConsumerConfig
  * @see ProducerConfig
  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/Topology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index ca0ac75..e8f7d23 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -538,6 +538,7 @@ public class Topology {
      * Adds a state store.
      *
      * @param supplier the supplier used to obtain this state store {@link StateStore} instance
+     * @param processorNames the names of the processors that should be able to access the provided store
      * @return itself
      * @throws TopologyException if state store supplier is already added
      */
@@ -633,6 +634,11 @@ public class Topology {
         return this;
     }
 
+    /**
+     * Returns a description of this {@code Topology}.
+     *
+     * @return a description of this topology
+     */
     public synchronized TopologyDescription describe() {
         return internalTopologyBuilder.describe();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index 7266d7d..81aa405 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
 /**
@@ -35,7 +36,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
  * All joins with the {@code GlobalKTable} require that a {@link KeyValueMapper} is provided that can map from the
  * {@link KeyValue} of the left hand side {@link KStream} to the key of the right hand side {@code GlobalKTable}.
  * <p>
- * A {@code GlobalKTable} is created via a {@link KStreamBuilder}. For example:
+ * A {@code GlobalKTable} is created via a {@link StreamsBuilder}. For example:
  * <pre>{@code
  * builder.globalTable("topic-name", "queryable-store-name");
  * }</pre>
@@ -60,7 +61,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
  * @param <K> Type of primary keys
  * @param <V> Type of value changes
  * @see KTable
- * @see KStreamBuilder#globalTable(String, String)
+ * @see StreamsBuilder#globalTable(String, String)
  * @see KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)
  * @see KStream#leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 535e1e9..8b3ceec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
@@ -36,7 +37,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
  * For example a user X might buy two items I1 and I2, and thus there might be two records {@code <K:I1>, <K:I2>}
  * in the stream.
  * <p>
- * A {@code KStream} is either {@link KStreamBuilder#stream(String...) defined from one or multiple Kafka topics} that
+ * A {@code KStream} is either {@link StreamsBuilder#stream(String...) defined from one or multiple Kafka topics} that
  * are consumed message by message or the result of a {@code KStream} transformation.
  * A {@link KTable} can also be {@link KTable#toStream() converted} into a {@code KStream}.
  * <p>
@@ -51,7 +52,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
  * @param <V> Type of values
  * @see KTable
  * @see KGroupedStream
- * @see KStreamBuilder#stream(String...)
+ * @see org.apache.kafka.streams.StreamsBuilder#stream(String...)
  */
 @InterfaceStability.Evolving
 public interface KStream<K, V> {
@@ -652,8 +653,9 @@ public interface KStream<K, V> {
      * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
      * started).
      * <p>
-     * This is equivalent to calling {@link #to(String) #to(someTopicName)} and {@link KStreamBuilder#stream(String...)
-     * KStreamBuilder#stream(someTopicName)}.
+     * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
+     * {@link org.apache.kafka.streams.StreamsBuilder#stream(String...)
+     * StreamsBuilder#stream(someTopicName)}.
      *
      * @param topic the topic name
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
@@ -667,7 +669,7 @@ public interface KStream<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(StreamPartitioner, someTopicName)} and
-     * {@link KStreamBuilder#stream(String...) KStreamBuilder#stream(someTopicName)}.
+     * {@link StreamsBuilder#stream(String...) StreamsBuilder#stream(someTopicName)}.
      *
      * @param partitioner the function used to determine how records are distributed among partitions of the topic,
      *                    if not specified producer's {@link DefaultPartitioner} will be used
@@ -686,7 +688,7 @@ public interface KStream<K, V> {
      * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valSerde, someTopicName)} and
-     * {@link KStreamBuilder#stream(Serde, Serde, String...) KStreamBuilder#stream(keySerde, valSerde, someTopicName)}.
+     * {@link StreamsBuilder#stream(Serde, Serde, String...) StreamsBuilder#stream(keySerde, valSerde, someTopicName)}.
      *
      * @param keySerde key serde used to send key-value pairs,
      *                 if not specified the default key serde defined in the configuration will be used
@@ -706,8 +708,8 @@ public interface KStream<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String) #to(keySerde, valSerde,
-     * StreamPartitioner, someTopicName)} and {@link KStreamBuilder#stream(Serde, Serde, String...)
-     * KStreamBuilder#stream(keySerde, valSerde, someTopicName)}.
+     * StreamPartitioner, someTopicName)} and {@link StreamsBuilder#stream(Serde, Serde, String...)
+     * StreamsBuilder#stream(keySerde, valSerde, someTopicName)}.
      *
      * @param keySerde    key serde used to send key-value pairs,
      *                    if not specified the default key serde defined in the configuration will be used