You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:06:01 UTC
[47/50] [abbrv] kafka git commit: KAFKA-3440: Update streams javadocs
KAFKA-3440: Update streams javadocs
- add class doc for KTable, KStream, JoinWindows
- add missing return tags
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>, Michael G. Noll <mi...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1287 from mjsax/kafka-3440-JavaDoc
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3414d561
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3414d561
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3414d561
Branch: refs/heads/0.10.0
Commit: 3414d56121d8d2f66f8dd613453af71d5b3f0c5f
Parents: 69d9a66
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Apr 29 12:50:02 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Apr 29 12:50:02 2016 -0700
----------------------------------------------------------------------
.../kafka/streams/kstream/JoinWindows.java | 18 ++++
.../kafka/streams/kstream/KGroupedTable.java | 14 ++-
.../apache/kafka/streams/kstream/KStream.java | 95 +++++++++++++++++++-
.../kafka/streams/kstream/KStreamBuilder.java | 14 ++-
.../apache/kafka/streams/kstream/KTable.java | 47 +++++++++-
.../kafka/streams/processor/StateStore.java | 4 +-
.../apache/kafka/streams/state/WindowStore.java | 2 +
7 files changed, 189 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index a6d5603..f45c064 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -23,6 +23,24 @@ import java.util.Map;
/**
* The window specifications used for joins.
+ * <p>
+ * A {@link JoinWindows} instance defines a join over two stream on the same key and a maximum time difference.
+ * In SQL-style you would express this join as
+ * <pre>
+ * SELECT * FROM stream1, stream2
+ * WHERE
+ * stream1.key = stream2.key
+ * AND
+ * stream2.ts - before <= stream1.ts <= stream2.ts + after
+ * </pre>
+ * There are three different window configuration supported:
+ * <ul>
+ * <li>before = after = time-difference</li>
+ * <li>before = 0 and after = time-difference</li>
+ * <li>before = time-difference and after = 0</li>
+ * </ul>
+ * A join is symmetric in the sense, that a join specification on the first stream returns the same result record as
+ * a join specification on the second stream with flipped before and after values.
*/
public class JoinWindows extends Windows<TimeWindow> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 86c34b1..2ebad87 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -21,7 +21,11 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
/**
- * {@link KGroupedTable} is an abstraction of a <i>grouped changelog stream</i> from a primary-keyed table.
+ * {@link KGroupedTable} is an abstraction of a <i>grouped changelog stream</i> from a primary-keyed table,
+ * usually on a different grouping key than the original primary key.
+ * <p>
+ * It is an intermediate representation after a re-grouping of a {@link KTable} before an aggregation is applied
+ * to the new partitions resulting in a new {@link KTable}.
*
* @param <K> Type of primary keys
* @param <V> Type of value changes
@@ -35,6 +39,8 @@ public interface KGroupedTable<K, V> {
* @param adder the instance of {@link Reducer} for addition
* @param subtractor the instance of {@link Reducer} for subtraction
* @param name the name of the resulted {@link KTable}
+ * @return a {@link KTable} with the same key and value types as this {@link KGroupedTable},
+ * containing aggregated values for each key
*/
KTable<K, V> reduce(Reducer<V> adder,
Reducer<V> subtractor,
@@ -50,6 +56,8 @@ public interface KGroupedTable<K, V> {
* if not specified the default serdes defined in the configs will be used
* @param name the name of the resulted table
* @param <T> the value type of the aggregated {@link KTable}
+ * @return a {@link KTable} with same key and aggregated value type {@code T},
+ * containing aggregated values for each key
*/
<T> KTable<K, T> aggregate(Initializer<T> initializer,
Aggregator<K, V, T> adder,
@@ -66,6 +74,8 @@ public interface KGroupedTable<K, V> {
* @param substractor the instance of {@link Aggregator} for subtraction
* @param name the name of the resulted {@link KTable}
* @param <T> the value type of the aggregated {@link KTable}
+ * @return a {@link KTable} with same key and aggregated value type {@code T},
+ * containing aggregated values for each key
*/
<T> KTable<K, T> aggregate(Initializer<T> initializer,
Aggregator<K, V, T> adder,
@@ -76,6 +86,8 @@ public interface KGroupedTable<K, V> {
* Count number of records of this stream by the selected key into a new instance of {@link KTable}.
*
* @param name the name of the resulted {@link KTable}
+ * @return a {@link KTable} with same key and {@link Long} value type as this {@link KGroupedTable},
+ * containing the number of values for each key
*/
KTable<K, Long> count(String name);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 6df2deb..a1ecfa4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -25,9 +25,17 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
/**
* {@link KStream} is an abstraction of a <i>record stream</i> of key-value pairs.
+ * <p>
+ * A {@link KStream} is either defined from one or multiple Kafka topics that are consumed message by message or
+ * the result of a {@link KStream} transformation. A {@link KTable} can also be converted into a {@link KStream}.
+ * <p>
+ * A {@link KStream} can be transformed record by record, joined with another {@link KStream} or {@link KTable}, or
+ * can be aggregated into a {@link KTable}.
*
* @param <K> Type of keys
* @param <V> Type of values
+ *
+ * @see KTable
*/
@InterfaceStability.Unstable
public interface KStream<K, V> {
@@ -36,6 +44,8 @@ public interface KStream<K, V> {
* Create a new instance of {@link KStream} that consists of all elements of this stream which satisfy a predicate.
*
* @param predicate the instance of {@link Predicate}
+ *
+ * @return a {@link KStream} that contains only those records that satisfy the given predicate
*/
KStream<K, V> filter(Predicate<K, V> predicate);
@@ -43,6 +53,8 @@ public interface KStream<K, V> {
* Create a new instance of {@link KStream} that consists all elements of this stream which do not satisfy a predicate.
*
* @param predicate the instance of {@link Predicate}
+ *
+ * @return a {@link KStream} that contains only those records that do not satisfy the given predicate
*/
KStream<K, V> filterNot(Predicate<K, V> predicate);
@@ -52,6 +64,8 @@ public interface KStream<K, V> {
*
* @param mapper the instance of {@link KeyValueMapper}
* @param <K1> the new key type on the stream
+ *
+ * @return a {@link KStream} that contains records with different key type and same value type
*/
<K1> KStream<K1, V> selectKey(KeyValueMapper<K, V, K1> mapper);
@@ -61,6 +75,8 @@ public interface KStream<K, V> {
* @param mapper the instance of {@link KeyValueMapper}
* @param <K1> the key type of the new stream
* @param <V1> the value type of the new stream
+ *
+ * @return a {@link KStream} that contains records with new key and value type
*/
<K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper);
@@ -69,6 +85,8 @@ public interface KStream<K, V> {
*
* @param mapper the instance of {@link ValueMapper}
* @param <V1> the value type of the new stream
+ *
+ * @return a {@link KStream} that contains records with unmodified keys and new values of different type
*/
<V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
@@ -124,6 +142,8 @@ public interface KStream<K, V> {
* @param mapper the instance of {@link KeyValueMapper}
* @param <K1> the key type of the new stream
* @param <V1> the value type of the new stream
+ *
+ * @return a {@link KStream} that contains more or less records with new key and value type
*/
<K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper);
@@ -132,6 +152,8 @@ public interface KStream<K, V> {
*
* @param processor the instance of {@link ValueMapper}
* @param <V1> the value type of the new stream
+ *
+ * @return a {@link KStream} that contains more or less records with unmodified keys and new values of different type
*/
<V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> processor);
@@ -143,6 +165,8 @@ public interface KStream<K, V> {
* assigned to this stream only. An element will be dropped if none of the predicates evaluate to true.
*
* @param predicates the ordered list of {@link Predicate} instances
+ *
+ * @return multiple distinct substreams of this {@link KStream}
*/
KStream<K, V>[] branch(Predicate<K, V>... predicates);
@@ -152,6 +176,8 @@ public interface KStream<K, V> {
* This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}.
*
* @param topic the topic name
+ *
+ * @return a {@link KStream} that contains the exact same records as this {@link KStream}
*/
KStream<K, V> through(String topic);
@@ -159,7 +185,7 @@ public interface KStream<K, V> {
* Perform an action on each element of {@link KStream}.
* Note that this is a terminal operation that returns void.
*
- * @param action An action to perform on each element
+ * @param action an action to perform on each element
*/
void foreach(ForeachAction<K, V> action);
@@ -171,6 +197,8 @@ public interface KStream<K, V> {
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
* @param topic the topic name
+ *
+ * @return a {@link KStream} that contains the exact same records as this {@link KStream}
*/
KStream<K, V> through(StreamPartitioner<K, V> partitioner, String topic);
@@ -187,6 +215,8 @@ public interface KStream<K, V> {
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name
+ *
+ * @return a {@link KStream} that contains the exact same records as this {@link KStream}
*/
KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic);
@@ -205,6 +235,8 @@ public interface KStream<K, V> {
* {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
* — otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
* @param topic the topic name
+ *
+ * @return a {@link KStream} that contains the exact same records as this {@link KStream}
*/
KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
@@ -260,6 +292,8 @@ public interface KStream<K, V> {
*
* @param transformerSupplier the instance of {@link TransformerSupplier} that generates {@link org.apache.kafka.streams.kstream.Transformer}
* @param stateStoreNames the names of the state store used by the processor
+ *
+ * @return a new {@link KStream} with transformed key and value types
*/
<K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames);
@@ -268,6 +302,8 @@ public interface KStream<K, V> {
*
* @param valueTransformerSupplier the instance of {@link ValueTransformerSupplier} that generates {@link org.apache.kafka.streams.kstream.ValueTransformer}
* @param stateStoreNames the names of the state store used by the processor
+ *
+ * @return a {@link KStream} that contains records with unmodified keys and transformed values with type {@code R}
*/
<R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames);
@@ -293,6 +329,9 @@ public interface KStream<K, V> {
* if not specified the default serdes defined in the configs will be used
* @param <V1> the value type of the other stream
* @param <R> the value type of the new stream
+ *
+ * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+ * one for each matched record-pair with the same key and within the joining window intervals
*/
<V1, R> KStream<K, R> join(
KStream<K, V1> otherStream,
@@ -311,6 +350,9 @@ public interface KStream<K, V> {
* @param windows the specification of the {@link JoinWindows}
* @param <V1> the value type of the other stream
* @param <R> the value type of the new stream
+ *
+ * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+ * one for each matched record-pair with the same key and within the joining window intervals
*/
<V1, R> KStream<K, R> join(
KStream<K, V1> otherStream,
@@ -331,6 +373,9 @@ public interface KStream<K, V> {
* if not specified the default serdes defined in the configs will be used
* @param <V1> the value type of the other stream
* @param <R> the value type of the new stream
+ *
+ * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+ * one for each matched record-pair with the same key and within the joining window intervals
*/
<V1, R> KStream<K, R> outerJoin(
KStream<K, V1> otherStream,
@@ -349,6 +394,9 @@ public interface KStream<K, V> {
* @param windows the specification of the {@link JoinWindows}
* @param <V1> the value type of the other stream
* @param <R> the value type of the new stream
+ *
+ * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+ * one for each matched record-pair with the same key and within the joining window intervals
*/
<V1, R> KStream<K, R> outerJoin(
KStream<K, V1> otherStream,
@@ -367,6 +415,9 @@ public interface KStream<K, V> {
* if not specified the default serdes defined in the configs will be used
* @param <V1> the value type of the other stream
* @param <R> the value type of the new stream
+ *
+ * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+ * one for each matched record-pair with the same key and within the joining window intervals
*/
<V1, R> KStream<K, R> leftJoin(
KStream<K, V1> otherStream,
@@ -384,6 +435,9 @@ public interface KStream<K, V> {
* @param windows the specification of the {@link JoinWindows}
* @param <V1> the value type of the other stream
* @param <R> the value type of the new stream
+ *
+ * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+ * one for each matched record-pair with the same key and within the joining window intervals
*/
<V1, R> KStream<K, R> leftJoin(
KStream<K, V1> otherStream,
@@ -397,6 +451,9 @@ public interface KStream<K, V> {
* @param joiner the instance of {@link ValueJoiner}
* @param <V1> the value type of the table
* @param <V2> the value type of the new stream
+ *
+ * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+ * one for each matched record-pair with the same key and within the joining window intervals
*/
<V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> table, ValueJoiner<V, V1, V2> joiner);
@@ -409,6 +466,10 @@ public interface KStream<K, V> {
* if not specified the default serdes defined in the configs will be used
* @param valueSerde value serdes for materializing the aggregated table,
* if not specified the default serdes defined in the configs will be used
+ *
+ * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+ * where each table contains records with unmodified keys and values
+ * that represent the latest (rolling) aggregate for each key within that window
*/
<W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
Windows<W> windows,
@@ -421,6 +482,10 @@ public interface KStream<K, V> {
*
* @param reducer the instance of {@link Reducer}
* @param windows the specification of the aggregation {@link Windows}
+ *
+ * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+ * where each table contains records with unmodified keys and values
+ * that represent the latest (rolling) aggregate for each key within that window
*/
<W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, Windows<W> windows);
@@ -433,6 +498,8 @@ public interface KStream<K, V> {
* @param valueSerde value serdes for materializing the aggregated table,
* if not specified the default serdes defined in the configs will be used
* @param name the name of the resulted {@link KTable}
+ *
+ * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
*/
KTable<K, V> reduceByKey(Reducer<V> reducer,
Serde<K> keySerde,
@@ -444,6 +511,8 @@ public interface KStream<K, V> {
*
* @param reducer the instance of {@link Reducer}
* @param name the name of the resulted {@link KTable}
+ *
+ * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
*/
KTable<K, V> reduceByKey(Reducer<V> reducer, String name);
@@ -458,6 +527,10 @@ public interface KStream<K, V> {
* @param aggValueSerde aggregate value serdes for materializing the aggregated table,
* if not specified the default serdes defined in the configs will be used
* @param <T> the value type of the resulted {@link KTable}
+ *
+ * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+ * where each table contains records with unmodified keys and values with type {@code T}
+ * that represent the latest (rolling) aggregate for each key within that window
*/
<T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
Aggregator<K, V, T> aggregator,
@@ -473,6 +546,10 @@ public interface KStream<K, V> {
* @param aggregator the instance of {@link Aggregator}
* @param windows the specification of the aggregation {@link Windows}
* @param <T> the value type of the resulted {@link KTable}
+ *
+ * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+ * where each table contains records with unmodified keys and values with type {@code T}
+ * that represent the latest (rolling) aggregate for each key within that window
*/
<T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
Aggregator<K, V, T> aggregator,
@@ -489,6 +566,8 @@ public interface KStream<K, V> {
* if not specified the default serdes defined in the configs will be used
* @param name the name of the resulted {@link KTable}
* @param <T> the value type of the resulted {@link KTable}
+ *
+ * @return a {@link KTable} that contains records with unmodified keys and values (of different type) that represent the latest (rolling) aggregate for each key
*/
<T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
Aggregator<K, V, T> aggregator,
@@ -504,6 +583,8 @@ public interface KStream<K, V> {
* @param aggregator the class of {@link Aggregator}
* @param name the name of the resulted {@link KTable}
* @param <T> the value type of the resulted {@link KTable}
+ *
+ * @return a {@link KTable} that contains records with unmodified keys and values (of different type) that represent the latest (rolling) aggregate for each key
*/
<T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
Aggregator<K, V, T> aggregator,
@@ -515,6 +596,10 @@ public interface KStream<K, V> {
* @param windows the specification of the aggregation {@link Windows}
* @param keySerde key serdes for materializing the counting table,
* if not specified the default serdes defined in the configs will be used
+ *
+ * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+ * where each table contains records with unmodified keys and values
+ * that represent the latest (rolling) count (i.e., number of records) for each key within that window
*/
<W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serde<K> keySerde);
@@ -523,6 +608,10 @@ public interface KStream<K, V> {
* with default serializers and deserializers.
*
* @param windows the specification of the aggregation {@link Windows}
+ *
+ * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+ * where each table contains records with unmodified keys and values
+ * that represent the latest (rolling) count (i.e., number of records) for each key within that window
*/
<W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows);
@@ -532,6 +621,8 @@ public interface KStream<K, V> {
* @param keySerde key serdes for materializing the counting table,
* if not specified the default serdes defined in the configs will be used
* @param name the name of the resulted {@link KTable}
+ *
+ * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
*/
KTable<K, Long> countByKey(Serde<K> keySerde, String name);
@@ -540,6 +631,8 @@ public interface KStream<K, V> {
* with default serializers and deserializers.
*
* @param name the name of the resulted {@link KTable}
+ *
+ * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
*/
KTable<K, Long> countByKey(String name);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 159876c..9d90ba0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -45,21 +45,27 @@ public class KStreamBuilder extends TopologyBuilder {
/**
* Create a {@link KStream} instance from the specified topics.
* The default deserializers specified in the config are used.
+ * <p>
+ * If multiple topics are specified there are nor ordering guaranteed for records from different topics.
*
* @param topics the topic names; must contain at least one topic name
+ * @return a {@link KStream} for the specified topics
*/
public <K, V> KStream<K, V> stream(String... topics) {
return stream(null, null, topics);
}
/**
- * Create a {@link KStream} instance for the specified topics.
+ * Create a {@link KStream} instance from the specified topics.
+ * <p>
+ * If multiple topics are specified there are nor ordering guaranteed for records from different topics.
*
* @param keySerde key serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param valSerde value serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param topics the topic names; must contain at least one topic name
+ * @return a {@link KStream} for the specified topics
*/
public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, String... topics) {
String name = newName(KStreamImpl.SOURCE_NAME);
@@ -74,6 +80,7 @@ public class KStreamBuilder extends TopologyBuilder {
* The default deserializers specified in the config are used.
*
* @param topic the topic name; cannot be null
+ * @return a {@link KTable} for the specified topics
*/
public <K, V> KTable<K, V> table(String topic) {
return table(null, null, topic);
@@ -87,6 +94,7 @@ public class KStreamBuilder extends TopologyBuilder {
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be null
+ * @return a {@link KTable} for the specified topics
*/
public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String topic) {
String source = newName(KStreamImpl.SOURCE_NAME);
@@ -102,8 +110,11 @@ public class KStreamBuilder extends TopologyBuilder {
/**
* Create a new instance of {@link KStream} by merging the given streams.
+ * <p>
+ * There are nor ordering guaranteed for records from different streams.
*
* @param streams the instances of {@link KStream} to be merged
+ * @return a {@link KStream} containing all records of the given streams
*/
public <K, V> KStream<K, V> merge(KStream<K, V>... streams) {
return KStreamImpl.merge(this, streams);
@@ -114,6 +125,7 @@ public class KStreamBuilder extends TopologyBuilder {
* This function is only for internal usage.
*
* @param prefix processor name prefix
+ * @return a new unique name
*/
public String newName(String prefix) {
return prefix + String.format("%010d", index.getAndIncrement());
http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 4ff9b48..cc5a521 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -24,9 +24,18 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
/**
* {@link KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
+ * Each record in this stream is an update on the primary-keyed table with the record key as the primary key.
+ * <p>
+ * A {@link KTable} is either defined from one or multiple Kafka topics that are consumed message by message or
+ * the result of a {@link KTable} transformation. An aggregation of a {@link KStream} also yields a {@link KTable}.
+ * <p>
+ * A {@link KTable} can be transformed record by record, joined with another {@link KTable} or {@link KStream}, or
+ * can be re-partitioned and aggregated into a new {@link KTable}.
*
* @param <K> Type of primary keys
* @param <V> Type of value changes
+ *
+ * @see KStream
*/
@InterfaceStability.Unstable
public interface KTable<K, V> {
@@ -35,6 +44,8 @@ public interface KTable<K, V> {
* Create a new instance of {@link KTable} that consists of all elements of this stream which satisfy a predicate.
*
* @param predicate the instance of {@link Predicate}
+ *
+ * @return a {@link KTable} that contains only those records that satisfy the given predicate
*/
KTable<K, V> filter(Predicate<K, V> predicate);
@@ -42,6 +53,8 @@ public interface KTable<K, V> {
* Create a new instance of {@link KTable} that consists all elements of this stream which do not satisfy a predicate.
*
* @param predicate the instance of {@link Predicate}
+ *
+ * @return a {@link KTable} that contains only those records that do not satisfy the given predicate
*/
KTable<K, V> filterNot(Predicate<K, V> predicate);
@@ -50,6 +63,8 @@ public interface KTable<K, V> {
*
* @param mapper the instance of {@link ValueMapper}
* @param <V1> the value type of the new stream
+ *
+ * @return a {@link KTable} that contains records with unmodified keys and new values of different type
*/
<V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper);
@@ -103,6 +118,8 @@ public interface KTable<K, V> {
* This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String)}.
*
* @param topic the topic name
+ *
+ * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
*/
KTable<K, V> through(String topic);
@@ -114,6 +131,8 @@ public interface KTable<K, V> {
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
* @param topic the topic name
+ *
+ * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
*/
KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic);
@@ -130,6 +149,8 @@ public interface KTable<K, V> {
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name
+ *
+ * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
*/
KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic);
@@ -148,6 +169,8 @@ public interface KTable<K, V> {
* {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
* — otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
* @param topic the topic name
+ *
+ * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
*/
KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
@@ -200,6 +223,10 @@ public interface KTable<K, V> {
/**
* Convert this stream to a new instance of {@link KStream}.
+ *
+ * @return a {@link KStream} that contains the same records as this {@link KTable};
+ * the records are no longer treated as updates on a primary-keyed table,
+ * but rather as normal key-value pairs in a record stream
*/
KStream<K, V> toStream();
@@ -209,6 +236,11 @@ public interface KTable<K, V> {
*
* @param mapper @param mapper the instance of {@link KeyValueMapper}
* @param <K1> the new key type
+ *
+ * @return a {@link KStream} that contains records with new keys of different type for each update of this {@link KTable}
+ * @return a {@link KStream} that contains the transformed records from this {@link KTable};
+ * the records are no longer treated as updates on a primary-keyed table,
+ * but rather as normal key-value pairs in a record stream
*/
<K1> KStream<K1, V> toStream(KeyValueMapper<K, V, K1> mapper);
@@ -219,6 +251,9 @@ public interface KTable<K, V> {
* @param joiner the instance of {@link ValueJoiner}
* @param <V1> the value type of the other stream
* @param <R> the value type of the new stream
+ *
+ * @return a {@link KTable} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+ * one for each matched record-pair with the same key
*/
<V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
@@ -229,6 +264,9 @@ public interface KTable<K, V> {
* @param joiner the instance of {@link ValueJoiner}
* @param <V1> the value type of the other stream
* @param <R> the value type of the new stream
+ *
+ * @return a {@link KTable} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+ * one for each matched record-pair with the same key
*/
<V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
@@ -239,6 +277,9 @@ public interface KTable<K, V> {
* @param joiner the instance of {@link ValueJoiner}
* @param <V1> the value type of the other stream
* @param <R> the value type of the new stream
+ *
+ * @return a {@link KTable} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+ * one for each matched record-pair with the same key
*/
<V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
@@ -252,6 +293,8 @@ public interface KTable<K, V> {
* if not specified the default serdes defined in the configs will be used
* @param <K1> the key type of the {@link KGroupedTable}
* @param <V1> the value type of the {@link KGroupedTable}
+ *
+ * @return a {@link KGroupedTable} that contains the re-partitioned records of this {@link KTable}
*/
<K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector, Serde<K1> keySerde, Serde<V1> valueSerde);
@@ -261,6 +304,8 @@ public interface KTable<K, V> {
* @param selector select the grouping key and value to be aggregated
* @param <K1> the key type of the {@link KGroupedTable}
* @param <V1> the value type of the {@link KGroupedTable}
+ *
+ * @return a {@link KGroupedTable} that contains the re-partitioned records of this {@link KTable}
*/
<K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector);
@@ -268,7 +313,7 @@ public interface KTable<K, V> {
* Perform an action on each element of {@link KTable}.
* Note that this is a terminal operation that returns void.
*
- * @param action An action to perform on each element
+ * @param action an action to perform on each element
*/
void foreach(ForeachAction<K, V> action);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index b07e510..f79e6f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -51,7 +51,9 @@ public interface StateStore {
void close();
/**
- * If the storage is persistent
+ * Return if the storage is persistent or not.
+ *
+ * @return {@code true} if the storage is persistent—{@code false} otherwise
*/
boolean persistent();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index e400cef..079a2b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -45,6 +45,8 @@ public interface WindowStore<K, V> extends StateStore {
/**
* Get all the key-value pairs with the given key and the time range from all
* the existing windows.
+ *
+ * @return an iterator over key-value pairs {@code <timestamp, value>}
*/
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
}