Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/07 22:47:02 UTC
[kafka] branch 2.2 updated: Improve API docs of (flatT|t)ransform (#6365)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new 8179e8f Improve API docs of (flatT|t)ransform (#6365)
8179e8f is described below
commit 8179e8fdc04d8968e2aa11d7959bdbbecea19141
Author: cadonna <ca...@users.noreply.github.com>
AuthorDate: Thu Mar 7 23:41:57 2019 +0100
Improve API docs of (flatT|t)ransform (#6365)
This commit is a follow-up of pull request #5273
Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bb...@gmail.com>
---
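The updated javadoc for transform() says the Transformer yields zero or one output record per input record, and that a null return from Transformer#transform() emits no record. The following is a minimal plain-Java sketch of that per-record contract. It does not use Kafka Streams: TransformSketch, applyTransform() and the BiFunction are hypothetical stand-ins for KStream#transform() and Transformer#transform(), and Map.Entry stands in for KeyValue.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class TransformSketch {

    // Hypothetical driver (not Kafka Streams API) mimicking the contract of transform():
    // each input pair yields one output pair, or nothing if the function returns null.
    static <K, V, K2, V2> List<Map.Entry<K2, V2>> applyTransform(
            List<Map.Entry<K, V>> input,
            BiFunction<K, V, Map.Entry<K2, V2>> transform) {
        List<Map.Entry<K2, V2>> output = new ArrayList<>();
        for (Map.Entry<K, V> record : input) {
            Map.Entry<K2, V2> result = transform.apply(record.getKey(), record.getValue());
            if (result != null) { // a null return means no record is emitted
                output.add(result);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                new SimpleEntry<>("a", 1),
                new SimpleEntry<>("b", -2),
                new SimpleEntry<>("c", 3));

        // Swap key and value (a key-changing step); drop negative values by returning null.
        BiFunction<String, Integer, Map.Entry<Integer, String>> swapNonNegative =
                (k, v) -> v < 0 ? null : new SimpleEntry<>(v, k);

        System.out.println(applyTransform(input, swapNonNegative)); // prints [1=a, 3=c]
    }
}
```

Because the function changes the key, a real topology would need through() before any key-based operator on the result, as the javadoc notes, since transform() never triggers auto-repartitioning.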
.../org/apache/kafka/streams/kstream/KStream.java | 61 +++++++++++++++-------
1 file changed, 41 insertions(+), 20 deletions(-)
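For flatTransform(), the javadoc in this commit says transform() must return an Iterable (e.g., any Collection) and that a null return is treated like an empty Iterable, so no records are emitted. The sketch below models that rule in plain Java under the same assumptions: FlatTransformSketch, applyFlatTransform() and the Function are hypothetical stand-ins, not Kafka Streams API, and only values are shown to keep it short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class FlatTransformSketch {

    // Hypothetical driver (not Kafka Streams API) mimicking the contract of flatTransform():
    // each call returns an Iterable or null, and null behaves like an empty Iterable.
    static <V, R> List<R> applyFlatTransform(List<V> input, Function<V, Iterable<R>> transform) {
        List<R> output = new ArrayList<>();
        for (V value : input) {
            Iterable<R> results = transform.apply(value);
            if (results == null) {
                continue; // same as an empty Iterable: no records emitted
            }
            for (R r : results) {
                output.add(r);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("hello world", "", "kafka streams");

        // Split each line into words; return null for an empty line instead of an empty list.
        Function<String, Iterable<String>> splitWords =
                line -> line.isEmpty() ? null : Arrays.asList(line.split(" "));

        System.out.println(applyFlatTransform(lines, splitWords)); // prints [hello, world, kafka, streams]
    }
}
```

Returning null for the empty line matters here: "".split(" ") yields a single empty string, which would otherwise be emitted as a record. Because every output goes through the declared Iterable<R> return type, a type mismatch is caught at compile time, which is the reason the javadoc recommends this over context#forward().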
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 5138917..c61b703 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -493,9 +493,9 @@ public interface KStream<K, V> {
* A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
* returns zero or one output record.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
- * This is a stateful record-by-record operation (cf. {@link #map(KeyValueMapper)}).
- * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}, the processing progress
- * can be observed and additional periodic actions can be performed.
+ * This is a stateful record-by-record operation (cf. {@link #map(KeyValueMapper) map()}).
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()},
+ * the processing progress can be observed and additional periodic actions can be performed.
* <p>
* In order to assign a state, the state must be created and registered beforehand:
* <pre>{@code
@@ -509,11 +509,13 @@ public interface KStream<K, V> {
*
* KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
* }</pre>
- * Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
+ * Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* The {@link Transformer} must return a {@link KeyValue} type in {@link Transformer#transform(Object, Object)
* transform()} and {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}.
+ * The return value of {@link Transformer#transform(Object, Object) Transformer#transform()} may be {@code null},
+ * in which case no record is emitted.
* <pre>{@code
* new TransformerSupplier() {
* Transformer get() {
@@ -541,19 +543,24 @@ public interface KStream<K, V> {
* }
* }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
- * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
+ * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+ * {@code transform()}.
* <p>
* Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
* or join) is applied to the result {@code KStream}.
- * (cf. {@link #transformValues(ValueTransformerSupplier, String...)})
+ * (cf. {@link #transformValues(ValueTransformerSupplier, String...) transformValues()})
* <p>
* Note that it is possible to emit multiple records for each input record by using
- * {@link ProcessorContext#forward(Object, Object) context#forward()} in {@link Transformer#transform(K, V)}.
- * However, a mismatch between the types of the emitted records and the type of the stream would only be detected
- * at runtime.
- * To ensure type-safety at compile-time, it is recommended to use
- * {@link #flatTransform(TransformerSupplier, String...)} if multiple records need to be emitted for each input
- * record.
+ * {@link ProcessorContext#forward(Object, Object) context#forward()} in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+ * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+ * Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
+ * detected at runtime.
+ * To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
+ * not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+ * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+ * If in {@link Transformer#transform(Object, Object) Transformer#transform()} multiple records need to be emitted
+ * for each input record, it is recommended to use {@link #flatTransform(TransformerSupplier, String...)
+ * flatTransform()}.
*
* @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
* @param stateStoreNames the names of the state stores used by the processor
@@ -575,10 +582,10 @@ public interface KStream<K, V> {
* A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
* returns zero or more output records.
* Thus, an input record {@code <K,V>} can be transformed into output records {@code <K':V'>, <K'':V''>, ...}.
- * This is a stateful record-by-record operation (cf. {@link #flatMap(KeyValueMapper)} for stateless record
- * transformation).
- * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
- * can be observed and additional periodic actions can be performed.
+ * This is a stateful record-by-record operation (cf. {@link #flatMap(KeyValueMapper) flatMap()} for stateless
+ * record transformation).
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
+ * the processing progress can be observed and additional periodic actions can be performed.
* <p>
* In order to assign a state, the state must be created and registered beforehand:
* <pre>{@code
@@ -593,8 +600,12 @@ public interface KStream<K, V> {
* KStream outputStream = inputStream.flatTransform(new TransformerSupplier() { ... }, "myTransformState");
* }</pre>
* Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
- * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
- * a schedule must be registered.
+ * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)
+ * punctuate()}, a schedule must be registered.
+ * The {@link Transformer} must return an {@link java.lang.Iterable} type (e.g., any {@link java.util.Collection}
+ * type) in {@link Transformer#transform(Object, Object) transform()}.
+ * The return value of {@link Transformer#transform(Object, Object) Transformer#transform()} may be {@code null},
+ * which is equivalent to returning an empty {@link java.lang.Iterable Iterable}, i.e., no records are emitted.
* <pre>{@code
* new TransformerSupplier() {
* Transformer get() {
@@ -626,11 +637,21 @@ public interface KStream<K, V> {
* }
* }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
- * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
+ * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+ * {@code flatTransform()}.
* <p>
* Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
* or join) is applied to the result {@code KStream}.
- * (cf. {@link #transformValues(ValueTransformerSupplier, String...)})
+ * (cf. {@link #transformValues(ValueTransformerSupplier, String...) transformValues()})
+ * <p>
+ * Note that it is possible to emit records by using {@link ProcessorContext#forward(Object, Object)
+ * context#forward()} in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+ * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+ * Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
+ * detected at runtime.
+ * To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
+ * not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+ * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
*
* @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
* @param stateStoreNames the names of the state stores used by the processor