You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/03 20:35:08 UTC
kafka git commit: MINOR: Some more Kafka Streams Javadocs
Repository: kafka
Updated Branches:
refs/heads/trunk 1d80f563b -> 79eacf6c9
MINOR: Some more Kafka Streams Javadocs
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Yasuhiro Matsuda <ya...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #853 from guozhangwang/KJavaDoc
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/79eacf6c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/79eacf6c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/79eacf6c
Branch: refs/heads/trunk
Commit: 79eacf6c95506d5d6819add5a1256681b13170b1
Parents: 1d80f56
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Feb 3 11:31:32 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Wed Feb 3 11:31:32 2016 -0800
----------------------------------------------------------------------
.../org/apache/kafka/streams/KafkaStreams.java | 35 ++++++++++++++------
.../java/org/apache/kafka/streams/KeyValue.java | 8 +++++
.../apache/kafka/streams/StreamsMetrics.java | 3 ++
.../streams/errors/ProcessorStateException.java | 3 ++
.../streams/errors/TaskAssignmentException.java | 2 +-
.../streams/errors/TaskIdFormatException.java | 3 ++
.../errors/TopologyBuilderException.java | 3 ++
.../kafka/streams/kstream/Aggregator.java | 7 ++++
.../kafka/streams/kstream/HoppingWindows.java | 3 ++
.../kafka/streams/kstream/Initializer.java | 5 +++
.../kafka/streams/kstream/JoinWindows.java | 2 +-
.../apache/kafka/streams/kstream/KStream.java | 2 +-
.../kafka/streams/kstream/KStreamBuilder.java | 11 +++++-
.../apache/kafka/streams/kstream/KTable.java | 1 +
.../kafka/streams/kstream/KeyValueMapper.java | 7 ++++
.../apache/kafka/streams/kstream/Predicate.java | 7 ++--
.../apache/kafka/streams/kstream/Reducer.java | 5 +++
.../kafka/streams/kstream/TumblingWindows.java | 4 ++-
.../kafka/streams/kstream/ValueJoiner.java | 7 ++++
.../kafka/streams/kstream/ValueMapper.java | 6 ++++
.../kstream/ValueTransformerSupplier.java | 1 -
.../apache/kafka/streams/kstream/Window.java | 3 ++
.../apache/kafka/streams/kstream/Windows.java | 5 +++
.../processor/DefaultPartitionGrouper.java | 12 +++++--
.../kafka/streams/processor/Processor.java | 2 +-
.../streams/processor/ProcessorContext.java | 3 ++
.../streams/processor/StateStoreSupplier.java | 3 ++
.../apache/kafka/streams/processor/TaskId.java | 3 ++
.../kafka/streams/state/KeyValueIterator.java | 6 ++++
.../apache/kafka/streams/state/WindowStore.java | 17 ++++++++++
30 files changed, 155 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 071cef6..a19f697 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams;
+import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -38,15 +39,18 @@ import java.util.concurrent.atomic.AtomicInteger;
* Kafka Streams allows for performing continuous computation on input coming from one or more input topics and
* sends output to zero or more output topics.
* <p>
- * This processing is defined by using the {@link TopologyBuilder} class or its superclass KStreamBuilder to specify
- * the transformation.
- * The {@link KafkaStreams} instance will be responsible for the lifecycle of these processors. It will instantiate and
- * start one or more of these processors to process the Kafka partitions assigned to this particular instance.
+ * The computational logic can be specified either by using the {@link TopologyBuilder} class to define the a DAG topology of
+ * {@link org.apache.kafka.streams.processor.Processor}s or by using the {@link org.apache.kafka.streams.kstream.KStreamBuilder}
+ * class which provides the high-level {@link org.apache.kafka.streams.kstream.KStream} DSL to define the transformation.
+ *
+ * The {@link KafkaStreams} class manages the lifecycle of a Kafka Streams instance. One stream instance can contain one or
+ * more threads specified in the configs for the processing work.
* <p>
- * This {@link KafkaStreams} instance will co-ordinate with any other instances (whether in this same process, on other processes
- * on this machine, or on remote machines). These processes will divide up the work so that all partitions are being
- * consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shutdown or
- * started in the appropriate processes to balance processing load.
+ * A {@link KafkaStreams} instance can co-ordinate with any other instances with the same job ID (whether in this same process, on other processes
+ * on this machine, or on remote machines) as a single (possibly distributed) stream processing client. These instances will divide up the work
+ * based on the assignment of the input topic partitions so that all partitions are being
+ * consumed. If instances are added or failed, all instances will rebelance the partition assignment among themselves
+ * to balance processing load.
* <p>
* Internally the {@link KafkaStreams} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
* and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output.
@@ -70,6 +74,9 @@ import java.util.concurrent.atomic.AtomicInteger;
* </pre>
*
*/
+// TODO: about example may need to be updated after KAFKA-3153
+
+@InterfaceStability.Unstable
public class KafkaStreams {
private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
@@ -94,6 +101,12 @@ public class KafkaStreams {
this(builder, new StreamsConfig(props));
}
+ /**
+ * Construct the stream instance.
+ *
+ * @param builder The processor topology builder specifying the computational logic
+ * @param config The stream configs
+ */
public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {
// create the metrics
Time time = new SystemTime();
@@ -124,7 +137,7 @@ public class KafkaStreams {
}
/**
- * Start the stream process by starting all its threads
+ * Start the stream instance by starting all its threads.
*/
public synchronized void start() {
log.debug("Starting Kafka Stream process");
@@ -142,8 +155,8 @@ public class KafkaStreams {
}
/**
- * Shutdown this stream process by signaling the threads to stop,
- * wait for them to join and clean up the process instance.
+ * Shutdown this stream instance by signaling all the threads to stop,
+ * and then wait for them to join.
*/
public synchronized void close() {
log.debug("Stopping Kafka Stream process");
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
index 472e677..d813c47 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
@@ -17,6 +17,14 @@
package org.apache.kafka.streams;
+/**
+ * A key-value pair defined for a single Kafka Streams record.
+ * If the record comes directly from a Kafka topic then its
+ * key / value are defined as the message key / value.
+ *
+ * @param <K> Key type
+ * @param <V> Value type
+ */
public class KeyValue<K, V> {
public final K key;
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
index a151392..d392eef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
@@ -19,6 +19,9 @@ package org.apache.kafka.streams;
import org.apache.kafka.common.metrics.Sensor;
+/**
+ * The stream metrics interface for adding metric sensors and collecting metric values.
+ */
public interface StreamsMetrics {
Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
index 6434d04..e6f872a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
@@ -17,6 +17,9 @@
package org.apache.kafka.streams.errors;
+/**
+ * Indicates a processor state operation (e.g. put, get) has failed.
+ */
public class ProcessorStateException extends StreamsException {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java
index 3ae8503..3936ef5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.errors;
/**
- * The run time exception class for stream task assignments
+ * Indicates a run time error incurred while trying to assign stream tasks to threads
*/
public class TaskAssignmentException extends StreamsException {
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
index bf0ebf5..576b972 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
@@ -17,6 +17,9 @@
package org.apache.kafka.streams.errors;
+/**
+ * Indicates a run time error incurred while trying parse the task id from the read string
+ */
public class TaskIdFormatException extends StreamsException {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
index 9dd740b..8745693 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
@@ -17,6 +17,9 @@
package org.apache.kafka.streams.errors;
+/**
+ * Indicates a pre-run time error incurred while parsing the builder to construct the processor topology
+ */
public class TopologyBuilderException extends StreamsException {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
index e3eb18f..0d29409 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -17,6 +17,13 @@
package org.apache.kafka.streams.kstream;
+/**
+ * The Aggregator interface for aggregating values of the given key.
+ *
+ * @param <K> Key type.
+ * @param <V> Receiving value type.
+ * @param <T> Aggregate value type.
+ */
public interface Aggregator<K, V, T> {
T apply(K aggKey, V value, T aggregate);
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
index f354ef9..aa866e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
@@ -22,6 +22,9 @@ import org.apache.kafka.streams.kstream.internals.HoppingWindow;
import java.util.HashMap;
import java.util.Map;
+/**
+ * The hopping window specifications used for aggregations.
+ */
public class HoppingWindows extends Windows<HoppingWindow> {
private static final long DEFAULT_SIZE_MS = 1000L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
index 0aeddc9..fdd5220 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
@@ -17,6 +17,11 @@
package org.apache.kafka.streams.kstream;
+/**
+ * The Initializer interface for creating an initial value for aggregations.
+ *
+ * @param <T> Aggregate value type.
+ */
public interface Initializer<T> {
T apply();
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index ffc1c1c..70294a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.internals.TumblingWindow;
import java.util.Map;
/**
- * This class is used to specify the behaviour of windowed joins.
+ * The window specifications used for joins.
*/
public class JoinWindows extends Windows<TumblingWindow> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index f6fa48d..b83298f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-
+// TODO: Javadoc needs to be updated
/**
* KStream is an abstraction of a stream of key-value pairs.
*
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index b50cffb..3cf198c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -29,7 +29,8 @@ import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * KStreamBuilder is the class to create KStream instances.
+ * KStreamBuilder is a subclass of {@link TopologyBuilder} that provides the {@link KStream} DSL
+ * for users to specify computational logic and translates the given logic to a processor topology.
*/
public class KStreamBuilder extends TopologyBuilder {
@@ -39,6 +40,7 @@ public class KStreamBuilder extends TopologyBuilder {
super();
}
+ // TODO: needs updated
/**
* Creates a KStream instance for the specified topic.
* The default deserializers specified in the config are used.
@@ -115,6 +117,13 @@ public class KStreamBuilder extends TopologyBuilder {
return KStreamImpl.merge(this, streams);
}
+ /**
+ * Create a unique processor name used for translation into the processor topology.
+ * This function is only for internal usage.
+ *
+ * @param prefix Processor name prefix.
+ * @return The unique processor name.
+ */
public String newName(String prefix) {
return prefix + String.format("%010d", index.getAndIncrement());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 5cd9d9c..b83b0de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
+// TODO: Javadoc needs to be updated.
/**
* KTable is an abstraction of a change log stream.
*
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
index 62b07f6..9c04ef5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
@@ -17,6 +17,13 @@
package org.apache.kafka.streams.kstream;
+/**
+ * The KeyValueMapper interface for mapping a key-value pair to a new value (could be another key-value pair).
+ *
+ * @param <K> Original key type.
+ * @param <V> Original value type.
+ * @param <R> Mapped value type.
+ */
public interface KeyValueMapper<K, V, R> {
R apply(K key, V value);
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
index c73622e..784f5b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
@@ -18,13 +18,12 @@
package org.apache.kafka.streams.kstream;
/**
- * Represents a predicate (boolean-valued function) of two arguments.
+ * The Predicate interface represents a predicate (boolean-valued function) of a key-value pair.
*
- * @param <K> the type of key
- * @param <V> the type of value
+ * @param <K> Key type.
+ * @param <V> Value type.
*/
public interface Predicate<K, V> {
boolean test(K key, V value);
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
index 418f442..bf25f73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
@@ -17,6 +17,11 @@
package org.apache.kafka.streams.kstream;
+/**
+ * The Reducer interface for combining two values of the same type into a new value.
+ *
+ * @param <V> Value type.
+ */
public interface Reducer<V> {
V apply(V value1, V value2);
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
index 188fe66..cadedba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
@@ -17,12 +17,14 @@
package org.apache.kafka.streams.kstream;
-
import org.apache.kafka.streams.kstream.internals.TumblingWindow;
import java.util.HashMap;
import java.util.Map;
+/**
+ * The tumbling window specifications used for aggregations.
+ */
public class TumblingWindows extends Windows<TumblingWindow> {
private static final long DEFAULT_SIZE_MS = 1000L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
index 93fc359..41005b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
@@ -17,6 +17,13 @@
package org.apache.kafka.streams.kstream;
+/**
+ * The ValueJoiner interface for joining two values and return a the joined new value.
+ *
+ * @param <V1> First value type.
+ * @param <V2> Second value type.
+ * @param <R> Joined value type.
+ */
public interface ValueJoiner<V1, V2, R> {
R apply(V1 value1, V2 value2);
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
index a32423d..d507c87 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
@@ -17,6 +17,12 @@
package org.apache.kafka.streams.kstream;
+/**
+ * The KeyValueMapper interface for mapping an original value to a new value (could be another key-value pair).
+ *
+ * @param <V1> Original value type.
+ * @param <V2> Mapped value type.
+ */
public interface ValueMapper<V1, V2> {
V2 apply(V1 value);
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
index 5c053c7..04fa9eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
@@ -20,5 +20,4 @@ package org.apache.kafka.streams.kstream;
public interface ValueTransformerSupplier<V, R> {
ValueTransformer<V, R> get();
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index b9401b0..f2965dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -17,6 +17,9 @@
package org.apache.kafka.streams.kstream;
+/**
+ * A single window instance, defined by its start and end timestamp.
+ */
public abstract class Window {
private long start;
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index e4d7d9d..678e351 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -21,6 +21,11 @@ package org.apache.kafka.streams.kstream;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * The window specification interface that can be extended for windowing operation in joins and aggregations.
+ *
+ * @param <W> Type of the window instance
+ */
public abstract class Windows<W extends Window> {
private static final int DEFAULT_NUM_SEGMENTS = 3;
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index 57df685..dad5c6f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -31,12 +31,20 @@ import java.util.Set;
/**
* DefaultPartitionGrouper groups partitions by the partition id. This behavior is assumed by the join processing in KStream.
- * Join processing requires that topics are copartitoned, i.e., being partitioned by the same key and having the same
- * number of partitions, are grouped together. Copartitioning is ensured by having the same number of partitions on
+ *
+ * Join operations requires that topics of the joining entities are copartitoned, i.e., being partitioned by the same key and having the same
+ * number of partitions. Copartitioning is ensured by having the same number of partitions on
* joined topics, and by using the serialization and Producer's default partitioner.
*/
public class DefaultPartitionGrouper implements PartitionGrouper {
+ /**
+ * Generate tasks with the assigned topic partitions
+ *
+ * @param topicGroups {@link TopologyBuilder#topicGroups()} where topics of the same group need to be joined together
+ * @param metadata Metadata of the consuming cluster
+ * @return The map from generated task ids to the assigned partitions.
+ */
public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) {
Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
index 3cade3a..fbd72f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
@@ -18,7 +18,7 @@
package org.apache.kafka.streams.processor;
/**
- * A processor of messages.
+ * A processor of key-value pair records.
*
* @param <K> the type of keys
* @param <V> the type of values
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index af98300..9740fa3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -23,6 +23,9 @@ import org.apache.kafka.streams.StreamsMetrics;
import java.io.File;
+/**
+ * Processor context interface.
+ */
public interface ProcessorContext {
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
index 11545c5..993500d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
@@ -17,6 +17,9 @@
package org.apache.kafka.streams.processor;
+/**
+ * A state store supplier which can create one or more {@link StateStore} instances.
+ */
public interface StateStoreSupplier {
String name();
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index 6e7150e..69b29bf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -24,6 +24,9 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+/**
+ * The task id representation composed as topic group id plus the assigned partition id.
+ */
public class TaskId implements Comparable<TaskId> {
public final int topicGroupId;
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
index bd118a2..55ec8cf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
@@ -24,6 +24,12 @@ import org.apache.kafka.streams.KeyValue;
import java.io.Closeable;
import java.util.Iterator;
+/**
+ * Iterator interface of {@link KeyValue}.
+ *
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ */
public interface KeyValueIterator<K, V> extends Iterator<KeyValue<K, V>>, Closeable {
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index 1d806e0..cbd373c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -21,11 +21,28 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.streams.processor.StateStore;
+/**
+ * A windowed store interface extending {@link StateStore}
+ *
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ */
public interface WindowStore<K, V> extends StateStore {
+ /**
+ * Put a key-value pair with the current wall-clock time as the timestamp
+ * into the corresponding window
+ */
void put(K key, V value);
+ /**
+ * Put a key-value pair with the given timestamp into the corresponding window
+ */
void put(K key, V value, long timestamp);
+ /**
+ * Get all the key-value pairs with the given key and the time range from all
+ * the existing windows.
+ */
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
}