Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:54 UTC

[40/50] [abbrv] kafka git commit: KAFKA-3598: Improve JavaDoc of public API

KAFKA-3598: Improve JavaDoc of public API

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Michael G. Noll, Guozhang Wang

Closes #1250 from mjsax/JavaDoc-publicAPI


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4ab4e4af
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4ab4e4af
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4ab4e4af

Branch: refs/heads/0.10.0
Commit: 4ab4e4af814fb791fe6e8c2bd3381da8ca80b0b5
Parents: 68433dc
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Apr 29 08:49:16 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Apr 29 08:49:16 2016 -0700

----------------------------------------------------------------------
 .../examples/pageview/PageViewTypedDemo.java    |  4 +-
 .../examples/pageview/PageViewUntypedDemo.java  |  2 +-
 .../org/apache/kafka/streams/KafkaStreams.java  | 10 ++-
 .../java/org/apache/kafka/streams/KeyValue.java | 17 +++++
 .../kafka/streams/kstream/Aggregator.java       | 10 ++-
 .../kafka/streams/kstream/ForeachAction.java    | 10 ++-
 .../kafka/streams/kstream/Initializer.java      |  7 +-
 .../kafka/streams/kstream/JoinWindows.java      | 21 +++---
 .../apache/kafka/streams/kstream/KStream.java   | 10 +--
 .../kafka/streams/kstream/KStreamBuilder.java   | 15 ++--
 .../apache/kafka/streams/kstream/KTable.java    | 19 ++---
 .../kafka/streams/kstream/KeyValueMapper.java   |  9 ++-
 .../apache/kafka/streams/kstream/Predicate.java |  9 ++-
 .../apache/kafka/streams/kstream/Reducer.java   |  9 ++-
 .../kafka/streams/kstream/TimeWindows.java      | 18 +++--
 .../kafka/streams/kstream/Transformer.java      |  8 +-
 .../streams/kstream/TransformerSupplier.java    |  7 +-
 .../kafka/streams/kstream/UnlimitedWindows.java |  9 ++-
 .../kafka/streams/kstream/ValueJoiner.java      |  9 ++-
 .../kafka/streams/kstream/ValueMapper.java      |  8 +-
 .../kafka/streams/kstream/ValueTransformer.java |  8 +-
 .../kstream/ValueTransformerSupplier.java       |  7 +-
 .../apache/kafka/streams/kstream/Window.java    | 18 ++++-
 .../apache/kafka/streams/kstream/Windowed.java  | 30 +++++---
 .../apache/kafka/streams/kstream/Windows.java   | 37 ++++-----
 .../internals/KStreamWindowAggregate.java       |  2 +-
 .../kstream/internals/KStreamWindowReduce.java  |  2 +-
 .../kstream/internals/WindowedSerializer.java   |  4 +-
 .../internals/WindowedStreamPartitioner.java    |  6 +-
 .../ConsumerRecordTimestampExtractor.java       |  3 +-
 .../kafka/streams/processor/Processor.java      |  9 ++-
 .../streams/processor/ProcessorSupplier.java    |  5 ++
 .../streams/processor/StateStoreSupplier.java   | 10 +++
 .../streams/processor/StreamPartitioner.java    | 20 ++---
 .../apache/kafka/streams/processor/TaskId.java  |  6 +-
 .../streams/processor/TimestampExtractor.java   |  8 +-
 .../streams/processor/TopologyBuilder.java      | 64 ++++++++--------
 .../processor/WallclockTimestampExtractor.java  |  9 ++-
 .../processor/internals/StreamThread.java       |  2 +-
 .../apache/kafka/streams/state/StateSerdes.java | 79 ++++++++++++++++++--
 .../apache/kafka/streams/state/WindowStore.java |  2 +-
 .../streams/smoketest/SmokeTestClient.java      |  2 +-
 .../kafka/streams/smoketest/SmokeTestUtil.java  |  2 +-
 43 files changed, 383 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 39ec41f..e53b037 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -168,10 +168,10 @@ public class PageViewTypedDemo {
                     public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
                         WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
                         wViewByRegion.windowStart = key.window().start();
-                        wViewByRegion.region = key.value();
+                        wViewByRegion.region = key.key();
 
                         RegionCount rCount = new RegionCount();
-                        rCount.region = key.value();
+                        rCount.region = key.key();
                         rCount.count = value;
 
                         return new KeyValue<>(wViewByRegion, rCount);

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index 9a41b9e..8a0af6c 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -107,7 +107,7 @@ public class PageViewUntypedDemo {
                     public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
                         ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
                         keyNode.put("window-start", key.window().start())
-                                .put("region", key.value());
+                                .put("region", key.key());
 
                         ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
                         valueNode.put("count", value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 4d1306d..45024f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -97,6 +97,12 @@ public class KafkaStreams {
     // usage only and should not be exposed to users at all.
     private final UUID processId;
 
+    /**
+     * Construct the stream instance.
+     *
+     * @param builder  the processor topology builder specifying the computational logic
+     * @param props    properties for the {@link StreamsConfig}
+     */
     public KafkaStreams(TopologyBuilder builder, Properties props) {
         this(builder, new StreamsConfig(props));
     }
@@ -104,8 +110,8 @@ public class KafkaStreams {
     /**
      * Construct the stream instance.
      *
-     * @param builder The processor topology builder specifying the computational logic
-     * @param config The stream configs
+     * @param builder  the processor topology builder specifying the computational logic
+     * @param config   the stream configs
      */
     public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {
         // create the metrics
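
[Editor's note: for orientation, a minimal sketch of how these constructors are used in practice. The topic names, application id, and connection strings are illustrative only; note that 0.10.0 still requires zookeeper.connect.]

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class StreamsBootstrap {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "javadoc-demo");       // unique per application
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");  // still required in 0.10.0

            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input-topic").to("output-topic");  // trivial pass-through topology

            KafkaStreams streams = new KafkaStreams(builder, props);  // Properties overload documented above
            streams.start();
        }
    }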

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
index ca86fc4..58f2083 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
@@ -29,14 +29,31 @@ import java.util.Objects;
  */
 public class KeyValue<K, V> {
 
+    /** The key of the key-value pair. */
     public final K key;
+    /** The value of the key-value pair. */
     public final V value;
 
+    /**
+     * Create a new key-value pair.
+     *
+     * @param key    the key
+     * @param value  the value
+     */
     public KeyValue(K key, V value) {
         this.key = key;
         this.value = value;
     }
 
+    /**
+     * Create a new key-value pair.
+     *
+     * @param key    the key
+     * @param value  the value
+     * @param <K>    the type of the key
+     * @param <V>    the type of the value
+     * @return       a new key value pair
+     */
     public static <K, V> KeyValue<K, V> pair(K key, V value) {
         return new KeyValue<>(key, value);
     }
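
[Editor's note: a tiny illustration of the two creation paths documented above; the key and value are arbitrary.]

    import org.apache.kafka.streams.KeyValue;

    public class KeyValueDemo {
        public static void main(String[] args) {
            KeyValue<String, Long> a = new KeyValue<>("page", 42L);  // constructor
            KeyValue<String, Long> b = KeyValue.pair("page", 42L);   // static factory, types inferred
            System.out.println(a.equals(b));                         // true: same key and value
        }
    }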

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
index 9ec9f96..989d89f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -18,7 +18,7 @@
 package org.apache.kafka.streams.kstream;
 
 /**
- * The Aggregator interface for aggregating values of the given key.
+ * The {@link Aggregator} interface for aggregating values of the given key.
  *
  * @param <K>   key type
  * @param <V>   original value type
@@ -26,5 +26,13 @@ package org.apache.kafka.streams.kstream;
  */
 public interface Aggregator<K, V, T> {
 
+    /**
+     * Compute a new aggregate from the key and value of a record and the current aggregate of the same key.
+     *
+     * @param aggKey     the key of the record
+     * @param value      the value of the record
+     * @param aggregate  the current aggregate value
+     * @return           the new aggregate value
+     */
     T apply(K aggKey, V value, T aggregate);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
index 83064e8..b3e3169 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
@@ -18,9 +18,8 @@
 package org.apache.kafka.streams.kstream;
 
 
-
 /**
- * The ForeachAction interface for performing an action on a key-value pair.
+ * The {@link ForeachAction} interface for performing an action on a key-value pair.
  * Note that this action is stateless. If stateful processing is required, consider
  * using {@link KStream#transform(TransformerSupplier, String...)} or
  * {@link KStream#process(ProcessorSupplier, String...)} instead.
@@ -29,6 +28,13 @@ package org.apache.kafka.streams.kstream;
  * @param <V>   original value type
  */
 public interface ForeachAction<K, V> {
+
+    /**
+     * Perform an action for each record of a stream.
+     *
+     * @param key    the key of the record
+     * @param value  the value of the record
+     */
     void apply(K key, V value);
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
index 67c1c21..39bc40d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
@@ -18,11 +18,16 @@
 package org.apache.kafka.streams.kstream;
 
 /**
- * The Initializer interface for creating an initial value in aggregations.
+ * The {@link Initializer} interface for creating an initial value in aggregations.
  *
  * @param <T>   aggregate value type
  */
 public interface Initializer<T> {
 
+    /**
+     * Return the initial value for an aggregation.
+     *
+     * @return  the initial value for an aggregation
+     */
     T apply();
 }
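
[Editor's note: to see how an Initializer pairs with an Aggregator, a sketch of a windowed aggregation using the aggregateByKey overload referenced by the Windowed javadoc later in this commit. The topic, window name, and serdes are placeholders, and a KStreamBuilder named builder is assumed to be in scope, as in the bootstrap sketch above.]

    KStream<String, String> text = builder.stream(Serdes.String(), Serdes.String(), "lines");

    KTable<Windowed<String>, Long> charCounts = text.aggregateByKey(
        new Initializer<Long>() {                   // seed aggregate per key and window
            public Long apply() { return 0L; }
        },
        new Aggregator<String, String, Long>() {    // fold each record into the running aggregate
            public Long apply(String aggKey, String value, Long aggregate) {
                return aggregate + value.length();
            }
        },
        TimeWindows.of("char-counts", 60 * 1000L),  // one-minute tumbling windows
        Serdes.String(), Serdes.Long());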

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index a74984a..a6d5603 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -26,7 +26,9 @@ import java.util.Map;
  */
 public class JoinWindows extends Windows<TimeWindow> {
 
+    /** Maximum time difference for tuples that are before the join tuple. */
     public final long before;
+    /** Maximum time difference for tuples that are after the join tuple. */
     public final long after;
 
     private JoinWindows(String name, long before, long after) {
@@ -41,40 +43,41 @@ public class JoinWindows extends Windows<TimeWindow> {
     }
 
     /**
-     * Specifies that records of the same key are joinable if their timestamp stamps are within
-     * timeDifference.
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}.
      *
-     * @param timeDifference    join window interval in milliseconds
+     * @param timeDifference    join window interval
      */
     public JoinWindows within(long timeDifference) {
         return new JoinWindows(this.name, timeDifference, timeDifference);
     }
 
     /**
-     * Specifies that records of the same key are joinable if their timestamp stamps are within
+     * Specifies that records of the same key are joinable if their timestamps are within
      * the join window interval, and if the timestamp of a record from the secondary stream is
      * earlier than or equal to the timestamp of a record from the first stream.
      *
-     * @param timeDifference    join window interval in milliseconds
+     * @param timeDifference    join window interval
      */
     public JoinWindows before(long timeDifference) {
         return new JoinWindows(this.name, timeDifference, this.after);
     }
 
     /**
-     * Specifies that records of the same key are joinable if their timestamp stamps are within
+     * Specifies that records of the same key are joinable if their timestamps are within
      * the join window interval, and if the timestamp of a record from the secondary stream
      * is later than or equal to the timestamp of a record from the first stream.
      *
-     * @param timeDifference    join window interval in milliseconds
+     * @param timeDifference    join window interval
      */
     public JoinWindows after(long timeDifference) {
         return new JoinWindows(this.name, this.before, timeDifference);
     }
 
+    /**
+     * Not supported by {@link JoinWindows}. Throws {@link UnsupportedOperationException}.
+     */
     @Override
     public Map<Long, TimeWindow> windowsFor(long timestamp) {
-        // this function should never be called
         throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
     }
 
@@ -98,4 +101,4 @@ public class JoinWindows extends Windows<TimeWindow> {
         return result;
     }
 
-}
\ No newline at end of file
+}
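
[Editor's note: for illustration, how a JoinWindows spec feeds a windowed stream-stream join. The static of(name) factory and the serde-taking join overload are assumed from the 0.10.0 API; topics and serdes are placeholders, builder as before.]

    KStream<String, Long> left = builder.stream(Serdes.String(), Serdes.Long(), "left-topic");
    KStream<String, Double> right = builder.stream(Serdes.String(), Serdes.Double(), "right-topic");

    KStream<String, String> joined = left.join(
        right,
        new ValueJoiner<Long, Double, String>() {       // combine the two matched values
            public String apply(Long value1, Double value2) { return value1 + "/" + value2; }
        },
        JoinWindows.of("demo-join").within(5 * 1000L),  // joinable if timestamps differ by at most 5s
        Serdes.String(), Serdes.Long(), Serdes.Double());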

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 7e3562c..6df2deb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
 /**
- * KStream is an abstraction of a <i>record stream</i> of key-value pairs.
+ * {@link KStream} is an abstraction of a <i>record stream</i> of key-value pairs.
  *
  * @param <K> Type of keys
  * @param <V> Type of values
@@ -510,7 +510,7 @@ public interface KStream<K, V> {
                                     String name);
 
     /**
-     * Count number of messages of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+     * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
      *
      * @param windows       the specification of the aggregation {@link Windows}
      * @param keySerde      key serdes for materializing the counting table,
@@ -519,7 +519,7 @@ public interface KStream<K, V> {
     <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serde<K> keySerde);
 
     /**
-     * Count number of messages of this stream by key on a window basis into a new instance of windowed {@link KTable}
+     * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}
      * with default serializers and deserializers.
      *
      * @param windows       the specification of the aggregation {@link Windows}
@@ -527,7 +527,7 @@ public interface KStream<K, V> {
     <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows);
 
     /**
-     * Count number of messages of this stream by key into a new instance of ever-updating {@link KTable}.
+     * Count number of records of this stream by key into a new instance of ever-updating {@link KTable}.
      *
      * @param keySerde      key serdes for materializing the counting table,
      *                      if not specified the default serdes defined in the configs will be used
@@ -536,7 +536,7 @@ public interface KStream<K, V> {
     KTable<K, Long> countByKey(Serde<K> keySerde, String name);
 
     /**
-     * Count number of messages of this stream by key into a new instance of ever-updating {@link KTable}
+     * Count number of records of this stream by key into a new instance of ever-updating {@link KTable}
      * with default serializers and deserializers.
      *
      * @param name          the name of the resulted {@link KTable}
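
[Editor's note: the countByKey flavors above differ only in windowing; a sketch with placeholder topic and store names, builder as before.]

    KStream<String, String> clicks = builder.stream(Serdes.String(), Serdes.String(), "clicks");

    // Windowed count: one row per (key, window) pair.
    KTable<Windowed<String>, Long> perMinute =
        clicks.countByKey(TimeWindows.of("clicks-per-minute", 60 * 1000L), Serdes.String());

    // Ever-updating count: one row per key, updated on every record.
    KTable<String, Long> total = clicks.countByKey(Serdes.String(), "clicks-total");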

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 6b770b4..159876c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -28,19 +28,22 @@ import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * KStreamBuilder is a subclass of {@link TopologyBuilder} that provides the Kafka Streams DSL
+ * {@link KStreamBuilder} is a subclass of {@link TopologyBuilder} that provides the Kafka Streams DSL
  * for users to specify computational logic and translates the given logic to a {@link org.apache.kafka.streams.processor.internals.ProcessorTopology}.
  */
 public class KStreamBuilder extends TopologyBuilder {
 
     private final AtomicInteger index = new AtomicInteger(0);
 
+    /**
+     * Create a new {@link KStreamBuilder} instance.
+     */
     public KStreamBuilder() {
         super();
     }
 
     /**
-     * Creates a {@link KStream} instance from the specified topics.
+     * Create a {@link KStream} instance from the specified topics.
      * The default deserializers specified in the config are used.
      *
      * @param topics    the topic names; must contain at least one topic name
@@ -50,7 +53,7 @@ public class KStreamBuilder extends TopologyBuilder {
     }
 
     /**
-     * Creates a {@link KStream} instance for the specified topics.
+     * Create a {@link KStream} instance for the specified topics.
      *
      * @param keySerde  key serde used to read this source {@link KStream},
      *                  if not specified the default serde defined in the configs will be used
@@ -67,7 +70,7 @@ public class KStreamBuilder extends TopologyBuilder {
     }
 
     /**
-     * Creates a {@link KTable} instance for the specified topic.
+     * Create a {@link KTable} instance for the specified topic.
      * The default deserializers specified in the config are used.
      *
      * @param topic     the topic name; cannot be null
@@ -77,7 +80,7 @@ public class KStreamBuilder extends TopologyBuilder {
     }
 
     /**
-     * Creates a {@link KTable} instance for the specified topic.
+     * Create a {@link KTable} instance for the specified topic.
      *
      * @param keySerde   key serde used to send key-value pairs,
      *                   if not specified the default key serde defined in the configuration will be used
@@ -98,7 +101,7 @@ public class KStreamBuilder extends TopologyBuilder {
     }
 
     /**
-     * Creates a new instance of {@link KStream} by merging the given streams
+     * Create a new instance of {@link KStream} by merging the given streams.
      *
      * @param streams   the instances of {@link KStream} to be merged
      */
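
[Editor's note: the builder methods documented in this file compose as follows; topic names are placeholders.]

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> a = builder.stream(Serdes.String(), Serdes.String(), "topic-a");
    KStream<String, String> b = builder.stream(Serdes.String(), Serdes.String(), "topic-b");
    KStream<String, String> merged = builder.merge(a, b);  // one stream over both topics

    KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), "compacted-topic");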

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 8414279..4ff9b48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
 /**
- * KTable is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
+ * {@link KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
  *
  * @param <K> Type of primary keys
  * @param <V> Type of value changes
@@ -39,7 +39,7 @@ public interface KTable<K, V> {
     KTable<K, V> filter(Predicate<K, V> predicate);
 
     /**
-     * Create a new instance of {@link KTable} that consists all elements of this stream which do not satisfy a predicate
+     * Create a new instance of {@link KTable} that consists of all elements of this stream which do not satisfy a predicate.
      *
      * @param predicate     the instance of {@link Predicate}
      */
@@ -55,7 +55,7 @@ public interface KTable<K, V> {
 
 
     /**
-     * Print the elements of this stream to System.out
+     * Print the elements of this stream to {@code System.out}
      *
      * Implementors will need to override toString for keys and values that are not of
      * type String, Integer etc to get meaningful information.
@@ -63,7 +63,7 @@ public interface KTable<K, V> {
     void print();
 
     /**
-     * Print the elements of this stream to System.out
+     * Print the elements of this stream to {@code System.out}
      * @param keySerde key serde used to send key-value pairs,
      *                 if not specified the default serde defined in the configs will be used
      * @param valSerde value serde used to send key-value pairs,
@@ -75,15 +75,16 @@ public interface KTable<K, V> {
     void print(Serde<K> keySerde, Serde<V> valSerde);
 
     /**
-     * Write the elements of this stream to a file at the given path.
+     * Write the elements of this stream to a file at the given path using default serializers and deserializers.
      * @param filePath name of file to write to
      *
-     * Implementors will need to override toString for keys and values that are not of
-     * type String, Integer etc to get meaningful information.
+     * Implementors will need to override {@code toString} for keys and values that are not of
+     * type {@link String}, {@link Integer} etc. to get meaningful information.
      */
     void writeAsText(String filePath);
 
     /**
+     * Write the elements of this stream to a file at the given path.
      *
      * @param filePath name of file to write to
      * @param keySerde key serde used to send key-value pairs,
@@ -91,8 +92,8 @@ public interface KTable<K, V> {
      * @param valSerde value serde used to send key-value pairs,
      *                 if not specified the default serde defined in the configs will be used
      *
-     * Implementors will need to override toString for keys and values that are not of
-     * type String, Integer etc to get meaningful information.
+     * Implementors will need to override {@code toString} for keys and values that are not of
+     * type {@link String}, {@link Integer} etc. to get meaningful information.
      */
     void  writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde);
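
[Editor's note: a sketch tying the KTable methods above together. The topic and file path are placeholders, print()/writeAsText() rely on meaningful toString() output as the javadoc notes, builder as before.]

    KTable<String, String> users = builder.table(Serdes.String(), Serdes.String(), "users");

    KTable<String, String> nonEmpty = users.filterNot(new Predicate<String, String>() {
        public boolean test(String key, String value) { return value.isEmpty(); }  // drop empty values
    });

    nonEmpty.print(Serdes.String(), Serdes.String());                          // inspect on System.out
    nonEmpty.writeAsText("/tmp/users.txt", Serdes.String(), Serdes.String());  // dump to a file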
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
index a4aed91..b36ed63 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
@@ -18,7 +18,7 @@
 package org.apache.kafka.streams.kstream;
 
 /**
- * The KeyValueMapper interface for mapping a key-value pair to a new value (could be another key-value pair).
+ * The {@link KeyValueMapper} interface for mapping a key-value pair to a new value (could be another key-value pair).
  *
  * @param <K>   original key type
  * @param <V>   original value type
@@ -26,5 +26,12 @@ package org.apache.kafka.streams.kstream;
  */
 public interface KeyValueMapper<K, V, R> {
 
+    /**
+     * Map a record with the given key and value to a new value.
+     *
+     * @param key    the key of the record
+     * @param value  the value of the record
+     * @return       the new value
+     */
     R apply(K key, V value);
 }
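
[Editor's note: for illustration, a KeyValueMapper used with KStream#map to re-key a stream; the topic is a placeholder, builder as before.]

    KStream<String, String> words = builder.stream(Serdes.String(), Serdes.String(), "words");

    KStream<Integer, String> byLength = words.map(
        new KeyValueMapper<String, String, KeyValue<Integer, String>>() {
            public KeyValue<Integer, String> apply(String key, String value) {
                return KeyValue.pair(value.length(), value);  // new key derived from the value
            }
        });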

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
index c90554b..2df2d5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
@@ -18,12 +18,19 @@
 package org.apache.kafka.streams.kstream;
 
 /**
- * The Predicate interface represents a predicate (boolean-valued function) of a key-value pair.
+ * The {@link Predicate} interface represents a predicate (boolean-valued function) of a key-value pair.
  *
  * @param <K>   key type
  * @param <V>   value type
  */
 public interface Predicate<K, V> {
 
+    /**
+     * Test if the record with the given key and value satisfies the predicate.
+     *
+     * @param key    the key of the record
+     * @param value  the value of the record
+     * @return       {@code true} if the key-value pair satisfies the predicate, {@code false} otherwise
+     */
     boolean test(K key, V value);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
index 551a672..e7cfa0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
@@ -18,11 +18,18 @@
 package org.apache.kafka.streams.kstream;
 
 /**
- * The Reducer interface for combining two values of the same type into a new value.
+ * The {@link Reducer} interface for combining two values of the same type into a new value.
  *
  * @param <V>   value type
  */
 public interface Reducer<V> {
 
+    /**
+     * Aggregate the two given values into a single one.
+     *
+     * @param value1  the first value for the aggregation
+     * @param value2  the second value for the aggregation
+     * @return        the aggregated value
+     */
     V apply(V value1, V value2);
 }
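
[Editor's note: a Reducer in action. This assumes the windowed reduceByKey overload that, like countByKey, takes a window spec and serdes; topic and window name are placeholders, builder as before.]

    KStream<String, Long> amounts = builder.stream(Serdes.String(), Serdes.Long(), "amounts");

    KTable<Windowed<String>, Long> sums = amounts.reduceByKey(
        new Reducer<Long>() {  // combine two values of the same key
            public Long apply(Long value1, Long value2) { return value1 + value2; }
        },
        TimeWindows.of("amount-sums", 60 * 1000L),
        Serdes.String(), Serdes.Long());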

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index fa3a9d8..e4ce883 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -24,6 +24,14 @@ import java.util.Map;
 
 /**
  * The time-based window specifications used for aggregations.
+ * <p>
+ * The semantics of a time-based window are: every T1 (advance) time units, compute the aggregate total for the last T2 (size) time units.
+ * <ul>
+ *     <li> If {@code advance < size} a hopping window is defined: <br />
+ *          it discretizes a stream into overlapping windows, which implies that a record may be contained in one or more "adjacent" windows.</li>
+ *     <li> If {@code advance == size} a tumbling window is defined:<br />
+ *          it discretizes a stream into non-overlapping windows, which implies that a record is contained in exactly one tumbling window.</li>
+ * </ul>
  */
 public class TimeWindows extends Windows<TimeWindow> {
 
@@ -36,7 +44,7 @@ public class TimeWindows extends Windows<TimeWindow> {
 
     /**
      * The size of the window's advance interval, i.e. by how much a window moves forward relative
-     * to the previous one.  The interval's effective time unit is determined by the semantics of
+     * to the previous one. The interval's effective time unit is determined by the semantics of
      * the topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
      */
     public final long advance;
@@ -56,13 +64,13 @@ public class TimeWindows extends Windows<TimeWindow> {
 
     /**
      * Returns a window definition with the given window size, and with the advance interval being
-     * equal to the window size.  Think: [N * size, N * size + size), with N denoting the N-th
+     * equal to the window size. Think: [N * size, N * size + size), with N denoting the N-th
      * window.
      *
      * This provides the semantics of tumbling windows, which are fixed-sized, gap-less,
-     * non-overlapping windows.  Tumbling windows are a specialization of hopping windows.
+     * non-overlapping windows. Tumbling windows are a specialization of hopping windows.
      *
-     * @param name The name of the window.  Must not be null or empty.
+     * @param name The name of the window. Must not be null or empty.
      * @param size The size of the window, with the requirement that size &gt; 0.
      *             The window size's effective time unit is determined by the semantics of the
      *             topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
@@ -80,7 +88,7 @@ public class TimeWindows extends Windows<TimeWindow> {
      * This provides the semantics of hopping windows, which are fixed-sized, overlapping windows.
      *
      * @param interval The advance interval ("hop") of the window, with the requirement that
-     *                 0 &lt; interval &le; size.  The interval's effective time unit is
+     *                 0 &lt; interval &le; size. The interval's effective time unit is
      *                 determined by the semantics of the topology's configured
      *                 {@link org.apache.kafka.streams.processor.TimestampExtractor}.
      * @return a new window definition
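
[Editor's note: the hopping/tumbling distinction from the new class javadoc, expressed as window specs. Names and sizes are placeholders; advanceBy is the advance-interval method whose @param text is touched above.]

    // Tumbling: advance == size, every record falls in exactly one window.
    TimeWindows tumbling = TimeWindows.of("minutely", 60 * 1000L);

    // Hopping: advance < size, windows overlap and a record can fall into several.
    TimeWindows hopping = TimeWindows.of("minutely-every-10s", 60 * 1000L).advanceBy(10 * 1000L);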

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index 5197e94..239854b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
 /**
- * A stateful Transformer interface for transform a key-value pair into a new value.
+ * A stateful {@link Transformer} interface for transforming a key-value pair into a new value.
  *
  * @param <K>   key type
  * @param <V>   value type
@@ -40,10 +40,10 @@ public interface Transformer<K, V, R> {
     void init(ProcessorContext context);
 
     /**
-     * Transform the message with the given key and value.
+     * Transform the record with the given key and value.
      *
-     * @param key the key for the message
-     * @param value the value for the message
+     * @param key the key for the record
+     * @param value the value for the record
     * @return new value; if null, no key-value pair will be forwarded downstream
      */
     R transform(K key, V value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
index fc7ba60..0341702 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
@@ -18,9 +18,14 @@
 package org.apache.kafka.streams.kstream;
 
 /**
- * A transformer supplier which can create one or more {@link Transformer} instances.
+ * A {@link TransformerSupplier} interface which can create one or more {@link Transformer} instances.
  */
 public interface TransformerSupplier<K, V, R> {
 
+    /**
+     * Return a new {@link Transformer} instance.
+     *
+     * @return  a new {@link Transformer} instance
+     */
     Transformer<K, V, R> get();
 }
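
[Editor's note: how a TransformerSupplier and Transformer fit together with KStream#transform; a stateless sketch with a placeholder topic, builder as before. In 0.10.0 Transformer also declares punctuate() returning R.]

    KStream<String, String> input = builder.stream(Serdes.String(), Serdes.String(), "input");

    KStream<String, String> upper = input.transform(
        new TransformerSupplier<String, String, KeyValue<String, String>>() {
            public Transformer<String, String, KeyValue<String, String>> get() {
                return new Transformer<String, String, KeyValue<String, String>>() {
                    public void init(ProcessorContext context) {}  // no scheduling needed here
                    public KeyValue<String, String> transform(String key, String value) {
                        return KeyValue.pair(key, value.toUpperCase());
                    }
                    public KeyValue<String, String> punctuate(long timestamp) { return null; }  // nothing periodic
                    public void close() {}
                };
            }
        });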

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index bea3b57..f45f8c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -29,6 +29,7 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
 
     private static final long DEFAULT_START_TIMESTAMP = 0L;
 
+    /** The start timestamp of the window. */
     public final long start;
 
     private UnlimitedWindows(String name, long start) {
@@ -41,12 +42,18 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
     }
 
     /**
-     * Returns an unlimited window definition
+     * Return an unlimited window starting at timestamp zero.
      */
     public static UnlimitedWindows of(String name) {
         return new UnlimitedWindows(name, DEFAULT_START_TIMESTAMP);
     }
 
+    /**
+     * Return a new unlimited window for the specified start timestamp.
+     *
+     * @param start  the window start time
+     * @return       a new unlimited window that starts at {@code start}
+     */
     public UnlimitedWindows startOn(long start) {
         return new UnlimitedWindows(this.name, start);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
index 5f00a1a..8d4a8e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
@@ -18,7 +18,7 @@
 package org.apache.kafka.streams.kstream;
 
 /**
- * The ValueJoiner interface for joining two values and return a the joined new value.
+ * The {@link ValueJoiner} interface for joining two values into a new value.
  *
  * @param <V1>  first value type
  * @param <V2>  second value type
@@ -26,5 +26,12 @@ package org.apache.kafka.streams.kstream;
  */
 public interface ValueJoiner<V1, V2, R> {
 
+    /**
+     * Return a joined value consisting of {@code value1} and {@code value2}.
+     *
+     * @param value1  the first value for joining
+     * @param value2  the second value for joining
+     * @return        the joined value
+     */
     R apply(V1 value1, V2 value2);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
index 6e62a55..e168e37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
@@ -18,12 +18,18 @@
 package org.apache.kafka.streams.kstream;
 
 /**
- * The KeyValueMapper interface for mapping an original value to a new value (could be another key-value pair).
+ * The {@link ValueMapper} interface for mapping an original value to a new value.
  *
  * @param <V1>  original value type
  * @param <V2>  mapped value type
  */
 public interface ValueMapper<V1, V2> {
 
+    /**
+     * Map the given value to a new value.
+     *
+     * @param value  the value to be mapped
+     * @return       the new value
+     */
     V2 apply(V1 value);
 }
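
[Editor's note: a ValueMapper with KStream#mapValues, which transforms the value while preserving the key and partitioning; this reuses the input stream from the transform sketch above.]

    KStream<String, Integer> lengths = input.mapValues(
        new ValueMapper<String, Integer>() {
            public Integer apply(String value) { return value.length(); }  // value-only transform
        });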

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 63214fd..f92d9a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
 /**
- * A stateful Value Transformer interface for transform a value into a new value.
+ * A stateful {@link ValueTransformer} interface to transform a value into a new value.
  *
  * @param <V>   value type
  * @param <R>   return type
@@ -31,7 +31,7 @@ public interface ValueTransformer<V, R> {
      * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
      * that contains it is initialized.
      * <p>
-     * If this tranformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
+     * If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
      * {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
      *
      * @param context the context; may not be null
@@ -39,9 +39,9 @@ public interface ValueTransformer<V, R> {
     void init(ProcessorContext context);
 
     /**
-     * Transform the message with the given key and value.
+     * Transform the record with the given key and value.
      *
-     * @param value the value for the message
+     * @param value the value for the record
      * @return new value
      */
     R transform(V value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
index 6bc86bc..ecd454a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
@@ -18,9 +18,14 @@
 package org.apache.kafka.streams.kstream;
 
 /**
- * A value transformer supplier which can create one or more {@link ValueTransformer} instances.
+ * A {@link ValueTransformerSupplier} interface which can create one or more {@link ValueTransformer} instances.
  */
 public interface ValueTransformerSupplier<V, R> {
 
+    /**
+     * Return a new {@link ValueTransformer} instance.
+     *
+     * @return  a new {@link ValueTransformer} instance.
+     */
     ValueTransformer<V, R> get();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index 784d5c3..e1ea9a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -25,25 +25,37 @@ public abstract class Window {
     private long start;
     private long end;
 
+    /**
+     * Create a new window for the given start time (inclusive) and end time (exclusive).
+     *
+     * @param start  the start timestamp of the window (inclusive)
+     * @param end    the end timestamp of the window (exclusive)
+     */
     public Window(long start, long end) {
         this.start = start;
         this.end = end;
     }
 
     /**
-     * Returns the start timestamp of this window, inclusive
+     * Return the start timestamp of this window, inclusive
      */
     public long start() {
         return start;
     }
 
     /**
-     * Returns the end timestamp of this window, exclusive
+     * Return the end timestamp of this window, exclusive
      */
     public long end() {
         return end;
     }
 
+    /**
+     * Check if the given window overlaps with this window.
+     *
+     * @param other  another window
+     * @return       {@code true} if {@code other} overlaps with this window, {@code false} otherwise
+     */
     public boolean overlap(Window other) {
         return this.start() < other.end() || other.start() < this.end();
     }
@@ -68,4 +80,4 @@ public abstract class Window {
         return (int) (n % 0xFFFFFFFFL);
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
index 3691282..feaf6a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -22,30 +22,40 @@ package org.apache.kafka.streams.kstream;
  * i.e. {@link KStream#aggregateByKey(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde,
  * org.apache.kafka.common.serialization.Serde)}
  *
- * @param <T> Type of the key
+ * @param <K> Type of the key
  */
-public class Windowed<T> {
+public class Windowed<K> {
 
-    private T value;
+    private K key;
 
     private Window window;
 
-    public Windowed(T value, Window window) {
-        this.value = value;
+    public Windowed(K key, Window window) {
+        this.key = key;
         this.window = window;
     }
 
-    public T value() {
-        return value;
+    /**
+     * Return the key of the window.
+     *
+     * @return the key of the window
+     */
+    public K key() {
+        return key;
     }
 
+    /**
+     * Return the window containing the values associated with this key.
+     *
+     * @return  the window containing the values
+     */
     public Window window() {
         return window;
     }
 
     @Override
     public String toString() {
-        return "[" + value + "@" + window.start() + "]";
+        return "[" + key + "@" + window.start() + "]";
     }
 
     @Override
@@ -58,12 +68,12 @@ public class Windowed<T> {
 
         Windowed<?> that = (Windowed) obj;
 
-        return this.window.equals(that.window) && this.value.equals(that.value);
+        return this.window.equals(that.window) && this.key.equals(that.key);
     }
 
     @Override
     public int hashCode() {
-        long n = ((long) window.hashCode() << 32) | value.hashCode();
+        long n = ((long) window.hashCode() << 32) | key.hashCode();
         return (int) (n % 0xFFFFFFFFL);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 1406de6..06cacb4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -30,16 +30,12 @@ public abstract class Windows<W extends Window> {
 
     private static final int DEFAULT_NUM_SEGMENTS = 3;
 
-    private static final long DEFAULT_EMIT_DURATION = 1000L;
-
     private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L;   // one day
 
     private static final AtomicInteger NAME_INDEX = new AtomicInteger(0);
 
     protected String name;
 
-    private long emitDurationMs;
-
     private long maintainDurationMs;
 
     public int segments;
@@ -50,7 +46,6 @@ public abstract class Windows<W extends Window> {
         }
         this.name = name;
         this.segments = DEFAULT_NUM_SEGMENTS;
-        this.emitDurationMs = DEFAULT_EMIT_DURATION;
         this.maintainDurationMs = DEFAULT_MAINTAIN_DURATION;
     }
 
@@ -59,16 +54,9 @@ public abstract class Windows<W extends Window> {
     }
 
     /**
-     * Set the window emit duration in milliseconds of system time.
-     */
-    public Windows emit(long durationMs) {
-        this.emitDurationMs = durationMs;
-
-        return this;
-    }
-
-    /**
      * Set the window maintain duration in milliseconds of system time.
+     *
+     * @return  itself
      */
     public Windows until(long durationMs) {
         this.maintainDurationMs = durationMs;
@@ -79,6 +67,8 @@ public abstract class Windows<W extends Window> {
     /**
     * Specify the number of segments to be used for rolling the window store;
     * this function is not exposed to users but can be called by developers that extend this window spec.
+     *
+     * @return  itself
      */
     protected Windows segments(int segments) {
         this.segments = segments;
@@ -86,18 +76,21 @@ public abstract class Windows<W extends Window> {
         return this;
     }
 
-    public long emitEveryMs() {
-        return this.emitDurationMs;
-    }
-
+    /**
+     * Return the window maintain duration in milliseconds of system time.
+     *
+     * @return the window maintain duration in milliseconds of system time
+     */
     public long maintainMs() {
         return this.maintainDurationMs;
     }
 
-    protected String newName(String prefix) {
-        return prefix + String.format("%010d", NAME_INDEX.getAndIncrement());
-    }
-
+    /**
+     * Create all windows that contain the provided timestamp.
+     *
+     * @param timestamp  the timestamp for which windows should be created
+     * @return  a map of {@code windowStartTimestamp -> Window} entries
+     */
     public abstract Map<Long, W> windowsFor(long timestamp);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index f36cc8c..b4272f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -163,7 +163,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
         @SuppressWarnings("unchecked")
         @Override
         public T get(Windowed<K> windowedKey) {
-            K key = windowedKey.value();
+            K key = windowedKey.key();
             W window = (W) windowedKey.window();
 
             // this iterator should contain at most one element

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 6c05ce3..3ed1499 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -157,7 +157,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
         @SuppressWarnings("unchecked")
         @Override
         public V get(Windowed<K> windowedKey) {
-            K key = windowedKey.value();
+            K key = windowedKey.key();
             W window = (W) windowedKey.window();
 
             // this iterator should only contain one element

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
index 0afcad1..2e19816 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
@@ -40,7 +40,7 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
 
     @Override
     public byte[] serialize(String topic, Windowed<T> data) {
-        byte[] serializedKey = inner.serialize(topic, data.value());
+        byte[] serializedKey = inner.serialize(topic, data.key());
 
         ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE);
         buf.put(serializedKey);
@@ -55,7 +55,7 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
     }
 
     public byte[] serializeBaseKey(String topic, Windowed<T> data) {
-        return inner.serialize(topic, data.value());
+        return inner.serialize(topic, data.key());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
index 10e69cc..1e30864 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -29,12 +29,12 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window
     }
 
     /**
-     * WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value
+     * WindowedStreamPartitioner determines the partition number for a record with the given windowed key and value
     * and the current number of partitions. The partition number is determined by the original key of the windowed key
      * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
      *
-     * @param windowedKey the key of the message
-     * @param value the value of the message
+     * @param windowedKey the key of the record
+     * @param value the value of the record
      * @param numPartitions the total number of partitions
      * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
index 61b1c98..0d3424e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
@@ -27,7 +27,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
  * via this timestamp extractor.
  *
  * If <i>CreateTime</i> is used to define the built-in timestamps, using this extractor effectively provides
- * <i>event-time</i> semantics.
+ * <i>event-time</i> semantics. If <i>LogAppendTime</i> is used to define the built-in timestamps, using
+ * this extractor effectively provides <i>ingestion-time</i> semantics.
  *
  * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
index fbd72f0..92fcf12 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
@@ -37,10 +37,10 @@ public interface Processor<K, V> {
     void init(ProcessorContext context);
 
     /**
-     * Process the message with the given key and value.
+     * Process the record with the given key and value.
      * 
-     * @param key the key for the message
-     * @param value the value for the message
+     * @param key the key for the record
+     * @param value the value for the record
      */
     void process(K key, V value);
 
@@ -53,7 +53,8 @@ public interface Processor<K, V> {
     void punctuate(long timestamp);
 
     /**
-     * Close this processor and clean up any resources.
+     * Close this processor and clean up any resources. Be aware that {@link #close()} is called after an internal cleanup.
+     * Thus, it is not possible to write anything to Kafka as the underlying clients are already closed.
      */
     void close();
 }
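
A minimal sketch of the lifecycle documented above; the uppercase pass-through logic is illustrative only:

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class UpperCaseProcessor implements Processor<String, String> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;  // keep the context for forwarding records
        }

        @Override
        public void process(String key, String value) {
            if (value != null)
                context.forward(key, value.toUpperCase());  // emit a new record downstream
        }

        @Override
        public void punctuate(long timestamp) { }  // no periodic work in this sketch

        @Override
        public void close() {
            // release local resources only; per the contract above, the
            // underlying clients are already closed, so nothing can be
            // written to Kafka from here
        }
    }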

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
index 6561899..7976e16 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
@@ -29,5 +29,10 @@ package org.apache.kafka.streams.processor;
  */
 public interface ProcessorSupplier<K, V> {
 
+    /**
+     * Return a new {@link Processor} instance.
+     *
+     * @return  a new {@link Processor} instance
+     */
     Processor<K, V> get();
 }
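
Reusing the hypothetical UpperCaseProcessor sketched above: a supplier must hand out a fresh instance on every call, because each stream task gets its own Processor:

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorSupplier;

    public class UpperCaseProcessorSupplier implements ProcessorSupplier<String, String> {
        @Override
        public Processor<String, String> get() {
            return new UpperCaseProcessor();  // never cache or share a single instance
        }
    }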

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
index 993500d..f2ae020 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
@@ -22,7 +22,17 @@ package org.apache.kafka.streams.processor;
  */
 public interface StateStoreSupplier {
 
+    /**
+     * Return the name of this state store supplier.
+     *
+     * @return the name of this state store supplier
+     */
     String name();
 
+    /**
+     * Return a new {@link StateStore} instance.
+     *
+     * @return  a new {@link StateStore} instance
+     */
     StateStore get();
 }
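
In practice a supplier is rarely implemented by hand; assuming the fluent Stores factory in this code base, here is a sketch for a persistent key-value store (the store name "Counts" is a placeholder):

    import org.apache.kafka.streams.processor.StateStoreSupplier;
    import org.apache.kafka.streams.state.Stores;

    public class CountStoreSupplier {
        public static StateStoreSupplier countStore() {
            return Stores.create("Counts")   // name() will return "Counts"
                         .withStringKeys()
                         .withLongValues()
                         .persistent()       // persistent (RocksDB-backed) store
                         .build();
        }
    }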

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
index f14d9d9..fbb0378 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
@@ -17,21 +17,21 @@
 package org.apache.kafka.streams.processor;
 
 /**
- * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
+ * Determine how records are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
  * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
  * <p>
 * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it,
  * using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you
- * to use multiple instances of your topology to process in parallel all of the messages on the topology's source topics.
+ * to use multiple instances of your topology to process in parallel all of the records on the topology's source topics.
  * <p>
 * When a topology is instantiated, each of its sources is assigned a subset of that topic's partitions. That means that only
- * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will
+ * those processors in that topology instance will consume the records from those partitions. In many cases, Kafka Streams will
  * automatically manage these instances, and adjust when new topology instances are added or removed.
  * <p>
- * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have
- * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance.
- * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
- * determine to which partition each message should be written.
+ * Some topologies, though, need more control over which records appear in each partition. For example, some topologies that have
+ * stateful processors may want all records within a range of keys to always be delivered to and handled by the same topology instance.
+ * An upstream topology producing records to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
+ * determine to which partition each record should be written.
  * <p>
  * To do this, create a <code>StreamPartitioner</code> implementation, and when you build your topology specify that custom partitioner
  * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink}
@@ -48,10 +48,10 @@ package org.apache.kafka.streams.processor;
 public interface StreamPartitioner<K, V> {
 
     /**
-     * Determine the partition number for a message with the given key and value and the current number of partitions.
+     * Determine the partition number for a record with the given key and value and the current number of partitions.
      * 
-     * @param key the key of the message
-     * @param value the value of the message
+     * @param key the key of the record
+     * @param value the value of the record
      * @param numPartitions the total number of partitions
      * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
      */
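
A hedged sketch of such a partitioner that keeps all records for one (hypothetical) user id on the same partition; an instance would then be passed to one of the addSink(...) overloads shown further below:

    import org.apache.kafka.streams.processor.StreamPartitioner;

    public class UserIdPartitioner implements StreamPartitioner<String, Object> {
        @Override
        public Integer partition(String userId, Object value, int numPartitions) {
            if (userId == null)
                return null;  // defer to the default partitioning logic
            // mask the sign bit so the modulo result is non-negative
            return (userId.hashCode() & 0x7fffffff) % numPartitions;
        }
    }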

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index fa7c73c..7fc00d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -25,11 +25,13 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /**
- * The task id representation composed as topic group id plus the assigned partition id.
+ * The task ID representation, composed of a topic group ID and the assigned partition ID.
  */
 public class TaskId implements Comparable<TaskId> {
 
+    /** The ID of the topic group. */
     public final int topicGroupId;
+    /** The ID of the partition. */
     public final int partition;
 
     public TaskId(int topicGroupId, int partition) {
@@ -42,7 +44,7 @@ public class TaskId implements Comparable<TaskId> {
     }
 
     /**
-     * @throws TaskIdFormatException if the string is not a valid TaskId
+     * @throws TaskIdFormatException if the string is not a valid {@link TaskId}
      */
     public static TaskId parse(String string) {
         int index = string.indexOf('_');
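
For illustration, the parsed string form is "<topicGroupId>_<partition>":

    import org.apache.kafka.streams.processor.TaskId;

    public class TaskIdExample {
        public static void main(String[] args) {
            TaskId id = TaskId.parse("0_1");  // topic group 0, partition 1
            System.out.println(id.topicGroupId + " / " + id.partition);  // prints "0 / 1"
        }
    }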

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
index 224d580..c872fa1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
@@ -26,10 +26,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 public interface TimestampExtractor {
 
     /**
-     * Extracts a timestamp from a message
+     * Extracts a timestamp from a record.
+     * <p>
+     * Typically, the timestamp represents the number of milliseconds since midnight, January 1, 1970 UTC.
      *
-     * @param record ConsumerRecord
-     * @return timestamp
+     * @param record  a data record
+     * @return        the timestamp of the record
      */
     long extract(ConsumerRecord<Object, Object> record);
 }
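
A hedged sketch of a custom extractor that reads an embedded timestamp from a hypothetical payload type and falls back to wall clock time:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class PayloadTimestampExtractor implements TimestampExtractor {

        // hypothetical payload type that carries its own event time
        public interface TimestampedEvent {
            long timestampMs();
        }

        @Override
        public long extract(ConsumerRecord<Object, Object> record) {
            Object value = record.value();
            if (value instanceof TimestampedEvent)
                return ((TimestampedEvent) value).timestampMs();  // embedded event time
            return System.currentTimeMillis();                    // fallback: processing time
        }
    }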

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 487d5fe..5425149 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -43,11 +43,11 @@ import java.util.Set;
 /**
  * A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors,
  * and sinks. A {@link SourceNode source} is a node in the graph that consumes one or more Kafka topics and forwards them to
- * its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from upstream nodes,
- * processes that message, and optionally forwarding new messages to one or all of its children. Finally, a {@link SinkNode sink}
- * is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you
+ * its child nodes. A {@link Processor processor} is a node in the graph that receives input records from upstream nodes,
+ * processes them, and optionally forwards new records to one or all of its children. Finally, a {@link SinkNode sink}
+ * is a node in the graph that receives records from upstream nodes and writes them to a Kafka topic. This builder allows you
  * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreams}
- * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing messages}.
+ * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing records}.
  */
 public class TopologyBuilder {
 
@@ -193,7 +193,7 @@ public class TopologyBuilder {
     public TopologyBuilder() {}
 
     /**
-     * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
+     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
      * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and
      * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
@@ -208,15 +208,15 @@ public class TopologyBuilder {
     }
 
     /**
-     * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
+     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
      * The source will use the specified key and value deserializers.
      *
      * @param name the unique name of the source used to reference this node when
      * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
+     * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source
      * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source
+     * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source
      * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param topics the name of one or more Kafka topics that this source is to consume
@@ -242,14 +242,14 @@ public class TopologyBuilder {
     }
 
     /**
-     * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
+     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
      * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and
      * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
      *
      * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should write its messages
-     * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
+     * @param topic the name of the Kafka topic to which this sink should write its records
+     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, StreamPartitioner, String...)
@@ -261,22 +261,22 @@ public class TopologyBuilder {
     }
 
     /**
-     * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic, using
+     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic, using
      * the supplied partitioner.
      * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and
      * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
      * <p>
-     * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
+     * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among
      * the named Kafka topic's partitions. Such control is often useful with topologies that use
      * {@link #addStateStore(StateStoreSupplier, String...) state stores}
-     * in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
-     * messages among partitions using Kafka's default partitioning logic.
+     * in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
+     * records among partitions using Kafka's default partitioning logic.
      *
      * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should write its messages
-     * @param partitioner the function that should be used to determine the partition for each message processed by the sink
-     * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
+     * @param topic the name of the Kafka topic to which this sink should write its records
+     * @param partitioner the function that should be used to determine the partition for each record processed by the sink
+     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, String...)
@@ -288,18 +288,18 @@ public class TopologyBuilder {
     }
 
     /**
-     * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
+     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
      * The sink will use the specified key and value serializers.
      *
      * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should write its messages
-     * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink
+     * @param topic the name of the Kafka topic to which this sink should write its records
+     * @param keySerializer the {@link Serializer key serializer} used when producing records; may be null if the sink
      * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink
+     * @param valSerializer the {@link Serializer value serializer} used when producing records; may be null if the sink
      * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
+     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, String...)
@@ -311,19 +311,19 @@ public class TopologyBuilder {
     }
 
     /**
-     * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
+     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
      * The sink will use the specified key and value serializers, and the supplied partitioner.
      *
      * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should write its messages
-     * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink
+     * @param topic the name of the Kafka topic to which this sink should write its records
+     * @param keySerializer the {@link Serializer key serializer} used when producing records; may be null if the sink
      * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink
+     * @param valSerializer the {@link Serializer value serializer} used when producing records; may be null if the sink
      * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param partitioner the function that should be used to determine the partition for each message processed by the sink
-     * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
+     * @param partitioner the function that should be used to determine the partition for each record processed by the sink
+     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, String...)
@@ -354,11 +354,11 @@ public class TopologyBuilder {
     }
 
     /**
-     * Add a new processor node that receives and processes messages output by one or more parent source or processor node.
-     * Any new messages output by this processor will be forwarded to its child processor or sink nodes.
+     * Add a new processor node that receives and processes records output by one or more parent source or processor nodes.
+     * Any new records output by this processor will be forwarded to its child processor or sink nodes.
      * @param name the unique name of the processor node
      * @param supplier the supplier used to obtain this node's {@link Processor} instance
-     * @param parentNames the name of one or more source or processor nodes whose output messages this processor should receive
+     * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
      * and process
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
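
Putting the builder methods together, a minimal end-to-end sketch; the topic names are placeholders, and the supplier and config classes are the hypothetical ones sketched earlier:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.processor.TopologyBuilder;

    public class TopologyExample {
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            builder.addSource("Source", "input-topic")                                    // consume
                   .addProcessor("Process", new UpperCaseProcessorSupplier(), "Source")   // transform
                   .addSink("Sink", "output-topic", "Process");                           // produce

            KafkaStreams streams = new KafkaStreams(builder, TimestampExtractorConfig.config());
            streams.start();  // begin consuming, processing, and producing records
        }
    }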

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
index 81821ce..305573b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
@@ -25,9 +25,16 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
  * Using this extractor effectively provides <i>processing-time</i> semantics.
  *
  * If you need <i>event-time</i> semantics, use {@link ConsumerRecordTimestampExtractor} with
- * built-in <i>CreateTime</i> timestamp (see KIP-32: Add timestamps to Kafka message for details).
+ * built-in <i>CreateTime</i> timestamp, or <i>ingestion-time</i> semantics with built-in <i>LogAppendTime</i> timestamp (see KIP-32: Add timestamps to Kafka message for details).
  */
 public class WallclockTimestampExtractor implements TimestampExtractor {
+
+    /**
+     * Return the current wall clock time as the timestamp.
+     *
+     * @param record  a data record
+     * @return        the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC
+     */
     @Override
     public long extract(ConsumerRecord<Object, Object> record) {
         return System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index eff90e8..d4cb78c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -286,7 +286,7 @@ public class StreamThread extends Thread {
         removeStandbyTasks();
 
         // We need to first close the underlying clients before closing the state
-        // manager, for example we need to make sure producer's message sends
+        // manager, for example we need to make sure the producer's records
         // have all been acked before the state manager records
         // changelog sent offsets
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index 933bf72..b19510c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -24,13 +24,23 @@ import org.apache.kafka.common.serialization.Serializer;
 /**
  * Factory for creating serializers / deserializers for state stores in Kafka Streams.
  *
- * @param <K> key type of serdes
- * @param <V> value type of serdes
+ * @param <K> key type of serde
+ * @param <V> value type of serde
  */
 public final class StateSerdes<K, V> {
 
-    public static <K, V> StateSerdes<K, V> withBuiltinTypes(String topic, Class<K> keyClass, Class<V> valueClass) {
-        return new StateSerdes<>(topic, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
+    /**
+     * Create a new instance of {@link StateSerdes} for the given state name and key-/value-type classes.
+     *
+     * @param stateName   the name of the state
+     * @param keyClass    the class of the key type
+     * @param valueClass  the class of the value type
+     * @param <K>         the key type
+     * @param <V>         the value type
+     * @return            a new instance of {@link StateSerdes}
+     */
+    public static <K, V> StateSerdes<K, V> withBuiltinTypes(String stateName, Class<K> keyClass, Class<V> valueClass) {
+        return new StateSerdes<>(stateName, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
     }
 
     private final String stateName;
@@ -63,46 +73,105 @@ public final class StateSerdes<K, V> {
         this.valueSerde = valueSerde;
     }
 
+    /**
+     * Return the key serde.
+     *
+     * @return the key serde
+     */
     public Serde<K> keySerde() {
         return keySerde;
     }
 
+    /**
+     * Return the value serde.
+     *
+     * @return the value serde
+     */
     public Serde<V> valueSerde() {
         return valueSerde;
     }
 
+    /**
+     * Return the key deserializer.
+     *
+     * @return the key deserializer
+     */
     public Deserializer<K> keyDeserializer() {
         return keySerde.deserializer();
     }
 
+    /**
+     * Return the key serializer.
+     *
+     * @return the key serializer
+     */
     public Serializer<K> keySerializer() {
         return keySerde.serializer();
     }
 
+    /**
+     * Return the value deserializer.
+     *
+     * @return the value deserializer
+     */
     public Deserializer<V> valueDeserializer() {
         return valueSerde.deserializer();
     }
 
+    /**
+     * Return the value serializer.
+     *
+     * @return the value serializer
+     */
     public Serializer<V> valueSerializer() {
         return valueSerde.serializer();
     }
 
-    public String topic() {
+    /**
+     * Return the name of the state.
+     *
+     * @return the name of the state
+     */
+    public String stateName() {
         return stateName;
     }
 
+    /**
+     * Deserialize the key from raw bytes.
+     *
+     * @param rawKey  the key as raw bytes
+     * @return        the key as typed object
+     */
     public K keyFrom(byte[] rawKey) {
         return keySerde.deserializer().deserialize(stateName, rawKey);
     }
 
+    /**
+     * Deserialize the value from raw bytes.
+     *
+     * @param rawValue  the value as raw bytes
+     * @return          the value as typed object
+     */
     public V valueFrom(byte[] rawValue) {
         return valueSerde.deserializer().deserialize(stateName, rawValue);
     }
 
+    /**
+     * Serialize the given key.
+     *
+     * @param key  the key to be serialized
+     * @return     the serialized key
+     */
     public byte[] rawKey(K key) {
         return keySerde.serializer().serialize(stateName, key);
     }
 
+    /**
+     * Serialize the given value.
+     *
+     * @param value  the value to be serialized
+     * @return       the serialized value
+     */
     public byte[] rawValue(V value) {
         return valueSerde.serializer().serialize(stateName, value);
     }
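
A sketch round-tripping a key and a value through StateSerdes using the built-in String/Long serdes; the state name "counts" is a placeholder:

    import org.apache.kafka.streams.state.StateSerdes;

    public class StateSerdesExample {
        public static void main(String[] args) {
            StateSerdes<String, Long> serdes =
                StateSerdes.withBuiltinTypes("counts", String.class, Long.class);

            byte[] rawKey = serdes.rawKey("alice");     // serialize the key
            byte[] rawValue = serdes.rawValue(42L);     // serialize the value
            System.out.println(serdes.keyFrom(rawKey));      // prints "alice"
            System.out.println(serdes.valueFrom(rawValue));  // prints "42"
        }
    }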

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index c7a882f..e400cef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.streams.processor.StateStore;
 
 /**
- * A windowed store interface extending {@link StateStore}
+ * A windowed store interface extending {@link StateStore}.
  *
  * @param <K> Type of keys
  * @param <V> Type of values

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 733c1ea..fbe7754 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -213,7 +213,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                 new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {
                     @Override
                     public KeyValue<String, Long> apply(Windowed<String> key, Long value) {
-                        return new KeyValue<>(key.value() + "@" + key.window().start(), value);
+                        return new KeyValue<>(key.key() + "@" + key.window().start(), value);
                     }
                 }
         ).to(stringSerde, longSerde, "wcnt");

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
index c5ded5e..b0d7a0b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -77,7 +77,7 @@ public class SmokeTestUtil {
 
     public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, KeyValue<K, V>> {
         public KeyValue<K, V> apply(Windowed<K> winKey, V value) {
-            return new KeyValue<K, V>(winKey.value(), value);
+            return new KeyValue<K, V>(winKey.key(), value);
         }
     }