You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/07 02:19:57 UTC
[kafka] branch trunk updated: MINOR: improve JavaDocs about global
state stores (#6359)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 240d758 MINOR: improve JavaDocs about global state stores (#6359)
240d758 is described below
commit 240d7589d624b72fa95e2f84a84778bc3a127927
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Wed Mar 6 18:19:47 2019 -0800
MINOR: improve JavaDocs about global state stores (#6359)
Improve JavaDocs about global state stores.
Reviewers: Guozhang Wang <wa...@gmail.com>, Sophie Blee-Goldman <so...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
.../org/apache/kafka/streams/StreamsBuilder.java | 9 ++
.../org/apache/kafka/streams/kstream/KStream.java | 142 +++++++++++----------
.../kafka/streams/scala/StreamsBuilder.scala | 10 +-
.../kafka/streams/scala/kstream/KStream.scala | 25 ++--
4 files changed, 110 insertions(+), 76 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 1b3b4a2..9e89d7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -25,9 +25,12 @@ import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
+import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TimestampExtractor;
@@ -447,6 +450,9 @@ public class StreamsBuilder {
/**
* Adds a state store to the underlying {@link Topology}.
+ * <p>
+ * It is required to connect state stores to {@link Processor Processors}, {@link Transformer Transformers},
+ * or {@link ValueTransformer ValueTransformers} before they can be used.
*
* @param builder the builder used to obtain this state store {@link StateStore} instance
* @return itself
@@ -492,6 +498,9 @@ public class StreamsBuilder {
* records forwarded from the {@link SourceNode}.
* This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ * <p>
+ * It is not required to connect a global store to {@link Processor Processors}, {@link Transformer Transformers},
+ * or {@link ValueTransformer ValueTransformers}; those have read-only access to all global stores by default.
*
* @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null}
* @param topic the topic to source the data from
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 5138917..df001d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -229,18 +229,19 @@ public interface KStream<K, V> {
* and emit a record {@code <word:1>} for each word.
* <pre>{@code
* KStream<byte[], String> inputStream = builder.stream("topic");
- * KStream<String, Integer> outputStream = inputStream.flatMap(new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>> {
- * Iterable<KeyValue<String, Integer>> apply(byte[] key, String value) {
- * String[] tokens = value.split(" ");
- * List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length);
+ * KStream<String, Integer> outputStream = inputStream.flatMap(
+ * new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>>() {
+ * Iterable<KeyValue<String, Integer>> apply(byte[] key, String value) {
+ * String[] tokens = value.split(" ");
+ * List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length);
+ *
+ * for(String token : tokens) {
+ * result.add(new KeyValue<>(token, 1));
+ * }
*
- * for(String token : tokens) {
- * result.add(new KeyValue<>(token, 1));
+ * return result;
* }
- *
- * return result;
- * }
- * });
+ * });
* }</pre>
* The provided {@link KeyValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
* and the return value must not be {@code null}.
@@ -497,7 +498,8 @@ public interface KStream<K, V> {
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}, the processing progress
* can be observed and additional periodic actions can be performed.
* <p>
- * In order to assign a state, the state must be created and registered beforehand:
+ * In order to assign a state, the state must be created and registered beforehand (it's not required to connect
+ * global state stores; read-only access to global state stores is available by default):
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
@@ -580,7 +582,8 @@ public interface KStream<K, V> {
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
* can be observed and additional periodic actions can be performed.
* <p>
- * In order to assign a state, the state must be created and registered beforehand:
+ * In order to assign a state, the state must be created and registered beforehand (it's not required to connect
+ * global state stores; read-only access to global state stores is available by default):
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
@@ -652,10 +655,11 @@ public interface KStream<K, V> {
* record value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper)}).
- * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional
- * periodic actions can be performed.
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
+ * can be observed and additional periodic actions can be performed.
* <p>
- * In order to assign a state, the state must be created and registered beforehand:
+ * In order to assign a state, the state must be created and registered beforehand (it's not required to connect
+ * global state stores; read-only access to global state stores is available by default):
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
@@ -669,8 +673,8 @@ public interface KStream<K, V> {
* }</pre>
* Within the {@link ValueTransformer}, the state is obtained via the
* {@link ProcessorContext}.
- * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, a schedule must be
- * registered.
+ * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+ * a schedule must be registered.
* In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
* pairs should be emitted via {@link ProcessorContext#forward(Object, Object)
* ProcessorContext.forward()}.
@@ -682,7 +686,8 @@ public interface KStream<K, V> {
*
* void init(ProcessorContext context) {
* this.state = context.getStateStore("myValueTransformState");
- * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
+ * // punctuate each 1000ms, can access this.state
+ * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
* NewValueType transform(V value) {
@@ -718,14 +723,15 @@ public interface KStream<K, V> {
/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
- * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to each input
- * record value and computes a new value for it.
+ * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
+ * each input record value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapperWithKey)}).
- * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional
- * periodic actions can be performed.
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
+ * can be observed and additional periodic actions can be performed.
* <p>
- * In order to assign a state, the state must be created and registered beforehand:
+ * In order to assign a state, the state must be created and registered beforehand (it's not required to connect
+ * global state stores; read-only access to global state stores is available by default):
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
@@ -752,7 +758,8 @@ public interface KStream<K, V> {
*
* void init(ProcessorContext context) {
* this.state = context.getStateStore("myValueTransformState");
- * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
+ * // punctuate each 1000ms, can access this.state
+ * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
* NewValueType transform(K readOnlyKey, V value) {
@@ -791,11 +798,12 @@ public interface KStream<K, V> {
* Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given
* {@link ProcessorSupplier}).
* This is a stateful record-by-record operation (cf. {@link #foreach(ForeachAction)}).
- * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional
- * periodic actions can be performed.
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
+ * can be observed and additional periodic actions can be performed.
* Note that this is a terminal operation that returns void.
* <p>
- * In order to assign a state, the state must be created and registered beforehand:
+ * In order to assign a state, the state must be created and registered beforehand (it's not required to connect
+ * global state stores; read-only access to global state stores is available by default):
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
@@ -819,7 +827,8 @@ public interface KStream<K, V> {
*
* void init(ProcessorContext context) {
* this.state = context.getStateStore("myProcessorState");
- * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
+ * // punctuate each 1000ms, can access this.state
+ * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
* void process(K key, V value) {
@@ -857,8 +866,8 @@ public interface KStream<K, V> {
* {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka if a later
* operator depends on the newly selected key.
* This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
- * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is
- * an internally generated name, and "-repartition" is a fixed suffix.
+ * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+ * "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
@@ -886,8 +895,8 @@ public interface KStream<K, V> {
* {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka
* if a later operator depends on the newly selected key.
* This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
- * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is
- * an internally generated name, and "-repartition" is a fixed suffix.
+ * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+ * "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
@@ -916,9 +925,9 @@ public interface KStream<K, V> {
* {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka if a later operator
* depends on the newly selected key.
* This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
- * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, <name> is
- * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name,
- * and "-repartition" is a fixed suffix.
+ * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+ * <name> is either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally
+ * generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
@@ -938,14 +947,15 @@ public interface KStream<K, V> {
* and default serializers and deserializers.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
- * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the original values.
+ * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
+ * original values.
* If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}
* <p>
* Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a
* later operator depends on the newly selected key.
* This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
- * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is
- * an internally generated name, and "-repartition" is a fixed suffix.
+ * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+ * "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
@@ -966,14 +976,15 @@ public interface KStream<K, V> {
* and {@link Serde}s as specified by {@link Serialized}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
- * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the original values.
+ * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
+ * original values.
* If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
* <p>
* Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a
* later operator depends on the newly selected key.
* This topic will be as "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
- * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is
- * an internally generated name, and "-repartition" is a fixed suffix.
+ * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+ * "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
@@ -997,14 +1008,16 @@ public interface KStream<K, V> {
* and {@link Serde}s as specified by {@link Grouped}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
- * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the original values.
+ * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
+ * original values.
* If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
* <p>
* Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later
* operator depends on the newly selected key.
* This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
- * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is
- * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name.
+ * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+ * "<name>" is either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an
+ * internally generated name.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
@@ -1067,8 +1080,8 @@ public interface KStream<K, V> {
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
- * "-repartition" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated
+ * name, and "-repartition" is a fixed suffix.
* <p>
* Repartitioning can happen for one or both of the joining {@code KStream}s.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
@@ -1144,8 +1157,8 @@ public interface KStream<K, V> {
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
- * "-repartition" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated
+ * name, and "-repartition" is a fixed suffix.
* <p>
* Repartitioning can happen for one or both of the joining {@code KStream}s.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
@@ -1227,8 +1240,8 @@ public interface KStream<K, V> {
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
- * "-repartition" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated
+ * name, and "-repartition" is a fixed suffix.
* <p>
* Repartitioning can happen for one or both of the joining {@code KStream}s.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
@@ -1308,8 +1321,8 @@ public interface KStream<K, V> {
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
- * "-repartition" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated
+ * name, and "-repartition" is a fixed suffix.
* <p>
* Repartitioning can happen for one or both of the joining {@code KStream}s.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
@@ -1392,8 +1405,8 @@ public interface KStream<K, V> {
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
- * "-repartition" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated
+ * name, and "-repartition" is a fixed suffix.
* <p>
* Repartitioning can happen for one or both of the joining {@code KStream}s.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
@@ -1474,8 +1487,8 @@ public interface KStream<K, V> {
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
- * "-repartition" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated
+ * name, and "-repartition" is a fixed suffix.
* <p>
* Repartitioning can happen for one or both of the joining {@code KStream}s.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
@@ -1561,8 +1574,8 @@ public interface KStream<K, V> {
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
- * "-repartition" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated
+ * name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
@@ -1636,8 +1649,8 @@ public interface KStream<K, V> {
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
- * "-repartition" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated
+ * name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
@@ -1717,8 +1730,8 @@ public interface KStream<K, V> {
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
- * "-repartition" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated
+ * name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
@@ -1795,8 +1808,8 @@ public interface KStream<K, V> {
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
- * "-repartition" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated
+ * name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
@@ -1889,5 +1902,4 @@ public interface KStream<K, V> {
<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner);
-
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index 8c5a9b3..4a1df92 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.state.StoreBuilder
import org.apache.kafka.streams.{Topology, StreamsBuilder => StreamsBuilderJ}
import org.apache.kafka.streams.scala.kstream._
import ImplicitConversions._
+import org.apache.kafka.streams.errors.TopologyException
import scala.collection.JavaConverters._
@@ -77,7 +78,7 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
/**
* Create a [[kstream.KStream]] from the specified topic pattern.
*
- * @param topics the topic name pattern
+ * @param topicPattern the topic name pattern
* @return a [[kstream.KStream]] for the specified topics
* @see #stream(String)
* @see `org.apache.kafka.streams.StreamsBuilder#stream`
@@ -157,6 +158,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
/**
* Adds a state store to the underlying `Topology`. The store must still be "connected" to a `Processor`,
* `Transformer`, or `ValueTransformer` before it can be used.
+ * <p>
+ * It is required to connect state stores to `Processor`, `Transformer`, or `ValueTransformer` before they can be used.
*
* @param builder the builder used to obtain this state store `StateStore` instance
* @return the underlying Java abstraction `StreamsBuilder` after adding the `StateStore`
@@ -166,8 +169,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilderJ = inner.addStateStore(builder)
/**
- * Adds a global `StateStore` to the topology. Global stores should not be added to `Processor, `Transformer`,
+ * Adds a global `StateStore` to the topology. Global stores should not be added to `Processor`, `Transformer`,
* or `ValueTransformer` (in contrast to regular stores).
+ * <p>
+ * It is not required to connect a global store to `Processor`, `Transformer`, or `ValueTransformer`;
+ * those have read-only access to all global stores by default.
*
* @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
*/
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 635975b..5df9de8 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -285,9 +285,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* can be altered arbitrarily).
* A `Transformer` (provided by the given `TransformerSupplier`) is applied to each input record
* and computes zero or more output records.
- * In order to assign a state, the state must be created and registered
- * beforehand via stores added via `addStateStore` or `addGlobalStore`
- * before they can be connected to the `Transformer`
+ * In order to assign a state, the state must be created and added via `addStateStore` before it can be connected
+ * to the `Transformer`.
+ * It's not required to connect global state stores that are added via `addGlobalStore`;
+ * read-only access to global state stores is available by default.
*
* @param transformerSupplier the `TransformerSuplier` that generates `Transformer`
* @param stateStoreNames the names of the state stores used by the processor
@@ -302,8 +303,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* Transform the value of each input record into a new value (with possible new type) of the output record.
* A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
* record value and computes a new value for it.
- * In order to assign a state, the state must be created and registered
- * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
+ * In order to assign a state, the state must be created and added via `addStateStore` before it can be connected
+ * to the `ValueTransformer`.
+ * It's not required to connect global state stores that are added via `addGlobalStore`;
+ * read-only access to global state stores is available by default.
*
* @param valueTransformerSupplier a instance of `ValueTransformerSupplier` that generates a `ValueTransformer`
* @param stateStoreNames the names of the state stores used by the processor
@@ -318,8 +321,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* Transform the value of each input record into a new value (with possible new type) of the output record.
* A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
* record value and computes a new value for it.
- * In order to assign a state, the state must be created and registered
- * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
+ * In order to assign a state, the state must be created and added via `addStateStore` before it can be connected
+ * to the `ValueTransformer`.
+ * It's not required to connect global state stores that are added via `addGlobalStore`;
+ * read-only access to global state stores is available by default.
*
* @param valueTransformerSupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
* @param stateStoreNames the names of the state stores used by the processor
@@ -333,8 +338,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
/**
* Process all records in this stream, one record at a time, by applying a `Processor` (provided by the given
* `processorSupplier`).
- * In order to assign a state, the state must be created and registered
- * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
+ * In order to assign a state, the state must be created and added via `addStateStore` before it can be connected
+ * to the `Processor`.
+ * It's not required to connect global state stores that are added via `addGlobalStore`;
+ * read-only access to global state stores is available by default.
*
* @param processorSupplier a function that generates a [[org.apache.kafka.streams.processor.Processor]]
* @param stateStoreNames the names of the state store used by the processor