You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/31 22:29:10 UTC
[7/8] kafka git commit: KAFKA-5671: Add StreamsBuilder and Deprecate
KStreamBuilder
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
new file mode 100644
index 0000000..3ce1521
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -0,0 +1,1211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KGroupedTable;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.SourceNode;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+
+import java.util.regex.Pattern;
+
+/**
+ * {@code StreamsBuilder} provides the high-level Kafka Streams DSL to specify a Kafka Streams topology.
+ *
+ * @see Topology
+ * @see KStream
+ * @see KTable
+ * @see GlobalKTable
+ */
+public class StreamsBuilder {
+
+ /** The actual topology that is constructed by this StreamsBuilder. */
+ private final Topology topology = new Topology();
+ /** The topology's internal builder. */
+ // Initialized from the topology field above; Java runs field initializers in declaration order,
+ // so this declaration must stay after "topology".
+ private final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
+
+ /**
+ * Internal builder created on top of {@link #internalTopologyBuilder}.
+ * The {@code stream(...)} and {@code table(...)} methods of this class delegate to it.
+ */
+ private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
+
+ /**
+ * Create a {@link KStream} from the specified topics.
+ * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
+ * deserializers as specified in the {@link StreamsConfig config} are used.
+ * <p>
+ * If multiple topics are specified there is no ordering guarantee for records from different topics.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case it is the user's responsibility to repartition the data before any key based operation
+ * (like aggregation or join) is applied to the returned {@link KStream}.
+ *
+ * @param topics the topic names; must contain at least one topic name
+ * @return a {@link KStream} for the specified topics
+ */
+ public synchronized <K, V> KStream<K, V> stream(final String... topics) {
+ // Only the topic names are supplied; every optional slot is passed as null.
+ // Slot labels follow the argument order used by the five-parameter stream(...) overload of this class.
+ return internalStreamsBuilder.stream(
+ null, // offsetReset
+ null, // timestampExtractor
+ null, // keySerde
+ null, // valueSerde
+ topics);
+ }
+
+ /**
+ * Create a {@link KStream} from the specified topics.
+ * The default {@link TimestampExtractor} and default key and value deserializers as specified in the
+ * {@link StreamsConfig config} are used.
+ * <p>
+ * If multiple topics are specified there is no ordering guarantee for records from different topics.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case it is the user's responsibility to repartition the data before any key based operation
+ * (like aggregation or join) is applied to the returned {@link KStream}.
+ *
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics if no valid committed
+ * offsets are available
+ * @param topics the topic names; must contain at least one topic name
+ * @return a {@link KStream} for the specified topics
+ */
+ public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
+ final String... topics) {
+ // Forward the caller's reset policy; the remaining optional slots are passed as null.
+ // Slot labels follow the argument order used by the five-parameter stream(...) overload of this class.
+ return internalStreamsBuilder.stream(
+ offsetReset,
+ null, // timestampExtractor
+ null, // keySerde
+ null, // valueSerde
+ topics);
+ }
+
+ /**
+ * Create a {@link KStream} from the specified topic pattern.
+ * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
+ * deserializers as specified in the {@link StreamsConfig config} are used.
+ * <p>
+ * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
+ * them and there is no ordering guarantee between records from different topics.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case it is the user's responsibility to repartition the data before any key based operation
+ * (like aggregation or join) is applied to the returned {@link KStream}.
+ *
+ * @param topicPattern the pattern to match for topic names
+ * @return a {@link KStream} for topics matching the regex pattern.
+ */
+ public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern) {
+ // Only the topic pattern is supplied; every optional slot is passed as null.
+ // Slot labels follow the argument order used by the five-parameter pattern overload of this class.
+ return internalStreamsBuilder.stream(
+ null, // offsetReset
+ null, // timestampExtractor
+ null, // keySerde
+ null, // valueSerde
+ topicPattern);
+ }
+
+ /**
+ * Create a {@link KStream} from the specified topic pattern.
+ * The default {@link TimestampExtractor} and default key and value deserializers as specified in the
+ * {@link StreamsConfig config} are used.
+ * <p>
+ * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
+ * them and there is no ordering guarantee between records from different topics.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case it is the user's responsibility to repartition the data before any key based operation
+ * (like aggregation or join) is applied to the returned {@link KStream}.
+ *
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the matched topics if no valid committed
+ * offsets are available
+ * @param topicPattern the pattern to match for topic names
+ * @return a {@link KStream} for topics matching the regex pattern.
+ */
+ public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
+ final Pattern topicPattern) {
+ // Forward the caller's reset policy; the remaining optional slots are passed as null.
+ // Slot labels follow the argument order used by the five-parameter pattern overload of this class.
+ return internalStreamsBuilder.stream(
+ offsetReset,
+ null, // timestampExtractor
+ null, // keySerde
+ null, // valueSerde
+ topicPattern);
+ }
+
+ /**
+ * Create a {@link KStream} from the specified topics.
+ * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor} as specified in the
+ * {@link StreamsConfig config} are used.
+ * <p>
+ * If multiple topics are specified there is no ordering guarantee for records from different topics.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case it is the user's responsibility to repartition the data before any key based operation
+ * (like aggregation or join) is applied to the returned {@link KStream}.
+ *
+ * @param keySerde key serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param valueSerde value serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param topics the topic names; must contain at least one topic name
+ * @return a {@link KStream} for the specified topics
+ */
+ public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final String... topics) {
+ // Forward the caller's serdes; reset policy and timestamp extractor are passed as null.
+ // Slot labels follow the argument order used by the five-parameter stream(...) overload of this class.
+ return internalStreamsBuilder.stream(
+ null, // offsetReset
+ null, // timestampExtractor
+ keySerde,
+ valueSerde,
+ topics);
+ }
+
+ /**
+ * Create a {@link KStream} from the specified topics.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ * <p>
+ * If multiple topics are specified there is no ordering guarantee for records from different topics.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case it is the user's responsibility to repartition the data before any key based operation
+ * (like aggregation or join) is applied to the returned {@link KStream}.
+ *
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics if no valid committed
+ * offsets are available
+ * @param keySerde key serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param valueSerde value serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param topics the topic names; must contain at least one topic name
+ * @return a {@link KStream} for the specified topics
+ */
+ public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final String... topics) {
+ // Forward reset policy and serdes; only the timestamp extractor slot is passed as null.
+ // Slot label follows the argument order used by the five-parameter stream(...) overload of this class.
+ return internalStreamsBuilder.stream(
+ offsetReset,
+ null, // timestampExtractor
+ keySerde,
+ valueSerde,
+ topics);
+ }
+
+ /**
+ * Create a {@link KStream} from the specified topics.
+ * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+ * <p>
+ * If multiple topics are specified there is no ordering guarantee for records from different topics.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case it is the user's responsibility to repartition the data before any key based operation
+ * (like aggregation or join) is applied to the returned {@link KStream}.
+ *
+ * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
+ * if not specified the default extractor defined in the configs will be used
+ * @param keySerde key serde used to read this source {@link KStream}, if not specified the default
+ * serde defined in the configs will be used
+ * @param valueSerde value serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param topics the topic names; must contain at least one topic name
+ * @return a {@link KStream} for the specified topics
+ */
+ public synchronized <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final String... topics) {
+ // Forward extractor and serdes; only the reset-policy slot is passed as null.
+ // Slot label follows the argument order used by the five-parameter stream(...) overload of this class.
+ return internalStreamsBuilder.stream(
+ null, // offsetReset
+ timestampExtractor,
+ keySerde,
+ valueSerde,
+ topics);
+ }
+
+ /**
+ * Create a {@link KStream} from the specified topics.
+ * <p>
+ * If multiple topics are specified there is no ordering guarantee for records from different topics.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case it is the user's responsibility to repartition the data before any key based operation
+ * (like aggregation or join) is applied to the returned {@link KStream}.
+ *
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics
+ * if no valid committed offsets are available
+ * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
+ * if not specified the default extractor defined in the configs will be used
+ * @param keySerde key serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param valueSerde value serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param topics the topic names; must contain at least one topic name
+ * @return a {@link KStream} for the specified topics
+ */
+ public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
+ final TimestampExtractor timestampExtractor,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final String... topics) {
+ // Most general topic-name overload: every argument is forwarded unchanged, in the same order.
+ return internalStreamsBuilder.stream(
+ offsetReset,
+ timestampExtractor,
+ keySerde,
+ valueSerde,
+ topics);
+ }
+
+ /**
+ * Create a {@link KStream} from the specified topic pattern.
+ * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
+ * as specified in the {@link StreamsConfig config} are used.
+ * <p>
+ * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
+ * them and there is no ordering guarantee between records from different topics.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case it is the user's responsibility to repartition the data before any key based operation
+ * (like aggregation or join) is applied to the returned {@link KStream}.
+ *
+ * @param keySerde key serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param valueSerde value serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param topicPattern the pattern to match for topic names
+ * @return a {@link KStream} for topics matching the regex pattern.
+ */
+ public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final Pattern topicPattern) {
+ // Forward the caller's serdes; reset policy and timestamp extractor are passed as null.
+ // Slot labels follow the argument order used by the five-parameter pattern overload of this class.
+ return internalStreamsBuilder.stream(
+ null, // offsetReset
+ null, // timestampExtractor
+ keySerde,
+ valueSerde,
+ topicPattern);
+ }
+
+ /**
+ * Create a {@link KStream} from the specified topic pattern.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ * <p>
+ * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
+ * them and there is no ordering guarantee between records from different topics.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case it is the user's responsibility to repartition the data before any key based operation
+ * (like aggregation or join) is applied to the returned {@link KStream}.
+ *
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the matched topics if no valid committed
+ * offsets are available
+ * @param keySerde key serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param valueSerde value serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param topicPattern the pattern to match for topic names
+ * @return a {@link KStream} for topics matching the regex pattern.
+ */
+ public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final Pattern topicPattern) {
+ // Forward reset policy and serdes; only the timestamp extractor slot is passed as null.
+ // Slot label follows the argument order used by the five-parameter pattern overload of this class.
+ return internalStreamsBuilder.stream(
+ offsetReset,
+ null, // timestampExtractor
+ keySerde,
+ valueSerde,
+ topicPattern);
+ }
+
+ /**
+ * Create a {@link KStream} from the specified topic pattern.
+ * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+ * <p>
+ * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
+ * them and there is no ordering guarantee between records from different topics.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case it is the user's responsibility to repartition the data before any key based operation
+ * (like aggregation or join) is applied to the returned {@link KStream}.
+ *
+ * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
+ * if not specified the default extractor defined in the configs will be used
+ * @param keySerde key serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param valueSerde value serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param topicPattern the pattern to match for topic names
+ * @return a {@link KStream} for topics matching the regex pattern.
+ */
+ public synchronized <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final Pattern topicPattern) {
+ // Forward extractor and serdes; only the reset-policy slot is passed as null.
+ // Slot label follows the argument order used by the five-parameter pattern overload of this class.
+ return internalStreamsBuilder.stream(
+ null, // offsetReset
+ timestampExtractor,
+ keySerde,
+ valueSerde,
+ topicPattern);
+ }
+
+ /**
+ * Create a {@link KStream} from the specified topic pattern.
+ * <p>
+ * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
+ * them and there is no ordering guarantee between records from different topics.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case it is the user's responsibility to repartition the data before any key based operation
+ * (like aggregation or join) is applied to the returned {@link KStream}.
+ *
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the matched topics if no valid
+ * committed offsets are available
+ * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
+ * if not specified the default extractor defined in the configs will be used
+ * @param keySerde key serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param valueSerde value serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param topicPattern the pattern to match for topic names
+ * @return a {@link KStream} for topics matching the regex pattern.
+ */
+ public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
+ final TimestampExtractor timestampExtractor,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final Pattern topicPattern) {
+ // Most general pattern overload: every argument is forwarded unchanged, in the same order.
+ return internalStreamsBuilder.stream(
+ offsetReset,
+ timestampExtractor,
+ keySerde,
+ valueSerde,
+ topicPattern);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and
+ * default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topic must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param topic the topic name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link #table(String)}
+ * @return a {@link KTable} for the specified topic
+ */
+ public synchronized <K, V> KTable<K, V> table(final String topic,
+ final String queryableStoreName) {
+ // Only topic and store name are supplied; the four leading optional slots are passed as null.
+ // The first two labels follow the table(...) overloads of this class that pass offsetReset and
+ // timestampExtractor in those positions; slots three and four are null in every visible overload.
+ return internalStreamsBuilder.table(
+ null, // offsetReset
+ null, // timestampExtractor
+ null,
+ null,
+ topic,
+ queryableStoreName);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
+ * deserializers as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topic must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param topic the topic name; cannot be {@code null}
+ * @param storeSupplier user defined state store supplier; cannot be {@code null}
+ * @return a {@link KTable} for the specified topic
+ */
+ public synchronized <K, V> KTable<K, V> table(final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ // Only topic and store supplier are supplied; the four leading optional slots are passed as null.
+ // The first two labels follow the table(...) overloads of this class that pass offsetReset and
+ // timestampExtractor in those positions; slots three and four are null in every visible overload.
+ return internalStreamsBuilder.table(
+ null, // offsetReset
+ null, // timestampExtractor
+ null,
+ null,
+ topic,
+ storeSupplier);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
+ * deserializers as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topic must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+ * store name. Note that this store name may not be queryable through Interactive Queries.
+ * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ *
+ * @param topic the topic name; cannot be {@code null}
+ * @return a {@link KTable} for the specified topic
+ */
+ public synchronized <K, V> KTable<K, V> table(final String topic) {
+ // Only the topic is supplied; every other slot is passed as null.
+ // The first two labels follow the table(...) overloads of this class that pass offsetReset and
+ // timestampExtractor in those positions; slots three and four are null in every visible overload.
+ return internalStreamsBuilder.table(
+ null, // offsetReset
+ null, // timestampExtractor
+ null,
+ null,
+ topic,
+ (String) null); // typed null: the last slot is also called with a StateStoreSupplier elsewhere in this class
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default {@link TimestampExtractor} and default key and value deserializers
+ * as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topic must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+ * offsets are available
+ * @param topic the topic name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null} this is the equivalent of
+ * {@link #table(Topology.AutoOffsetReset, String)}
+ * @return a {@link KTable} for the specified topic
+ */
+ public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+ final String topic,
+ final String queryableStoreName) {
+ // Forward reset policy, topic and store name; the three slots in between are passed as null.
+ // The extractor label follows the table(...) overloads of this class that pass timestampExtractor
+ // in that position; slots three and four are null in every visible overload.
+ return internalStreamsBuilder.table(
+ offsetReset,
+ null, // timestampExtractor
+ null,
+ null,
+ topic,
+ queryableStoreName);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default {@link TimestampExtractor} and default key and value deserializers
+ * as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topic must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+ * offsets are available
+ * @param topic the topic name; cannot be {@code null}
+ * @param storeSupplier user defined state store supplier; cannot be {@code null}
+ * @return a {@link KTable} for the specified topic
+ */
+ public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+ final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ // Forward reset policy, topic and store supplier; the three slots in between are passed as null.
+ // The extractor label follows the table(...) overloads of this class that pass timestampExtractor
+ // in that position; slots three and four are null in every visible overload.
+ return internalStreamsBuilder.table(
+ offsetReset,
+ null, // timestampExtractor
+ null,
+ null,
+ topic,
+ storeSupplier);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default {@link TimestampExtractor} and default key and value deserializers
+ * as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topic must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+ * store name. Note that this store name may not be queryable through Interactive Queries.
+ * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ *
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+ * offsets are available
+ * @param topic the topic name; cannot be {@code null}
+ * @return a {@link KTable} for the specified topic
+ */
+ public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+ final String topic) {
+ // Forward reset policy and topic; every other slot is passed as null.
+ // The extractor label follows the table(...) overloads of this class that pass timestampExtractor
+ // in that position; slots three and four are null in every visible overload.
+ return internalStreamsBuilder.table(
+ offsetReset,
+ null, // timestampExtractor
+ null,
+ null,
+ topic,
+ (String) null); // typed null: the last slot is also called with a StateStoreSupplier elsewhere in this class
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default {@code "auto.offset.reset"} strategy and default key and value deserializers
+ * as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topic must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
+ * if not specified the default extractor defined in the configs will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+ * @return a {@link KTable} for the specified topic
+ */
+ public synchronized <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
+ final String topic,
+ final String queryableStoreName) {
+ // Forward extractor, topic and store name; the reset-policy slot and slots three and four are null.
+ // The reset-policy label follows the table(...) overloads of this class that pass offsetReset
+ // in that position; slots three and four are null in every visible overload.
+ return internalStreamsBuilder.table(
+ null, // offsetReset
+ timestampExtractor,
+ null,
+ null,
+ topic,
+ queryableStoreName);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topic must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+ * offsets are available
+ * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
+ * if not specified the default extractor defined in the configs will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+ * @return a {@link KTable} for the specified topic
+ */
+ public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+ final TimestampExtractor timestampExtractor,
+ final String topic,
+ final String queryableStoreName) {
+ return internalStreamsBuilder.table(offsetReset, timestampExtractor, null, null, topic, queryableStoreName);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
+ * as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topic must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if {@code null} the default key serde defined in the configuration will be used
+ * @param valueSerde value serde used to send key-value pairs,
+ * if {@code null} the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+ * @return a {@link KTable} for the specified topic
+ */
+ public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final String topic,
+ final String queryableStoreName) {
+ return internalStreamsBuilder.table(null, null, keySerde, valueSerde, topic, queryableStoreName);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topic must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} obtained from the given
+ * {@code storeSupplier}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeSupplier.name(), QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valueSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @param storeSupplier user defined state store supplier; cannot be {@code null}
+ * @return a {@link KTable} for the specified topic
+ */
+ public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ return internalStreamsBuilder.table(null, null, keySerde, valueSerde, topic, storeSupplier);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topic must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+ * store name. Note that the store name may not be queryable through Interactive Queries.
+ * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valueSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @return a {@link KTable} for the specified topic
+ */
+ public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final String topic) {
+ return internalStreamsBuilder.table(null, null, keySerde, valueSerde, topic, (String) null);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topic must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid
+ * committed offsets are available
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valueSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+ * @return a {@link KTable} for the specified topic
+ */
+ public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final String topic,
+ final String queryableStoreName) {
+ return internalStreamsBuilder.table(offsetReset, null, keySerde, valueSerde, topic, queryableStoreName);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+ * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topic must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
+ * if not specified the default extractor defined in the configs will be used
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valueSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+ * @return a {@link KTable} for the specified topic
+ */
+ public synchronized <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final String topic,
+ final String queryableStoreName) {
+ return internalStreamsBuilder.table(null, timestampExtractor, keySerde, valueSerde, topic, queryableStoreName);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topic must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid
+ * committed offsets are available
+ * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
+ * if not specified the default extractor defined in the configs will be used
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valueSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+ * @return a {@link KTable} for the specified topic
+ */
+ public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+ final TimestampExtractor timestampExtractor,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final String topic,
+ final String queryableStoreName) {
+ return internalStreamsBuilder.table(offsetReset, timestampExtractor, keySerde, valueSerde, topic, queryableStoreName);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topic must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+ * store name. Note that the store name may not be queryable through Interactive Queries.
+ * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ *
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+ * offsets are available
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valueSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @return a {@link KTable} for the specified topic
+ */
+ public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final String topic) {
+ return internalStreamsBuilder.table(offsetReset, null, keySerde, valueSerde, topic, (String) null);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topic must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} obtained from the given
+ * {@code storeSupplier}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeSupplier.name(), QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+ * offsets are available
+ * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
+ * if not specified the default extractor defined in the configs will be used
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valueSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @param storeSupplier user defined state store supplier; cannot be {@code null}
+ * @return a {@link KTable} for the specified topic
+ */
+ public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+ final TimestampExtractor timestampExtractor,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ return internalStreamsBuilder.table(offsetReset, timestampExtractor, keySerde, valueSerde, topic, storeSupplier);
+ }
+
+ /**
+ * Create a {@link GlobalKTable} for the specified topic.
+ * The default {@link TimestampExtractor} and default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key);
+ * }</pre>
+ * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+ * regardless of the specified value in {@link StreamsConfig}.
+ *
+ * @param topic the topic name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+ * @return a {@link GlobalKTable} for the specified topic
+ */
+ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
+ final String queryableStoreName) {
+ return internalStreamsBuilder.globalTable(null, null, null, topic, queryableStoreName);
+ }
+
+ /**
+ * Create a {@link GlobalKTable} for the specified topic.
+ * The default {@link TimestampExtractor} and default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
+ * store name. Note that the store name may not be queryable through Interactive Queries.
+ * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+ * regardless of the specified value in {@link StreamsConfig}.
+ *
+ * @param topic the topic name; cannot be {@code null}
+ * @return a {@link GlobalKTable} for the specified topic
+ */
+ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic) {
+ return internalStreamsBuilder.globalTable(null, null, null, topic, null);
+ }
+
+ /**
+ * Create a {@link GlobalKTable} for the specified topic.
+ * The {@link TimestampExtractor} and the key and value serdes are passed explicitly; for each of them that is
+ * not specified, the default from the {@link StreamsConfig config} is used.
+ * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+ * <p>
+ * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key);
+ * }</pre>
+ * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+ * regardless of the specified value in {@link StreamsConfig}.
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valueSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param timestampExtractor the stateless timestamp extractor used for this source {@link GlobalKTable},
+ * if not specified the default extractor defined in the configs will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+ * @return a {@link GlobalKTable} for the specified topic
+ */
+ public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final TimestampExtractor timestampExtractor,
+ final String topic,
+ final String queryableStoreName) {
+ return internalStreamsBuilder.globalTable(keySerde, valueSerde, timestampExtractor, topic, queryableStoreName);
+ }
+
+ /**
+ * Create a {@link GlobalKTable} for the specified topic.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+ * <p>
+ * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} obtained from the given
+ * {@code storeSupplier}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeSupplier.name(), QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key);
+ * }</pre>
+ * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+ * regardless of the specified value in {@link StreamsConfig}.
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valueSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @param storeSupplier user defined state store supplier; cannot be {@code null}
+ * @return a {@link GlobalKTable} for the specified topic
+ */
+ public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ return internalStreamsBuilder.globalTable(keySerde, valueSerde, topic, storeSupplier);
+ }
+
+ /**
+ * Create a {@link GlobalKTable} for the specified topic.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+ * <p>
+ * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key);
+ * }</pre>
+ * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+ * regardless of the specified value in {@link StreamsConfig}.
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if {@code null} the default key serde defined in the configuration will be used
+ * @param valueSerde value serde used to send key-value pairs,
+ * if {@code null} the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+ * @return a {@link GlobalKTable} for the specified topic
+ */
+ public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final String topic,
+ final String queryableStoreName) {
+ return internalStreamsBuilder.globalTable(keySerde, valueSerde, null, topic, queryableStoreName);
+ }
+
+ /**
+ * Create a {@link GlobalKTable} for the specified topic.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
+ * store name. Note that the store name may not be queryable through Interactive Queries.
+ * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+ * regardless of the specified value in {@link StreamsConfig}.
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valueSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @return a {@link GlobalKTable} for the specified topic
+ */
+ public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final String topic) {
+ return internalStreamsBuilder.globalTable(keySerde, valueSerde, null, topic, null);
+ }
+
+ /**
+ * Adds a state store to the underlying {@link Topology}.
+ *
+ * @param supplier the supplier used to obtain the {@link StateStore} instance
+ * @param processorNames the names of the processors that should be able to access the provided store
+ * @return itself
+ * @throws TopologyException if state store supplier is already added
+ */
+ public synchronized StreamsBuilder addStateStore(final StateStoreSupplier supplier,
+ final String... processorNames) {
+ internalStreamsBuilder.addStateStore(supplier, processorNames);
+ return this;
+ }
+
+ /**
+ * Adds a global {@link StateStore} to the topology.
+ * The {@link StateStore} sources its data from all partitions of the provided input topic.
+ * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+ * <p>
+ * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+ * of the input topic.
+ * <p>
+ * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will receive all
+ * records forwarded from the {@link SourceNode}.
+ * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ *
+ * @param storeSupplier user defined state store supplier
+ * @param sourceName name of the {@link SourceNode} that will be automatically added
+ * @param keyDeserializer the {@link Deserializer} to deserialize keys with
+ * @param valueDeserializer the {@link Deserializer} to deserialize values with
+ * @param topic the topic to source the data from
+ * @param processorName the name of the {@link ProcessorSupplier}
+ * @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
+ * @return itself
+ * @throws TopologyException if the processor of state is already registered
+ */
+ public synchronized StreamsBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+ final String sourceName,
+ final Deserializer keyDeserializer,
+ final Deserializer valueDeserializer,
+ final String topic,
+ final String processorName,
+ final ProcessorSupplier stateUpdateSupplier) {
+ internalStreamsBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer,
+ valueDeserializer, topic, processorName, stateUpdateSupplier);
+ return this;
+ }
+
+ /**
+ * Adds a global {@link StateStore} to the topology.
+ * The {@link StateStore} sources its data from all partitions of the provided input topic.
+ * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+ * <p>
+ * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+ * of the input topic.
+ * <p>
+ * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will receive all
+ * records forwarded from the {@link SourceNode}.
+ * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+ *
+ * @param storeSupplier user defined state store supplier
+ * @param sourceName name of the {@link SourceNode} that will be automatically added
+ * @param timestampExtractor the stateless timestamp extractor used for this source,
+ * if not specified the default extractor defined in the configs will be used
+ * @param keyDeserializer the {@link Deserializer} to deserialize keys with
+ * @param valueDeserializer the {@link Deserializer} to deserialize values with
+ * @param topic the topic to source the data from
+ * @param processorName the name of the {@link ProcessorSupplier}
+ * @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
+ * @return itself
+ * @throws TopologyException if the processor or state store is already registered
+ */
+ public synchronized StreamsBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+ final String sourceName,
+ final TimestampExtractor timestampExtractor,
+ final Deserializer keyDeserializer,
+ final Deserializer valueDeserializer,
+ final String topic,
+ final String processorName,
+ final ProcessorSupplier stateUpdateSupplier) {
+ internalStreamsBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
+ valueDeserializer, topic, processorName, stateUpdateSupplier);
+ return this;
+ }
+
+ /**
+ * Creates a new instance of {@link KStream} by merging the given {@link KStream}s.
+ * <p>
+ * There is no ordering guarantee for records from different {@link KStream}s.
+ *
+ * @param streams the {@link KStream}s to be merged
+ * @return a {@link KStream} containing all records of the given streams
+ */
+ public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
+ return internalStreamsBuilder.merge(streams);
+ }
+
+ /**
+ * Returns the {@link Topology} held by this builder, which represents the specified processing logic.
+ *
+ * @return the {@link Topology} that represents the specified processing logic
+ */
+ public synchronized Topology build() {
+ return topology;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index c2e1077..000076e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -78,7 +78,7 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
* Furthermore, it is not allowed to enable {@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG "enable.auto.commit"} that
* is disabled by Kafka Streams by default.
*
- * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.processor.TopologyBuilder, StreamsConfig)
+ * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.Topology, StreamsConfig)
* @see ConsumerConfig
* @see ProducerConfig
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/Topology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index ca0ac75..e8f7d23 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -538,6 +538,7 @@ public class Topology {
* Adds a state store.
*
* @param supplier the supplier used to obtain this state store {@link StateStore} instance
+ * @param processorNames the names of the processors that should be able to access the provided store
* @return itself
* @throws TopologyException if state store supplier is already added
*/
@@ -633,6 +634,11 @@ public class Topology {
return this;
}
+ /**
+ * Returns a description of the specified {@code Topology}.
+ *
+ * @return a description of the topology.
+ */
public synchronized TopologyDescription describe() {
return internalTopologyBuilder.describe();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index 7266d7d..81aa405 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
/**
@@ -35,7 +36,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
* All joins with the {@code GlobalKTable} require that a {@link KeyValueMapper} is provided that can map from the
* {@link KeyValue} of the left hand side {@link KStream} to the key of the right hand side {@code GlobalKTable}.
* <p>
- * A {@code GlobalKTable} is created via a {@link KStreamBuilder}. For example:
+ * A {@code GlobalKTable} is created via a {@link StreamsBuilder}. For example:
* <pre>{@code
* builder.globalTable("topic-name", "queryable-store-name");
* }</pre>
@@ -60,7 +61,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
* @param <K> Type of primary keys
* @param <V> Type of value changes
* @see KTable
- * @see KStreamBuilder#globalTable(String, String)
+ * @see StreamsBuilder#globalTable(String, String)
* @see KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)
* @see KStream#leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 535e1e9..8b3ceec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
@@ -36,7 +37,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
* For example a user X might buy two items I1 and I2, and thus there might be two records {@code <K:I1>, <K:I2>}
* in the stream.
* <p>
- * A {@code KStream} is either {@link KStreamBuilder#stream(String...) defined from one or multiple Kafka topics} that
+ * A {@code KStream} is either {@link StreamsBuilder#stream(String...) defined from one or multiple Kafka topics} that
* are consumed message by message or the result of a {@code KStream} transformation.
* A {@link KTable} can also be {@link KTable#toStream() converted} into a {@code KStream}.
* <p>
@@ -51,7 +52,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
* @param <V> Type of values
* @see KTable
* @see KGroupedStream
- * @see KStreamBuilder#stream(String...)
+ * @see org.apache.kafka.streams.StreamsBuilder#stream(String...)
*/
@InterfaceStability.Evolving
public interface KStream<K, V> {
@@ -652,8 +653,9 @@ public interface KStream<K, V> {
* The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
* started).
* <p>
- * This is equivalent to calling {@link #to(String) #to(someTopicName)} and {@link KStreamBuilder#stream(String...)
- * KStreamBuilder#stream(someTopicName)}.
+ * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
+ * {@link org.apache.kafka.streams.StreamsBuilder#stream(String...)
+ * StreamsBuilder#stream(someTopicName)}.
*
* @param topic the topic name
* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
@@ -667,7 +669,7 @@ public interface KStream<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(StreamPartitioner, String) #to(StreamPartitioner, someTopicName)} and
- * {@link KStreamBuilder#stream(String...) KStreamBuilder#stream(someTopicName)}.
+ * {@link StreamsBuilder#stream(String...) StreamsBuilder#stream(someTopicName)}.
*
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link DefaultPartitioner} will be used
@@ -686,7 +688,7 @@ public interface KStream<K, V> {
* used—otherwise producer's {@link DefaultPartitioner} is used.
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valSerde, someTopicName)} and
- * {@link KStreamBuilder#stream(Serde, Serde, String...) KStreamBuilder#stream(keySerde, valSerde, someTopicName)}.
+ * {@link StreamsBuilder#stream(Serde, Serde, String...) StreamsBuilder#stream(keySerde, valSerde, someTopicName)}.
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@@ -706,8 +708,8 @@ public interface KStream<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String) #to(keySerde, valSerde,
- * StreamPartitioner, someTopicName)} and {@link KStreamBuilder#stream(Serde, Serde, String...)
- * KStreamBuilder#stream(keySerde, valSerde, someTopicName)}.
+ * StreamPartitioner, someTopicName)} and {@link StreamsBuilder#stream(Serde, Serde, String...)
+ * StreamsBuilder#stream(keySerde, valSerde, someTopicName)}.
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used