Posted to commits@kafka.apache.org by mj...@apache.org on 2019/05/09 13:46:03 UTC

[kafka] branch trunk updated: MINOR: improve JavaDocs for KStream.through() (#6639)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 53dec548 MINOR: improve JavaDocs for KStream.through() (#6639)
53dec548 is described below

commit 53dec548b6149274e6c2acf832386dba6ec3c1e1
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Thu May 9 15:45:50 2019 +0200

    MINOR: improve JavaDocs for KStream.through() (#6639)
    
    Reviewers: Bruno Cadonna <br...@confluent.io>, John Roesler <jo...@confluent.io>, A. Sophie Blee-Goldman <so...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
 .../org/apache/kafka/streams/kstream/KStream.java  | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 8375336..cd64f75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -429,8 +429,10 @@ public interface KStream<K, V> {
      * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
      * started).
      * <p>
-     * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
+     * This is similar to calling {@link #to(String) #to(someTopicName)} and
      * {@link StreamsBuilder#stream(String) StreamsBuilder#stream(someTopicName)}.
+     * Note that {@code through()} uses a hard coded {@link org.apache.kafka.streams.processor.FailOnInvalidTimestamp
+     * timestamp extractor} and does not allow to customize it, to ensure correct timestamp propagation.
      *
      * @param topic the topic name
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
@@ -444,8 +446,10 @@ public interface KStream<K, V> {
      * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
      * started).
      * <p>
-     * This is equivalent to calling {@link #to(String, Produced) to(someTopic, Produced.with(keySerde, valueSerde)}
+     * This is similar to calling {@link #to(String, Produced) to(someTopic, Produced.with(keySerde, valueSerde)}
      * and {@link StreamsBuilder#stream(String, Consumed) StreamsBuilder#stream(someTopicName, Consumed.with(keySerde, valueSerde))}.
+     * Note that {@code through()} uses a hard coded {@link org.apache.kafka.streams.processor.FailOnInvalidTimestamp
+     * timestamp extractor} and does not allow to customize it, to ensure correct timestamp propagation.
      *
      * @param topic     the topic name
      * @param produced  the options to use when producing to the topic
@@ -703,8 +707,8 @@ public interface KStream<K, V> {
      * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
      * a schedule must be registered.
      * The {@link ValueTransformer} must return the new value in {@link ValueTransformer#transform(Object) transform()}.
-     * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is {@null}, no
-     * records are emitted.
+     * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is {@code null},
+     * no records are emitted.
      * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
      * pairs can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
      * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
@@ -781,7 +785,7 @@ public interface KStream<K, V> {
      * The {@link ValueTransformerWithKey} must return the new value in
      * {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
      * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
-     * is {@null}, no records are emitted.
+     * is {@code null}, no records are emitted.
      * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
      * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
      * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
@@ -861,7 +865,7 @@ public interface KStream<K, V> {
      * {@link java.util.Collection} type) in {@link ValueTransformer#transform(Object)
      * transform()}.
      * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is an empty
-     * {@link java.lang.Iterable Iterable} or {@null}, no records are emitted.
+     * {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
      * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
      * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
      * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
@@ -925,8 +929,8 @@ public interface KStream<K, V> {
      * each input record value and computes zero or more new values.
      * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
      * This is a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}).
-     * Furthermore, via {@link org.apache.kafka.streams.processor. Punctuator#punctuate()} the processing progress can
-     * be observed and additional periodic actions can be performed.
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()} the processing
+     * progress can be observed and additional periodic actions can be performed.
      * <p>
      * In order to assign a state store, the state store must be created and registered beforehand:
      * <pre>{@code
@@ -947,7 +951,7 @@ public interface KStream<K, V> {
      * {@link java.util.Collection} type) in {@link ValueTransformerWithKey#transform(Object, Object)
      * transform()}.
      * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
-     * is an empty {@link java.lang.Iterable Iterable} or {@null}, no records are emitted.
+     * is an empty {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
      * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
      * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
      * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
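
The JavaDoc lines touched above for flatTransformValues() state that no records are emitted when transform() returns an empty Iterable or null. The sketch below illustrates that rule in plain Java. It is only a model under stated assumptions: the class FlatTransformValuesSketch and the method emittedValues are hypothetical names, it does not use or reproduce the Kafka Streams API, and a java.util.function.Function stands in for the ValueTransformer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in, NOT part of Kafka Streams: models only the documented
// rule "an empty Iterable or null result means no records are emitted".
final class FlatTransformValuesSketch {

    // Applies the transformer to one input value and collects the values that
    // would be emitted downstream for that record.
    static <V, VR> List<VR> emittedValues(V value, Function<V, Iterable<VR>> transformer) {
        List<VR> out = new ArrayList<>();
        Iterable<VR> result = transformer.apply(value);
        if (result == null) {
            return out; // null result: nothing is emitted
        }
        for (VR v : result) {
            out.add(v); // an empty Iterable adds nothing, so nothing is emitted
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumed example transformer: split a sentence into words, null for empty input.
        Function<String, Iterable<String>> splitWords =
            s -> s.isEmpty() ? null : Arrays.asList(s.split(" "));

        System.out.println(emittedValues("a b c", splitWords)); // prints [a, b, c]
        System.out.println(emittedValues("", splitWords));      // prints []
    }
}
```

In this model one input value fans out to zero or more output values, which mirrors the `<K,V>` to `<K:V'>, <K:V''>, ...` description in the JavaDoc; the key is left out because the value transformer cannot change it.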