You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:06:01 UTC

[47/50] [abbrv] kafka git commit: KAFKA-3440: Update streams javadocs

KAFKA-3440: Update streams javadocs

- add class doc for KTable, KStream, JoinWindows
- add missing return tags

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>, Michael G. Noll <mi...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1287 from mjsax/kafka-3440-JavaDoc


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3414d561
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3414d561
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3414d561

Branch: refs/heads/0.10.0
Commit: 3414d56121d8d2f66f8dd613453af71d5b3f0c5f
Parents: 69d9a66
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Apr 29 12:50:02 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Apr 29 12:50:02 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/JoinWindows.java      | 18 ++++
 .../kafka/streams/kstream/KGroupedTable.java    | 14 ++-
 .../apache/kafka/streams/kstream/KStream.java   | 95 +++++++++++++++++++-
 .../kafka/streams/kstream/KStreamBuilder.java   | 14 ++-
 .../apache/kafka/streams/kstream/KTable.java    | 47 +++++++++-
 .../kafka/streams/processor/StateStore.java     |  4 +-
 .../apache/kafka/streams/state/WindowStore.java |  2 +
 7 files changed, 189 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index a6d5603..f45c064 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -23,6 +23,24 @@ import java.util.Map;
 
 /**
  * The window specifications used for joins.
+ * <p>
+ * A {@link JoinWindows} instance defines a join over two stream on the same key and a maximum time difference.
+ * In SQL-style you would express this join as
+ * <pre>
+ *     SELECT * FROM stream1, stream2
+ *     WHERE
+ *       stream1.key = stream2.key
+ *       AND
+ *       stream2.ts - before <= stream1.ts <= stream2.ts + after
+ * </pre>
+ * There are three different window configuration supported:
+ * <ul>
+ *     <li>before = after = time-difference</li>
+ *     <li>before = 0 and after = time-difference</li>
+ *     <li>before = time-difference and after = 0</li>
+ * </ul>
+ * A join is symmetric in the sense, that a join specification on the first stream returns the same result record as
+ * a join specification on the second stream with flipped before and after values.
  */
 public class JoinWindows extends Windows<TimeWindow> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 86c34b1..2ebad87 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -21,7 +21,11 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 
 /**
- * {@link KGroupedTable} is an abstraction of a <i>grouped changelog stream</i> from a primary-keyed table.
+ * {@link KGroupedTable} is an abstraction of a <i>grouped changelog stream</i> from a primary-keyed table,
+ * usually on a different grouping key than the original primary key.
+ * <p>
+ * It is an intermediate representation after a re-grouping of a {@link KTable} before an aggregation is applied
+ * to the new partitions resulting in a new {@link KTable}.
  *
  * @param <K> Type of primary keys
  * @param <V> Type of value changes
@@ -35,6 +39,8 @@ public interface KGroupedTable<K, V> {
      * @param adder         the instance of {@link Reducer} for addition
      * @param subtractor    the instance of {@link Reducer} for subtraction
      * @param name          the name of the resulted {@link KTable}
+     * @return a {@link KTable} with the same key and value types as this {@link KGroupedTable},
+     *         containing aggregated values for each key
      */
     KTable<K, V> reduce(Reducer<V> adder,
                         Reducer<V> subtractor,
@@ -50,6 +56,8 @@ public interface KGroupedTable<K, V> {
      *                      if not specified the default serdes defined in the configs will be used
      * @param name          the name of the resulted table
      * @param <T>           the value type of the aggregated {@link KTable}
+     * @return a {@link KTable} with same key and aggregated value type {@code T},
+     *         containing aggregated values for each key
      */
     <T> KTable<K, T> aggregate(Initializer<T> initializer,
                                Aggregator<K, V, T> adder,
@@ -66,6 +74,8 @@ public interface KGroupedTable<K, V> {
      * @param substractor   the instance of {@link Aggregator} for subtraction
      * @param name          the name of the resulted {@link KTable}
      * @param <T>           the value type of the aggregated {@link KTable}
+     * @return a {@link KTable} with same key and aggregated value type {@code T},
+     *         containing aggregated values for each key
      */
     <T> KTable<K, T> aggregate(Initializer<T> initializer,
                                Aggregator<K, V, T> adder,
@@ -76,6 +86,8 @@ public interface KGroupedTable<K, V> {
      * Count number of records of this stream by the selected key into a new instance of {@link KTable}.
      *
      * @param name          the name of the resulted {@link KTable}
+     * @return a {@link KTable} with same key and {@link Long} value type as this {@link KGroupedTable},
+     *         containing the number of values for each key
      */
     KTable<K, Long> count(String name);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 6df2deb..a1ecfa4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -25,9 +25,17 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 
 /**
  * {@link KStream} is an abstraction of a <i>record stream</i> of key-value pairs.
+ * <p>
+ * A {@link KStream} is either defined from one or multiple Kafka topics that are consumed message by message or
+ * the result of a {@link KStream} transformation. A {@link KTable} can also be converted into a {@link KStream}.
+ * <p>
+ * A {@link KStream} can be transformed record by record, joined with another {@link KStream} or {@link KTable}, or
+ * can be aggregated into a {@link KTable}.
  *
  * @param <K> Type of keys
  * @param <V> Type of values
+ *
+ * @see KTable
  */
 @InterfaceStability.Unstable
 public interface KStream<K, V> {
@@ -36,6 +44,8 @@ public interface KStream<K, V> {
      * Create a new instance of {@link KStream} that consists of all elements of this stream which satisfy a predicate.
      *
      * @param predicate     the instance of {@link Predicate}
+     *
+     * @return a {@link KStream} that contains only those records that satisfy the given predicate
      */
     KStream<K, V> filter(Predicate<K, V> predicate);
 
@@ -43,6 +53,8 @@ public interface KStream<K, V> {
      * Create a new instance of {@link KStream} that consists all elements of this stream which do not satisfy a predicate.
      *
      * @param predicate     the instance of {@link Predicate}
+     *
+     * @return a {@link KStream} that contains only those records that do not satisfy the given predicate
      */
     KStream<K, V> filterNot(Predicate<K, V> predicate);
 
@@ -52,6 +64,8 @@ public interface KStream<K, V> {
      *
      * @param mapper  the instance of {@link KeyValueMapper}
      * @param <K1>   the new key type on the stream
+     *
+     * @return a {@link KStream} that contains records with different key type and same value type
      */
     <K1> KStream<K1, V> selectKey(KeyValueMapper<K, V, K1> mapper);
 
@@ -61,6 +75,8 @@ public interface KStream<K, V> {
      * @param mapper        the instance of {@link KeyValueMapper}
      * @param <K1>          the key type of the new stream
      * @param <V1>          the value type of the new stream
+     *
+     * @return a {@link KStream} that contains records with new key and value type
      */
     <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper);
 
@@ -69,6 +85,8 @@ public interface KStream<K, V> {
      *
      * @param mapper        the instance of {@link ValueMapper}
      * @param <V1>          the value type of the new stream
+     *
+     * @return a {@link KStream} that contains records with unmodified keys and new values of different type
      */
     <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
 
@@ -124,6 +142,8 @@ public interface KStream<K, V> {
      * @param mapper        the instance of {@link KeyValueMapper}
      * @param <K1>          the key type of the new stream
      * @param <V1>          the value type of the new stream
+     *
+     * @return a {@link KStream} that contains more or less records with new key and value type
      */
     <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper);
 
@@ -132,6 +152,8 @@ public interface KStream<K, V> {
      *
      * @param processor     the instance of {@link ValueMapper}
      * @param <V1>          the value type of the new stream
+     *
+     * @return a {@link KStream} that contains more or less records with unmodified keys and new values of different type
      */
     <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> processor);
 
@@ -143,6 +165,8 @@ public interface KStream<K, V> {
      * assigned to this stream only. An element will be dropped if none of the predicates evaluate to true.
      *
      * @param predicates    the ordered list of {@link Predicate} instances
+     *
+     * @return multiple distinct substreams of this {@link KStream}
      */
     KStream<K, V>[] branch(Predicate<K, V>... predicates);
 
@@ -152,6 +176,8 @@ public interface KStream<K, V> {
      * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}.
      *
      * @param topic     the topic name
+     *
+     * @return a {@link KStream} that contains the exact same records as this {@link KStream}
      */
     KStream<K, V> through(String topic);
 
@@ -159,7 +185,7 @@ public interface KStream<K, V> {
      * Perform an action on each element of {@link KStream}.
      * Note that this is a terminal operation that returns void.
      *
-     * @param action An action to perform on each element
+     * @param action an action to perform on each element
      */
     void foreach(ForeachAction<K, V> action);
 
@@ -171,6 +197,8 @@ public interface KStream<K, V> {
      * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
      *                     if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
      * @param topic        the topic name
+     *
+     * @return a {@link KStream} that contains the exact same records as this {@link KStream}
      */
     KStream<K, V> through(StreamPartitioner<K, V> partitioner, String topic);
 
@@ -187,6 +215,8 @@ public interface KStream<K, V> {
      * @param valSerde     value serde used to send key-value pairs,
      *                     if not specified the default value serde defined in the configuration will be used
      * @param topic        the topic name
+     *
+     * @return a {@link KStream} that contains the exact same records as this {@link KStream}
      */
     KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
@@ -205,6 +235,8 @@ public interface KStream<K, V> {
      *                     {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
      *                     &mdash; otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
      * @param topic        the topic name
+     *
+     * @return a {@link KStream} that contains the exact same records as this {@link KStream}
      */
     KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
 
@@ -260,6 +292,8 @@ public interface KStream<K, V> {
      *
      * @param transformerSupplier   the instance of {@link TransformerSupplier} that generates {@link org.apache.kafka.streams.kstream.Transformer}
      * @param stateStoreNames       the names of the state store used by the processor
+     *
+     * @return a new {@link KStream} with transformed key and value types
      */
     <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames);
 
@@ -268,6 +302,8 @@ public interface KStream<K, V> {
      *
      * @param valueTransformerSupplier  the instance of {@link ValueTransformerSupplier} that generates {@link org.apache.kafka.streams.kstream.ValueTransformer}
      * @param stateStoreNames           the names of the state store used by the processor
+     *
+     * @return a {@link KStream} that contains records with unmodified keys and transformed values with type {@code R}
      */
     <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames);
 
@@ -293,6 +329,9 @@ public interface KStream<K, V> {
      *                          if not specified the default serdes defined in the configs will be used
      * @param <V1>              the value type of the other stream
      * @param <R>               the value type of the new stream
+     *
+     * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+     *         one for each matched record-pair with the same key and within the joining window intervals
      */
     <V1, R> KStream<K, R> join(
             KStream<K, V1> otherStream,
@@ -311,6 +350,9 @@ public interface KStream<K, V> {
      * @param windows       the specification of the {@link JoinWindows}
      * @param <V1>          the value type of the other stream
      * @param <R>           the value type of the new stream
+     *
+     * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+     *         one for each matched record-pair with the same key and within the joining window intervals
      */
     <V1, R> KStream<K, R> join(
             KStream<K, V1> otherStream,
@@ -331,6 +373,9 @@ public interface KStream<K, V> {
      *                          if not specified the default serdes defined in the configs will be used
      * @param <V1>              the value type of the other stream
      * @param <R>               the value type of the new stream
+     *
+     * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+     *         one for each matched record-pair with the same key and within the joining window intervals
      */
     <V1, R> KStream<K, R> outerJoin(
             KStream<K, V1> otherStream,
@@ -349,6 +394,9 @@ public interface KStream<K, V> {
      * @param windows       the specification of the {@link JoinWindows}
      * @param <V1>          the value type of the other stream
      * @param <R>           the value type of the new stream
+     *
+     * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+     *         one for each matched record-pair with the same key and within the joining window intervals
      */
     <V1, R> KStream<K, R> outerJoin(
             KStream<K, V1> otherStream,
@@ -367,6 +415,9 @@ public interface KStream<K, V> {
      *                          if not specified the default serdes defined in the configs will be used
      * @param <V1>              the value type of the other stream
      * @param <R>               the value type of the new stream
+     *
+     * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+     *         one for each matched record-pair with the same key and within the joining window intervals
      */
     <V1, R> KStream<K, R> leftJoin(
             KStream<K, V1> otherStream,
@@ -384,6 +435,9 @@ public interface KStream<K, V> {
      * @param windows       the specification of the {@link JoinWindows}
      * @param <V1>          the value type of the other stream
      * @param <R>           the value type of the new stream
+     *
+     * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+     *         one for each matched record-pair with the same key and within the joining window intervals
      */
     <V1, R> KStream<K, R> leftJoin(
             KStream<K, V1> otherStream,
@@ -397,6 +451,9 @@ public interface KStream<K, V> {
      * @param joiner    the instance of {@link ValueJoiner}
      * @param <V1>      the value type of the table
      * @param <V2>      the value type of the new stream
+     *
+     * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+     *         one for each matched record-pair with the same key and within the joining window intervals
      */
     <V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> table, ValueJoiner<V, V1, V2> joiner);
 
@@ -409,6 +466,10 @@ public interface KStream<K, V> {
      *                          if not specified the default serdes defined in the configs will be used
      * @param valueSerde        value serdes for materializing the aggregated table,
      *                          if not specified the default serdes defined in the configs will be used
+     *
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values
+     *         that represent the latest (rolling) aggregate for each key within that window
      */
     <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
                                                           Windows<W> windows,
@@ -421,6 +482,10 @@ public interface KStream<K, V> {
      *
      * @param reducer the instance of {@link Reducer}
      * @param windows the specification of the aggregation {@link Windows}
+     *
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values
+     *         that represent the latest (rolling) aggregate for each key within that window
      */
     <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, Windows<W> windows);
 
@@ -433,6 +498,8 @@ public interface KStream<K, V> {
      * @param valueSerde        value serdes for materializing the aggregated table,
      *                          if not specified the default serdes defined in the configs will be used
      * @param name              the name of the resulted {@link KTable}
+     *
+     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
      */
     KTable<K, V> reduceByKey(Reducer<V> reducer,
                              Serde<K> keySerde,
@@ -444,6 +511,8 @@ public interface KStream<K, V> {
      *
      * @param reducer the instance of {@link Reducer}
      * @param name    the name of the resulted {@link KTable}
+     *
+     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
      */
     KTable<K, V> reduceByKey(Reducer<V> reducer, String name);
 
@@ -458,6 +527,10 @@ public interface KStream<K, V> {
      * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
      *                      if not specified the default serdes defined in the configs will be used
      * @param <T>           the value type of the resulted {@link KTable}
+     *
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values with type {@code T}
+     *         that represent the latest (rolling) aggregate for each key within that window
      */
     <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
                                                                 Aggregator<K, V, T> aggregator,
@@ -473,6 +546,10 @@ public interface KStream<K, V> {
      * @param aggregator    the instance of {@link Aggregator}
      * @param windows       the specification of the aggregation {@link Windows}
      * @param <T>           the value type of the resulted {@link KTable}
+     *
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values with type {@code T}
+     *         that represent the latest (rolling) aggregate for each key within that window
      */
     <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
                                                                 Aggregator<K, V, T> aggregator,
@@ -489,6 +566,8 @@ public interface KStream<K, V> {
      *                      if not specified the default serdes defined in the configs will be used
      * @param name          the name of the resulted {@link KTable}
      * @param <T>           the value type of the resulted {@link KTable}
+     *
+     * @return a {@link KTable} that contains records with unmodified keys and values (of different type) that represent the latest (rolling) aggregate for each key
      */
     <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
                                     Aggregator<K, V, T> aggregator,
@@ -504,6 +583,8 @@ public interface KStream<K, V> {
      * @param aggregator    the class of {@link Aggregator}
      * @param name          the name of the resulted {@link KTable}
      * @param <T>           the value type of the resulted {@link KTable}
+     *
+     * @return a {@link KTable} that contains records with unmodified keys and values (of different type) that represent the latest (rolling) aggregate for each key
      */
     <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
                                     Aggregator<K, V, T> aggregator,
@@ -515,6 +596,10 @@ public interface KStream<K, V> {
      * @param windows       the specification of the aggregation {@link Windows}
      * @param keySerde      key serdes for materializing the counting table,
      *                      if not specified the default serdes defined in the configs will be used
+     *
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values
+     *         that represent the latest (rolling) count (i.e., number of records) for each key within that window
      */
     <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serde<K> keySerde);
 
@@ -523,6 +608,10 @@ public interface KStream<K, V> {
      * with default serializers and deserializers.
      *
      * @param windows       the specification of the aggregation {@link Windows}
+     *
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values
+     *         that represent the latest (rolling) count (i.e., number of records) for each key within that window
      */
     <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows);
 
@@ -532,6 +621,8 @@ public interface KStream<K, V> {
      * @param keySerde      key serdes for materializing the counting table,
      *                      if not specified the default serdes defined in the configs will be used
      * @param name          the name of the resulted {@link KTable}
+     *
+     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
      */
     KTable<K, Long> countByKey(Serde<K> keySerde, String name);
 
@@ -540,6 +631,8 @@ public interface KStream<K, V> {
      * with default serializers and deserializers.
      *
      * @param name          the name of the resulted {@link KTable}
+     *
+     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
      */
     KTable<K, Long> countByKey(String name);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 159876c..9d90ba0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -45,21 +45,27 @@ public class KStreamBuilder extends TopologyBuilder {
     /**
      * Create a {@link KStream} instance from the specified topics.
      * The default deserializers specified in the config are used.
+     * <p>
+     * If multiple topics are specified there are nor ordering guaranteed for records from different topics.
      *
      * @param topics    the topic names; must contain at least one topic name
+     * @return a {@link KStream} for the specified topics
      */
     public <K, V> KStream<K, V> stream(String... topics) {
         return stream(null, null, topics);
     }
 
     /**
-     * Create a {@link KStream} instance for the specified topics.
+     * Create a {@link KStream} instance from the specified topics.
+     * <p>
+     * If multiple topics are specified there are nor ordering guaranteed for records from different topics.
      *
      * @param keySerde  key serde used to read this source {@link KStream},
      *                  if not specified the default serde defined in the configs will be used
      * @param valSerde  value serde used to read this source {@link KStream},
      *                  if not specified the default serde defined in the configs will be used
      * @param topics    the topic names; must contain at least one topic name
+     * @return a {@link KStream} for the specified topics
      */
     public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, String... topics) {
         String name = newName(KStreamImpl.SOURCE_NAME);
@@ -74,6 +80,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * The default deserializers specified in the config are used.
      *
      * @param topic     the topic name; cannot be null
+     * @return a {@link KTable} for the specified topics
      */
     public <K, V> KTable<K, V> table(String topic) {
         return table(null, null, topic);
@@ -87,6 +94,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * @param valSerde   value serde used to send key-value pairs,
      *                   if not specified the default value serde defined in the configuration will be used
      * @param topic      the topic name; cannot be null
+     * @return a {@link KTable} for the specified topics
      */
     public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String topic) {
         String source = newName(KStreamImpl.SOURCE_NAME);
@@ -102,8 +110,11 @@ public class KStreamBuilder extends TopologyBuilder {
 
     /**
      * Create a new instance of {@link KStream} by merging the given streams.
+     * <p>
+     * There are nor ordering guaranteed for records from different streams.
      *
      * @param streams   the instances of {@link KStream} to be merged
+     * @return a {@link KStream} containing all records of the given streams
      */
     public <K, V> KStream<K, V> merge(KStream<K, V>... streams) {
         return KStreamImpl.merge(this, streams);
@@ -114,6 +125,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * This function is only for internal usage.
      *
      * @param prefix    processor name prefix
+     * @return a new unique name
      */
     public String newName(String prefix) {
         return prefix + String.format("%010d", index.getAndIncrement());

http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 4ff9b48..cc5a521 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -24,9 +24,18 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 
 /**
  * {@link KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
+ * Each record in this stream is an update on the primary-keyed table with the record key as the primary key.
+ * <p>
+ * A {@link KTable} is either defined from one or multiple Kafka topics that are consumed message by message or
+ * the result of a {@link KTable} transformation. An aggregation of a {@link KStream} also yields a {@link KTable}.
+ * <p>
+ * A {@link KTable} can be transformed record by record, joined with another {@link KTable} or {@link KStream}, or
+ * can be re-partitioned and aggregated into a new {@link KTable}.
  *
  * @param <K> Type of primary keys
  * @param <V> Type of value changes
+ *
+ * @see KStream
  */
 @InterfaceStability.Unstable
 public interface KTable<K, V> {
@@ -35,6 +44,8 @@ public interface KTable<K, V> {
      * Create a new instance of {@link KTable} that consists of all elements of this stream which satisfy a predicate.
      *
      * @param predicate     the instance of {@link Predicate}
+     *
+     * @return a {@link KTable} that contains only those records that satisfy the given predicate
      */
     KTable<K, V> filter(Predicate<K, V> predicate);
 
@@ -42,6 +53,8 @@ public interface KTable<K, V> {
      * Create a new instance of {@link KTable} that consists all elements of this stream which do not satisfy a predicate.
      *
      * @param predicate     the instance of {@link Predicate}
+     *
+     * @return a {@link KTable} that contains only those records that do not satisfy the given predicate
      */
     KTable<K, V> filterNot(Predicate<K, V> predicate);
 
@@ -50,6 +63,8 @@ public interface KTable<K, V> {
      *
      * @param mapper        the instance of {@link ValueMapper}
      * @param <V1>          the value type of the new stream
+     *
+     * @return a {@link KTable} that contains records with unmodified keys and new values of different type
      */
     <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper);
 
@@ -103,6 +118,8 @@ public interface KTable<K, V> {
      * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String)}.
      *
      * @param topic         the topic name
+     *
+     * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
      */
     KTable<K, V> through(String topic);
 
@@ -114,6 +131,8 @@ public interface KTable<K, V> {
      * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
      *                     if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
      * @param topic        the topic name
+     *
+     * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
      */
     KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic);
 
@@ -130,6 +149,8 @@ public interface KTable<K, V> {
      * @param valSerde     value serde used to send key-value pairs,
      *                     if not specified the default value serde defined in the configuration will be used
      * @param topic        the topic name
+     *
+     * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
      */
     KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
@@ -148,6 +169,8 @@ public interface KTable<K, V> {
      *                     {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
      *                     &mdash; otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
      * @param topic        the topic name
+     *
+     * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
      */
     KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
 
@@ -200,6 +223,10 @@ public interface KTable<K, V> {
 
     /**
      * Convert this stream to a new instance of {@link KStream}.
+     *
+     * @return a {@link KStream} that contains the same records as this {@link KTable};
+     *         the records are no longer treated as updates on a primary-keyed table,
+     *         but rather as normal key-value pairs in a record stream
      */
     KStream<K, V> toStream();
 
@@ -209,6 +236,11 @@ public interface KTable<K, V> {
      *
      * @param mapper  @param mapper  the instance of {@link KeyValueMapper}
      * @param <K1> the new key type
+     *
+     * @return a {@link KStream} that contains records with new keys of different type for each update of this {@link KTable}
+     * @return a {@link KStream} that contains the transformed records from this {@link KTable};
+     *         the records are no longer treated as updates on a primary-keyed table,
+     *         but rather as normal key-value pairs in a record stream
      */
     <K1> KStream<K1, V> toStream(KeyValueMapper<K, V, K1> mapper);
 
@@ -219,6 +251,9 @@ public interface KTable<K, V> {
      * @param joiner        the instance of {@link ValueJoiner}
      * @param <V1>          the value type of the other stream
      * @param <R>           the value type of the new stream
+     *
+     * @return a {@link KTable} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+     *         one for each matched record-pair with the same key
      */
     <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
 
@@ -229,6 +264,9 @@ public interface KTable<K, V> {
      * @param joiner        the instance of {@link ValueJoiner}
      * @param <V1>          the value type of the other stream
      * @param <R>           the value type of the new stream
+     *
+     * @return a {@link KTable} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+     *         one for each matched record-pair with the same key
      */
     <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
 
@@ -239,6 +277,9 @@ public interface KTable<K, V> {
      * @param joiner        the instance of {@link ValueJoiner}
      * @param <V1>          the value type of the other stream
      * @param <R>           the value type of the new stream
+     *
+     * @return a {@link KTable} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+     *         one for each matched record-pair with the same key
      */
     <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
 
@@ -252,6 +293,8 @@ public interface KTable<K, V> {
      *                      if not specified the default serdes defined in the configs will be used
      * @param <K1>          the key type of the {@link KGroupedTable}
      * @param <V1>          the value type of the {@link KGroupedTable}
+     *
+     * @return a {@link KGroupedTable} that contains the re-partitioned records of this {@link KTable}
      */
     <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector, Serde<K1> keySerde, Serde<V1> valueSerde);
 
@@ -261,6 +304,8 @@ public interface KTable<K, V> {
      * @param selector      select the grouping key and value to be aggregated
      * @param <K1>          the key type of the {@link KGroupedTable}
      * @param <V1>          the value type of the {@link KGroupedTable}
+     *
+     * @return a {@link KGroupedTable} that contains the re-partitioned records of this {@link KTable}
      */
     <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector);
 
@@ -268,7 +313,7 @@ public interface KTable<K, V> {
      * Perform an action on each element of {@link KTable}.
      * Note that this is a terminal operation that returns void.
      *
-     * @param action An action to perform on each element
+     * @param action an action to perform on each element
      */
     void foreach(ForeachAction<K, V> action);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index b07e510..f79e6f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -51,7 +51,9 @@ public interface StateStore {
     void close();
 
     /**
-     * If the storage is persistent
+     * Return if the storage is persistent or not.
+     *
+     * @return  {@code true} if the storage is persistent&mdash;{@code false} otherwise
      */
     boolean persistent();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index e400cef..079a2b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -45,6 +45,8 @@ public interface WindowStore<K, V> extends StateStore {
     /**
      * Get all the key-value pairs with the given key and the time range from all
      * the existing windows.
+     *
+     * @return an iterator over key-value pairs {@code <timestamp, value>}
      */
     WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
 }