Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/07 22:47:02 UTC

[kafka] branch 2.2 updated: Improve API docs of (flatT|t)ransform (#6365)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 8179e8f  Improve API docs of (flatT|t)ransform (#6365)
8179e8f is described below

commit 8179e8fdc04d8968e2aa11d7959bdbbecea19141
Author: cadonna <ca...@users.noreply.github.com>
AuthorDate: Thu Mar 7 23:41:57 2019 +0100

    Improve API docs of (flatT|t)ransform (#6365)
    
    This commit is a follow-up of pull request #5273
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bb...@gmail.com>
---
 .../org/apache/kafka/streams/kstream/KStream.java  | 61 +++++++++++++++-------
 1 file changed, 41 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 5138917..c61b703 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -493,9 +493,9 @@ public interface KStream<K, V> {
      * A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
      * returns zero or one output record.
      * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
-     * This is a stateful record-by-record operation (cf. {@link #map(KeyValueMapper)}).
-     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}, the processing progress
-     * can be observed and additional periodic actions can be performed.
+     * This is a stateful record-by-record operation (cf. {@link #map(KeyValueMapper) map()}).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()},
+     * the processing progress can be observed and additional periodic actions can be performed.
      * <p>
      * In order to assign a state, the state must be created and registered beforehand:
      * <pre>{@code
@@ -509,11 +509,13 @@ public interface KStream<K, V> {
      *
      * KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
      * }</pre>
-     * Within the {@link Transformer}, the state is obtained via the {@link  ProcessorContext}.
+     * Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
      * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
      * a schedule must be registered.
      * The {@link Transformer} must return a {@link KeyValue} type in {@link Transformer#transform(Object, Object)
      * transform()} and {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}.
+     * The return value of {@link Transformer#transform(Object, Object) Transformer#transform()} may be {@code null},
+     * in which case no record is emitted.
      * <pre>{@code
      * new TransformerSupplier() {
      *     Transformer get() {
@@ -541,19 +543,24 @@ public interface KStream<K, V> {
      * }
      * }</pre>
      * Even if any upstream operation was key-changing, no auto-repartition is triggered.
-     * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
+     * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+     * {@code transform()}.
      * <p>
      * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
      * or join) is applied to the result {@code KStream}.
-     * (cf. {@link #transformValues(ValueTransformerSupplier, String...)})
+     * (cf. {@link #transformValues(ValueTransformerSupplier, String...) transformValues()} )
      * <p>
      * Note that it is possible to emit multiple records for each input record by using
-     * {@link ProcessorContext#forward(Object, Object) context#forward()} in {@link Transformer#transform(K, V)}.
-     * However, a mismatch between the types of the emitted records and the type of the stream would only be detected
-     * at runtime.
-     * To ensure type-safety at compile-time, it is recommended to use
-     * {@link #flatTransform(TransformerSupplier, String...)} if multiple records need to be emitted for each input
-     * record.
+     * {@link ProcessorContext#forward(Object, Object) context#forward()} in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+     * Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
+     * detected at runtime.
+     * To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
+     * not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+     * If in {@link Transformer#transform(Object, Object) Transformer#transform()} multiple records need to be emitted
+     * for each input record, it is recommended to use {@link #flatTransform(TransformerSupplier, String...)
+     * flatTransform()}.
      *
      * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
      * @param stateStoreNames     the names of the state stores used by the processor
@@ -575,10 +582,10 @@ public interface KStream<K, V> {
      * A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
      * returns zero or more output records.
      * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K':V'>, <K'':V''>, ...}.
-     * This is a stateful record-by-record operation (cf. {@link #flatMap(KeyValueMapper)} for stateless record
-     * transformation).
-     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
-     * can be observed and additional periodic actions can be performed.
+     * This is a stateful record-by-record operation (cf. {@link #flatMap(KeyValueMapper) flatMap()} for stateless
+     * record transformation).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
+     * the processing progress can be observed and additional periodic actions can be performed.
      * <p>
      * In order to assign a state, the state must be created and registered beforehand:
      * <pre>{@code
@@ -593,8 +600,12 @@ public interface KStream<K, V> {
      * KStream outputStream = inputStream.flatTransform(new TransformerSupplier() { ... }, "myTransformState");
      * }</pre>
      * Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
-     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
-     * a schedule must be registered.
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)
+     * punctuate()}, a schedule must be registered.
+     * The {@link Transformer} must return an {@link java.lang.Iterable} type (e.g., any {@link java.util.Collection}
+     * type) in {@link Transformer#transform(Object, Object) transform()}.
+     * The return value of {@link Transformer#transform(Object, Object) Transformer#transform()} may be {@code null},
+     * which is equal to returning an empty {@link java.lang.Iterable Iterable}, i.e., no records are emitted.
      * <pre>{@code
      * new TransformerSupplier() {
      *     Transformer get() {
@@ -626,11 +637,21 @@ public interface KStream<K, V> {
      * }
      * }</pre>
      * Even if any upstream operation was key-changing, no auto-repartition is triggered.
-     * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
+     * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+     * {@code flatTransform()}.
      * <p>
      * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
      * or join) is applied to the result {@code KStream}.
-     * (cf. {@link #transformValues(ValueTransformerSupplier, String...)})
+     * (cf. {@link #transformValues(ValueTransformerSupplier, String...) transformValues()})
+     * <p>
+     * Note that it is possible to emit records by using {@link ProcessorContext#forward(Object, Object)
+     * context#forward()} in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+     * Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
+     * detected at runtime.
+     * To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
+     * not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
      *
      * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
      * @param stateStoreNames     the names of the state stores used by the processor
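
The null-handling contract that this commit documents can be illustrated with a small stand-alone sketch. It does not use the Kafka Streams classes: KeyValue, OneToOne, OneToMany, runTransform, runFlatTransform, lengthOf and wordsOf are all hypothetical stand-ins invented for this illustration, and the real dispatch happens inside Kafka Streams. Under those assumptions, it shows that a null return from transform() emits no record, while a null return from flatTransform() behaves like an empty Iterable.

```java
import java.util.ArrayList;
import java.util.List;

public class TransformSemanticsSketch {

    // Hypothetical stand-in for org.apache.kafka.streams.KeyValue (not the real class).
    public static final class KeyValue {
        public final String key;
        public final Integer value;

        public KeyValue(String key, Integer value) {
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical stand-in for a Transformer used with transform(): zero or one record.
    public interface OneToOne {
        KeyValue transform(String key, String value);
    }

    // Hypothetical stand-in for a Transformer used with flatTransform(): zero or more records.
    public interface OneToMany {
        Iterable<KeyValue> transform(String key, String value);
    }

    // Models transform(): a null return value means no record is emitted.
    public static List<KeyValue> runTransform(OneToOne t, String key, String value) {
        List<KeyValue> emitted = new ArrayList<>();
        KeyValue result = t.transform(key, value);
        if (result != null) {
            emitted.add(result);
        }
        return emitted;
    }

    // Models flatTransform(): a null return value is treated like an empty Iterable.
    public static List<KeyValue> runFlatTransform(OneToMany t, String key, String value) {
        List<KeyValue> emitted = new ArrayList<>();
        Iterable<KeyValue> results = t.transform(key, value);
        if (results != null) {
            for (KeyValue kv : results) {
                emitted.add(kv);
            }
        }
        return emitted;
    }

    // Example transformer: maps a value to its length, or returns null for an empty value.
    public static KeyValue lengthOf(String key, String value) {
        if (value == null || value.isEmpty()) {
            return null;
        }
        return new KeyValue(key, value.length());
    }

    // Example transformer: one record per whitespace-separated word, or null for blank input.
    public static Iterable<KeyValue> wordsOf(String key, String value) {
        if (value == null || value.trim().isEmpty()) {
            return null;
        }
        List<KeyValue> out = new ArrayList<>();
        for (String word : value.trim().split("\\s+")) {
            out.add(new KeyValue(word, 1));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runTransform(TransformSemanticsSketch::lengthOf, "k", "hello").size());
        System.out.println(runTransform(TransformSemanticsSketch::lengthOf, "k", "").size());
        System.out.println(runFlatTransform(TransformSemanticsSketch::wordsOf, "k", "a b c").size());
        System.out.println(runFlatTransform(TransformSemanticsSketch::wordsOf, "k", " ").size());
    }
}
```

Because both example transformers return their records rather than forwarding them through a context object, the record types stay visible to the compiler, which is the compile-time type-safety argument made in the updated Javadoc.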