You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:54 UTC
[40/50] [abbrv] kafka git commit: KAFKA-3598: Improve JavaDoc of
public API
KAFKA-3598: Improve JavaDoc of public API
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Michael G. Noll, Guozhang Wang
Closes #1250 from mjsax/JavaDoc-publicAPI
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4ab4e4af
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4ab4e4af
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4ab4e4af
Branch: refs/heads/0.10.0
Commit: 4ab4e4af814fb791fe6e8c2bd3381da8ca80b0b5
Parents: 68433dc
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Apr 29 08:49:16 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Apr 29 08:49:16 2016 -0700
----------------------------------------------------------------------
.../examples/pageview/PageViewTypedDemo.java | 4 +-
.../examples/pageview/PageViewUntypedDemo.java | 2 +-
.../org/apache/kafka/streams/KafkaStreams.java | 10 ++-
.../java/org/apache/kafka/streams/KeyValue.java | 17 +++++
.../kafka/streams/kstream/Aggregator.java | 10 ++-
.../kafka/streams/kstream/ForeachAction.java | 10 ++-
.../kafka/streams/kstream/Initializer.java | 7 +-
.../kafka/streams/kstream/JoinWindows.java | 21 +++---
.../apache/kafka/streams/kstream/KStream.java | 10 +--
.../kafka/streams/kstream/KStreamBuilder.java | 15 ++--
.../apache/kafka/streams/kstream/KTable.java | 19 ++---
.../kafka/streams/kstream/KeyValueMapper.java | 9 ++-
.../apache/kafka/streams/kstream/Predicate.java | 9 ++-
.../apache/kafka/streams/kstream/Reducer.java | 9 ++-
.../kafka/streams/kstream/TimeWindows.java | 18 +++--
.../kafka/streams/kstream/Transformer.java | 8 +-
.../streams/kstream/TransformerSupplier.java | 7 +-
.../kafka/streams/kstream/UnlimitedWindows.java | 9 ++-
.../kafka/streams/kstream/ValueJoiner.java | 9 ++-
.../kafka/streams/kstream/ValueMapper.java | 8 +-
.../kafka/streams/kstream/ValueTransformer.java | 8 +-
.../kstream/ValueTransformerSupplier.java | 7 +-
.../apache/kafka/streams/kstream/Window.java | 18 ++++-
.../apache/kafka/streams/kstream/Windowed.java | 30 +++++---
.../apache/kafka/streams/kstream/Windows.java | 37 ++++-----
.../internals/KStreamWindowAggregate.java | 2 +-
.../kstream/internals/KStreamWindowReduce.java | 2 +-
.../kstream/internals/WindowedSerializer.java | 4 +-
.../internals/WindowedStreamPartitioner.java | 6 +-
.../ConsumerRecordTimestampExtractor.java | 3 +-
.../kafka/streams/processor/Processor.java | 9 ++-
.../streams/processor/ProcessorSupplier.java | 5 ++
.../streams/processor/StateStoreSupplier.java | 10 +++
.../streams/processor/StreamPartitioner.java | 20 ++---
.../apache/kafka/streams/processor/TaskId.java | 6 +-
.../streams/processor/TimestampExtractor.java | 8 +-
.../streams/processor/TopologyBuilder.java | 64 ++++++++--------
.../processor/WallclockTimestampExtractor.java | 9 ++-
.../processor/internals/StreamThread.java | 2 +-
.../apache/kafka/streams/state/StateSerdes.java | 79 ++++++++++++++++++--
.../apache/kafka/streams/state/WindowStore.java | 2 +-
.../streams/smoketest/SmokeTestClient.java | 2 +-
.../kafka/streams/smoketest/SmokeTestUtil.java | 2 +-
43 files changed, 383 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 39ec41f..e53b037 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -168,10 +168,10 @@ public class PageViewTypedDemo {
public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
wViewByRegion.windowStart = key.window().start();
- wViewByRegion.region = key.value();
+ wViewByRegion.region = key.key();
RegionCount rCount = new RegionCount();
- rCount.region = key.value();
+ rCount.region = key.key();
rCount.count = value;
return new KeyValue<>(wViewByRegion, rCount);
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index 9a41b9e..8a0af6c 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -107,7 +107,7 @@ public class PageViewUntypedDemo {
public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
keyNode.put("window-start", key.window().start())
- .put("region", key.value());
+ .put("region", key.key());
ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
valueNode.put("count", value);
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 4d1306d..45024f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -97,6 +97,12 @@ public class KafkaStreams {
// usage only and should not be exposed to users at all.
private final UUID processId;
+ /**
+ * Construct the stream instance.
+ *
+ * @param builder the processor topology builder specifying the computational logic
+ * @param props properties for the {@link StreamsConfig}
+ */
public KafkaStreams(TopologyBuilder builder, Properties props) {
this(builder, new StreamsConfig(props));
}
@@ -104,8 +110,8 @@ public class KafkaStreams {
/**
* Construct the stream instance.
*
- * @param builder The processor topology builder specifying the computational logic
- * @param config The stream configs
+ * @param builder the processor topology builder specifying the computational logic
+ * @param config the stream configs
*/
public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {
// create the metrics
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
index ca86fc4..58f2083 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
@@ -29,14 +29,31 @@ import java.util.Objects;
*/
public class KeyValue<K, V> {
+ /** The key of the key-value pair. */
public final K key;
+ /** The value of the key-value pair. */
public final V value;
+ /**
+ * Create a new key-value pair.
+ *
+ * @param key the key
+ * @param value the value
+ */
public KeyValue(K key, V value) {
this.key = key;
this.value = value;
}
+ /**
+ * Create a new key-value pair.
+ *
+ * @param key the key
+ * @param value the value
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ * @return a new key value pair
+ */
public static <K, V> KeyValue<K, V> pair(K key, V value) {
return new KeyValue<>(key, value);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
index 9ec9f96..989d89f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -18,7 +18,7 @@
package org.apache.kafka.streams.kstream;
/**
- * The Aggregator interface for aggregating values of the given key.
+ * The {@link Aggregator} interface for aggregating values of the given key.
*
* @param <K> key type
* @param <V> original value type
@@ -26,5 +26,13 @@ package org.apache.kafka.streams.kstream;
*/
public interface Aggregator<K, V, T> {
+ /**
+ * Compute a new aggregate from the key and value of a record and the current aggregate of the same key.
+ *
+ * @param aggKey the key of the record
+ * @param value the value of the record
+ * @param aggregate the current aggregate value
+ * @return the new aggregate value
+ */
T apply(K aggKey, V value, T aggregate);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
index 83064e8..b3e3169 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
@@ -18,9 +18,8 @@
package org.apache.kafka.streams.kstream;
-
/**
- * The ForeachAction interface for performing an action on a key-value pair.
+ * The {@link ForeachAction} interface for performing an action on a key-value pair.
* Note that this action is stateless. If stateful processing is required, consider
* using {@link KStream#transform(TransformerSupplier, String...)} or
* {@link KStream#process(ProcessorSupplier, String...)} instead.
@@ -29,6 +28,13 @@ package org.apache.kafka.streams.kstream;
* @param <V> original value type
*/
public interface ForeachAction<K, V> {
+
+ /**
+ * Perform an action for each record of a stream.
+ *
+ * @param key the key of the record
+ * @param value the value of the record
+ */
void apply(K key, V value);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
index 67c1c21..39bc40d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
@@ -18,11 +18,16 @@
package org.apache.kafka.streams.kstream;
/**
- * The Initializer interface for creating an initial value in aggregations.
+ * The {@link Initializer} interface for creating an initial value in aggregations.
*
* @param <T> aggregate value type
*/
public interface Initializer<T> {
+ /**
+ * Return the initial value for an aggregation.
+ *
+ * @return the initial value for an aggregation
+ */
T apply();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index a74984a..a6d5603 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -26,7 +26,9 @@ import java.util.Map;
*/
public class JoinWindows extends Windows<TimeWindow> {
+ /** Maximum time difference for tuples that are before the join tuple. */
public final long before;
+ /** Maximum time difference for tuples that are after the join tuple. */
public final long after;
private JoinWindows(String name, long before, long after) {
@@ -41,40 +43,41 @@ public class JoinWindows extends Windows<TimeWindow> {
}
/**
- * Specifies that records of the same key are joinable if their timestamp stamps are within
- * timeDifference.
+ * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}.
*
- * @param timeDifference join window interval in milliseconds
+ * @param timeDifference join window interval
*/
public JoinWindows within(long timeDifference) {
return new JoinWindows(this.name, timeDifference, timeDifference);
}
/**
- * Specifies that records of the same key are joinable if their timestamp stamps are within
+ * Specifies that records of the same key are joinable if their timestamps are within
* the join window interval, and if the timestamp of a record from the secondary stream is
* earlier than or equal to the timestamp of a record from the first stream.
*
- * @param timeDifference join window interval in milliseconds
+ * @param timeDifference join window interval
*/
public JoinWindows before(long timeDifference) {
return new JoinWindows(this.name, timeDifference, this.after);
}
/**
- * Specifies that records of the same key are joinable if their timestamp stamps are within
+ * Specifies that records of the same key are joinable if their timestamps are within
* the join window interval, and if the timestamp of a record from the secondary stream
* is later than or equal to the timestamp of a record from the first stream.
*
- * @param timeDifference join window interval in milliseconds
+ * @param timeDifference join window interval
*/
public JoinWindows after(long timeDifference) {
return new JoinWindows(this.name, this.before, timeDifference);
}
+ /**
+ * Not supported by {@link JoinWindows}. Throws {@link UnsupportedOperationException}.
+ */
@Override
public Map<Long, TimeWindow> windowsFor(long timestamp) {
- // this function should never be called
throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
}
@@ -98,4 +101,4 @@ public class JoinWindows extends Windows<TimeWindow> {
return result;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 7e3562c..6df2deb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
/**
- * KStream is an abstraction of a <i>record stream</i> of key-value pairs.
+ * {@link KStream} is an abstraction of a <i>record stream</i> of key-value pairs.
*
* @param <K> Type of keys
* @param <V> Type of values
@@ -510,7 +510,7 @@ public interface KStream<K, V> {
String name);
/**
- * Count number of messages of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+ * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
*
* @param windows the specification of the aggregation {@link Windows}
* @param keySerde key serdes for materializing the counting table,
@@ -519,7 +519,7 @@ public interface KStream<K, V> {
<W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serde<K> keySerde);
/**
- * Count number of messages of this stream by key on a window basis into a new instance of windowed {@link KTable}
+ * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}
* with default serializers and deserializers.
*
* @param windows the specification of the aggregation {@link Windows}
@@ -527,7 +527,7 @@ public interface KStream<K, V> {
<W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows);
/**
- * Count number of messages of this stream by key into a new instance of ever-updating {@link KTable}.
+ * Count number of records of this stream by key into a new instance of ever-updating {@link KTable}.
*
* @param keySerde key serdes for materializing the counting table,
* if not specified the default serdes defined in the configs will be used
@@ -536,7 +536,7 @@ public interface KStream<K, V> {
KTable<K, Long> countByKey(Serde<K> keySerde, String name);
/**
- * Count number of messages of this stream by key into a new instance of ever-updating {@link KTable}
+ * Count number of records of this stream by key into a new instance of ever-updating {@link KTable}
* with default serializers and deserializers.
*
* @param name the name of the resulted {@link KTable}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 6b770b4..159876c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -28,19 +28,22 @@ import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * KStreamBuilder is a subclass of {@link TopologyBuilder} that provides the Kafka Streams DSL
+ * {@link KStreamBuilder} is a subclass of {@link TopologyBuilder} that provides the Kafka Streams DSL
* for users to specify computational logic and translates the given logic to a {@link org.apache.kafka.streams.processor.internals.ProcessorTopology}.
*/
public class KStreamBuilder extends TopologyBuilder {
private final AtomicInteger index = new AtomicInteger(0);
+ /**
+ * Create a new {@link KStreamBuilder} instance.
+ */
public KStreamBuilder() {
super();
}
/**
- * Creates a {@link KStream} instance from the specified topics.
+ * Create a {@link KStream} instance from the specified topics.
* The default deserializers specified in the config are used.
*
* @param topics the topic names; must contain at least one topic name
@@ -50,7 +53,7 @@ public class KStreamBuilder extends TopologyBuilder {
}
/**
- * Creates a {@link KStream} instance for the specified topics.
+ * Create a {@link KStream} instance for the specified topics.
*
* @param keySerde key serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
@@ -67,7 +70,7 @@ public class KStreamBuilder extends TopologyBuilder {
}
/**
- * Creates a {@link KTable} instance for the specified topic.
+ * Create a {@link KTable} instance for the specified topic.
* The default deserializers specified in the config are used.
*
* @param topic the topic name; cannot be null
@@ -77,7 +80,7 @@ public class KStreamBuilder extends TopologyBuilder {
}
/**
- * Creates a {@link KTable} instance for the specified topic.
+ * Create a {@link KTable} instance for the specified topic.
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@@ -98,7 +101,7 @@ public class KStreamBuilder extends TopologyBuilder {
}
/**
- * Creates a new instance of {@link KStream} by merging the given streams
+ * Create a new instance of {@link KStream} by merging the given streams.
*
* @param streams the instances of {@link KStream} to be merged
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 8414279..4ff9b48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.StreamPartitioner;
/**
- * KTable is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
+ * {@link KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
*
* @param <K> Type of primary keys
* @param <V> Type of value changes
@@ -39,7 +39,7 @@ public interface KTable<K, V> {
KTable<K, V> filter(Predicate<K, V> predicate);
/**
- * Create a new instance of {@link KTable} that consists all elements of this stream which do not satisfy a predicate
+ * Create a new instance of {@link KTable} that consists all elements of this stream which do not satisfy a predicate.
*
* @param predicate the instance of {@link Predicate}
*/
@@ -55,7 +55,7 @@ public interface KTable<K, V> {
/**
- * Print the elements of this stream to System.out
+ * Print the elements of this stream to {@code System.out}
*
* Implementors will need to override toString for keys and values that are not of
* type String, Integer etc to get meaningful information.
@@ -63,7 +63,7 @@ public interface KTable<K, V> {
void print();
/**
- * Print the elements of this stream to System.out
+ * Print the elements of this stream to {@code System.out}
* @param keySerde key serde used to send key-value pairs,
* if not specified the default serde defined in the configs will be used
* @param valSerde value serde used to send key-value pairs,
@@ -75,15 +75,16 @@ public interface KTable<K, V> {
void print(Serde<K> keySerde, Serde<V> valSerde);
/**
- * Write the elements of this stream to a file at the given path.
+ * Write the elements of this stream to a file at the given path using default serializers and deserializers.
* @param filePath name of file to write to
*
- * Implementors will need to override toString for keys and values that are not of
- * type String, Integer etc to get meaningful information.
+ * Implementors will need to override {@code toString} for keys and values that are not of
+ * type {@link String}, {@link Integer} etc. to get meaningful information.
*/
void writeAsText(String filePath);
/**
+ * Write the elements of this stream to a file at the given path.
*
* @param filePath name of file to write to
* @param keySerde key serde used to send key-value pairs,
@@ -91,8 +92,8 @@ public interface KTable<K, V> {
* @param valSerde value serde used to send key-value pairs,
* if not specified the default serde defined in the configs will be used
*
- * Implementors will need to override toString for keys and values that are not of
- * type String, Integer etc to get meaningful information.
+ * Implementors will need to override {@code toString} for keys and values that are not of
+ * type {@link String}, {@link Integer} etc. to get meaningful information.
*/
void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde);
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
index a4aed91..b36ed63 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
@@ -18,7 +18,7 @@
package org.apache.kafka.streams.kstream;
/**
- * The KeyValueMapper interface for mapping a key-value pair to a new value (could be another key-value pair).
+ * The {@link KeyValueMapper} interface for mapping a key-value pair to a new value (could be another key-value pair).
*
* @param <K> original key type
* @param <V> original value type
@@ -26,5 +26,12 @@ package org.apache.kafka.streams.kstream;
*/
public interface KeyValueMapper<K, V, R> {
+ /**
+ * Map a record with the given key and value to a new value.
+ *
+ * @param key the key of the record
+ * @param value the value of the record
+ * @return the new value
+ */
R apply(K key, V value);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
index c90554b..2df2d5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
@@ -18,12 +18,19 @@
package org.apache.kafka.streams.kstream;
/**
- * The Predicate interface represents a predicate (boolean-valued function) of a key-value pair.
+ * The {@link Predicate} interface represents a predicate (boolean-valued function) of a key-value pair.
*
* @param <K> key type
* @param <V> value type
*/
public interface Predicate<K, V> {
+ /**
+ * Test if the record with the given key and value satisfies the predicate.
+ *
+ * @param key the key of the record
+ * @param value the value of the record
+ * @return return {@code true} if the key-value pair satisfies the predicate—{@code false} otherwise
+ */
boolean test(K key, V value);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
index 551a672..e7cfa0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
@@ -18,11 +18,18 @@
package org.apache.kafka.streams.kstream;
/**
- * The Reducer interface for combining two values of the same type into a new value.
+ * The {@link Reducer} interface for combining two values of the same type into a new value.
*
* @param <V> value type
*/
public interface Reducer<V> {
+ /**
+ * Aggregate the two given values into a single one.
+ *
+ * @param value1 the first value for the aggregation
+ * @param value2 the second value for the aggregation
+ * @return the aggregated value
+ */
V apply(V value1, V value2);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index fa3a9d8..e4ce883 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -24,6 +24,14 @@ import java.util.Map;
/**
* The time-based window specifications used for aggregations.
+ * <p>
+ * The semantics of a time-based window are: Every T1 (advance) time-units, compute the aggregate total for T2 (size) time-units.
+ * <ul>
+ * <li> If {@code advance < size} a hopping windows is defined: <br />
+ * it discretize a stream into overlapping windows, which implies that a record maybe contained in one and or more "adjacent" windows.</li>
+ * <li> If {@code advance == size} a tumbling window is defined:<br />
+ * it discretize a stream into non-overlapping windows, which implies that a record is only ever contained in one and only one tumbling window.</li>
+ * </ul>
*/
public class TimeWindows extends Windows<TimeWindow> {
@@ -36,7 +44,7 @@ public class TimeWindows extends Windows<TimeWindow> {
/**
* The size of the window's advance interval, i.e. by how much a window moves forward relative
- * to the previous one. The interval's effective time unit is determined by the semantics of
+ * to the previous one. The interval's effective time unit is determined by the semantics of
* the topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
*/
public final long advance;
@@ -56,13 +64,13 @@ public class TimeWindows extends Windows<TimeWindow> {
/**
* Returns a window definition with the given window size, and with the advance interval being
- * equal to the window size. Think: [N * size, N * size + size), with N denoting the N-th
+ * equal to the window size. Think: [N * size, N * size + size), with N denoting the N-th
* window.
*
* This provides the semantics of tumbling windows, which are fixed-sized, gap-less,
- * non-overlapping windows. Tumbling windows are a specialization of hopping windows.
+ * non-overlapping windows. Tumbling windows are a specialization of hopping windows.
*
- * @param name The name of the window. Must not be null or empty.
+ * @param name The name of the window. Must not be null or empty.
* @param size The size of the window, with the requirement that size > 0.
* The window size's effective time unit is determined by the semantics of the
* topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
@@ -80,7 +88,7 @@ public class TimeWindows extends Windows<TimeWindow> {
* This provides the semantics of hopping windows, which are fixed-sized, overlapping windows.
*
* @param interval The advance interval ("hop") of the window, with the requirement that
- * 0 < interval ≤ size. The interval's effective time unit is
+ * 0 < interval ≤ size. The interval's effective time unit is
* determined by the semantics of the topology's configured
* {@link org.apache.kafka.streams.processor.TimestampExtractor}.
* @return a new window definition
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index 5197e94..239854b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.processor.ProcessorContext;
/**
- * A stateful Transformer interface for transform a key-value pair into a new value.
+ * A stateful {@link Transformer} interface for transform a key-value pair into a new value.
*
* @param <K> key type
* @param <V> value type
@@ -40,10 +40,10 @@ public interface Transformer<K, V, R> {
void init(ProcessorContext context);
/**
- * Transform the message with the given key and value.
+ * Transform the record with the given key and value.
*
- * @param key the key for the message
- * @param value the value for the message
+ * @param key the key for the record
+ * @param value the value for the record
* @return new value; if null no key-value pair will be forwarded to down stream
*/
R transform(K key, V value);
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
index fc7ba60..0341702 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
@@ -18,9 +18,14 @@
package org.apache.kafka.streams.kstream;
/**
- * A transformer supplier which can create one or more {@link Transformer} instances.
+ * A {@link TransformerSupplier} interface which can create one or more {@link Transformer} instances.
*/
public interface TransformerSupplier<K, V, R> {
+ /**
+ * Return a new {@link Transformer} instance.
+ *
+ * @return a new {@link Transformer} instance
+ */
Transformer<K, V, R> get();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index bea3b57..f45f8c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -29,6 +29,7 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
private static final long DEFAULT_START_TIMESTAMP = 0L;
+ /** The start timestamp of the window. */
public final long start;
private UnlimitedWindows(String name, long start) {
@@ -41,12 +42,18 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
}
/**
- * Returns an unlimited window definition
+ * Return an unlimited window starting at timestamp zero.
*/
public static UnlimitedWindows of(String name) {
return new UnlimitedWindows(name, DEFAULT_START_TIMESTAMP);
}
+ /**
+ * Return a new unlimited window for the specified start timestamp.
+ *
+ * @param start the window start time
+ * @return a new unlimited window that starts at {@code start}
+ */
public UnlimitedWindows startOn(long start) {
return new UnlimitedWindows(this.name, start);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
index 5f00a1a..8d4a8e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
@@ -18,7 +18,7 @@
package org.apache.kafka.streams.kstream;
/**
- * The ValueJoiner interface for joining two values and return a the joined new value.
+ * The {@link ValueJoiner} interface for joining two values into a new value.
*
* @param <V1> first value type
* @param <V2> second value type
@@ -26,5 +26,12 @@ package org.apache.kafka.streams.kstream;
*/
public interface ValueJoiner<V1, V2, R> {
+ /**
+ * Return a joined value consisting of {@code value1} and {@code value2}.
+ *
+ * @param value1 the first value for joining
+ * @param value2 the second value for joining
+ * @return the joined value
+ */
R apply(V1 value1, V2 value2);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
index 6e62a55..e168e37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
@@ -18,12 +18,18 @@
package org.apache.kafka.streams.kstream;
/**
- * The KeyValueMapper interface for mapping an original value to a new value (could be another key-value pair).
+ * The {@link ValueMapper} interface for mapping an original value to a new value (could be another key-value pair).
*
* @param <V1> original value type
* @param <V2> mapped value type
*/
public interface ValueMapper<V1, V2> {
+ /**
+ * Map the given value to a new value.
+ *
+ * @param value the value to be mapped
+ * @return the new value
+ */
V2 apply(V1 value);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 63214fd..f92d9a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.processor.ProcessorContext;
/**
- * A stateful Value Transformer interface for transform a value into a new value.
+ * A stateful {@link ValueTransformer} interface to transform a value into a new value.
*
* @param <V> value type
* @param <R> return type
@@ -31,7 +31,7 @@ public interface ValueTransformer<V, R> {
* Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
* that contains it is initialized.
* <p>
- * If this tranformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
+ * If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
* {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
*
* @param context the context; may not be null
@@ -39,9 +39,9 @@ public interface ValueTransformer<V, R> {
void init(ProcessorContext context);
/**
- * Transform the message with the given key and value.
+ * Transform the record with the given key and value.
*
- * @param value the value for the message
+ * @param value the value for the record
* @return new value
*/
R transform(V value);
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
index 6bc86bc..ecd454a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
@@ -18,9 +18,14 @@
package org.apache.kafka.streams.kstream;
/**
- * A value transformer supplier which can create one or more {@link ValueTransformer} instances.
+ * A {@link ValueTransformerSupplier} interface which can create one or more {@link ValueTransformer} instances.
*/
public interface ValueTransformerSupplier<V, R> {
+ /**
+ * Return a new {@link ValueTransformer} instance.
+ *
+ * @return a new {@link ValueTransformer} instance.
+ */
ValueTransformer<V, R> get();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index 784d5c3..e1ea9a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -25,25 +25,37 @@ public abstract class Window {
private long start;
private long end;
+ /**
+ * Create a new window for the given start time (inclusive) and end time (exclusive).
+ *
+ * @param start the start timestamp of the window (inclusive)
+ * @param end the end timestamp of the window (exclusive)
+ */
public Window(long start, long end) {
this.start = start;
this.end = end;
}
/**
- * Returns the start timestamp of this window, inclusive
+ * Return the start timestamp of this window, inclusive
*/
public long start() {
return start;
}
/**
- * Returns the end timestamp of this window, exclusive
+ * Return the end timestamp of this window, exclusive
*/
public long end() {
return end;
}
+ /**
+ * Check if the given window overlaps with this window.
+ *
+ * @param other another window
+ * @return {@code true} if {@code other} overlaps with this window—{@code false} otherwise
+ */
public boolean overlap(Window other) {
return this.start() < other.end() || other.start() < this.end();
}
@@ -68,4 +80,4 @@ public abstract class Window {
return (int) (n % 0xFFFFFFFFL);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
index 3691282..feaf6a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -22,30 +22,40 @@ package org.apache.kafka.streams.kstream;
* i.e. {@link KStream#aggregateByKey(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde,
* org.apache.kafka.common.serialization.Serde)}
*
- * @param <T> Type of the key
+ * @param <K> Type of the key
*/
-public class Windowed<T> {
+public class Windowed<K> {
- private T value;
+ private K key;
private Window window;
- public Windowed(T value, Window window) {
- this.value = value;
+ public Windowed(K key, Window window) {
+ this.key = key;
this.window = window;
}
- public T value() {
- return value;
+ /**
+ * Return the key of the window.
+ *
+ * @return the key of the window
+ */
+ public K key() {
+ return key;
}
+ /**
+ * Return the window containing the values associated with this key.
+ *
+ * @return the window containing the values
+ */
public Window window() {
return window;
}
@Override
public String toString() {
- return "[" + value + "@" + window.start() + "]";
+ return "[" + key + "@" + window.start() + "]";
}
@Override
@@ -58,12 +68,12 @@ public class Windowed<T> {
Windowed<?> that = (Windowed) obj;
- return this.window.equals(that.window) && this.value.equals(that.value);
+ return this.window.equals(that.window) && this.key.equals(that.key);
}
@Override
public int hashCode() {
- long n = ((long) window.hashCode() << 32) | value.hashCode();
+ long n = ((long) window.hashCode() << 32) | key.hashCode();
return (int) (n % 0xFFFFFFFFL);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 1406de6..06cacb4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -30,16 +30,12 @@ public abstract class Windows<W extends Window> {
private static final int DEFAULT_NUM_SEGMENTS = 3;
- private static final long DEFAULT_EMIT_DURATION = 1000L;
-
private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L; // one day
private static final AtomicInteger NAME_INDEX = new AtomicInteger(0);
protected String name;
- private long emitDurationMs;
-
private long maintainDurationMs;
public int segments;
@@ -50,7 +46,6 @@ public abstract class Windows<W extends Window> {
}
this.name = name;
this.segments = DEFAULT_NUM_SEGMENTS;
- this.emitDurationMs = DEFAULT_EMIT_DURATION;
this.maintainDurationMs = DEFAULT_MAINTAIN_DURATION;
}
@@ -59,16 +54,9 @@ public abstract class Windows<W extends Window> {
}
/**
- * Set the window emit duration in milliseconds of system time.
- */
- public Windows emit(long durationMs) {
- this.emitDurationMs = durationMs;
-
- return this;
- }
-
- /**
* Set the window maintain duration in milliseconds of system time.
+ *
+ * @return itself
*/
public Windows until(long durationMs) {
this.maintainDurationMs = durationMs;
@@ -79,6 +67,8 @@ public abstract class Windows<W extends Window> {
/**
* Specify the number of segments to be used for rolling the window store,
* this function is not exposed to users but can be called by developers that extend this JoinWindows specs.
+ *
+ * @return itself
*/
protected Windows segments(int segments) {
this.segments = segments;
@@ -86,18 +76,21 @@ public abstract class Windows<W extends Window> {
return this;
}
- public long emitEveryMs() {
- return this.emitDurationMs;
- }
-
+ /**
+ * Return the window maintain duration in milliseconds of system time.
+ *
+ * @return the window maintain duration in milliseconds of system time
+ */
public long maintainMs() {
return this.maintainDurationMs;
}
- protected String newName(String prefix) {
- return prefix + String.format("%010d", NAME_INDEX.getAndIncrement());
- }
-
+ /**
+ * Creates all windows that contain the provided timestamp.
+ *
+ * @param timestamp the timestamp window should get created for
+ * @return a map of {@code windowStartTimestamp -> Window} entries
+ */
public abstract Map<Long, W> windowsFor(long timestamp);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index f36cc8c..b4272f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -163,7 +163,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
@SuppressWarnings("unchecked")
@Override
public T get(Windowed<K> windowedKey) {
- K key = windowedKey.value();
+ K key = windowedKey.key();
W window = (W) windowedKey.window();
// this iterator should contain at most one element
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 6c05ce3..3ed1499 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -157,7 +157,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
@SuppressWarnings("unchecked")
@Override
public V get(Windowed<K> windowedKey) {
- K key = windowedKey.value();
+ K key = windowedKey.key();
W window = (W) windowedKey.window();
// this iterator should only contain one element
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
index 0afcad1..2e19816 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
@@ -40,7 +40,7 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
@Override
public byte[] serialize(String topic, Windowed<T> data) {
- byte[] serializedKey = inner.serialize(topic, data.value());
+ byte[] serializedKey = inner.serialize(topic, data.key());
ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE);
buf.put(serializedKey);
@@ -55,7 +55,7 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
}
public byte[] serializeBaseKey(String topic, Windowed<T> data) {
- return inner.serialize(topic, data.value());
+ return inner.serialize(topic, data.key());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
index 10e69cc..1e30864 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -29,12 +29,12 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window
}
/**
- * WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value
+ * WindowedStreamPartitioner determines the partition number for a record with the given windowed key and value
* and the current number of partitions. The partition number id determined by the original key of the windowed key
* using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
*
- * @param windowedKey the key of the message
- * @param value the value of the message
+ * @param windowedKey the key of the record
+ * @param value the value of the record
* @param numPartitions the total number of partitions
* @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
index 61b1c98..0d3424e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
@@ -27,7 +27,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
* via this timestamp extractor.
*
* If <i>CreateTime</i> is used to define the built-in timestamps, using this extractor effectively provide
- * <i>event-time</i> semantics.
+ * <i>event-time</i> semantics. If <i>LogAppendTime</i> is used to define the built-in timestamps, using
+ * this extractor effectively provides <i>ingestion-time</i> semantics.
*
* If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
index fbd72f0..92fcf12 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
@@ -37,10 +37,10 @@ public interface Processor<K, V> {
void init(ProcessorContext context);
/**
- * Process the message with the given key and value.
+ * Process the record with the given key and value.
*
- * @param key the key for the message
- * @param value the value for the message
+ * @param key the key for the record
+ * @param value the value for the record
*/
void process(K key, V value);
@@ -53,7 +53,8 @@ public interface Processor<K, V> {
void punctuate(long timestamp);
/**
- * Close this processor and clean up any resources.
+ * Close this processor and clean up any resources. Be aware that {@link #close()} is called after an internal cleanup.
+ * Thus, it is not possible to write anything to Kafka as underlying clients are already closed.
*/
void close();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
index 6561899..7976e16 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
@@ -29,5 +29,10 @@ package org.apache.kafka.streams.processor;
*/
public interface ProcessorSupplier<K, V> {
+ /**
+ * Return a new {@link Processor} instance.
+ *
+ * @return a new {@link Processor} instance
+ */
Processor<K, V> get();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
index 993500d..f2ae020 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
@@ -22,7 +22,17 @@ package org.apache.kafka.streams.processor;
*/
public interface StateStoreSupplier {
+ /**
+ * Return the name of this state store supplier.
+ *
+ * @return the name of this state store supplier
+ */
String name();
+ /**
+ * Return a new {@link StateStore} instance.
+ *
+ * @return a new {@link StateStore} instance
+ */
StateStore get();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
index f14d9d9..fbb0378 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
@@ -17,21 +17,21 @@
package org.apache.kafka.streams.processor;
/**
- * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
+ * Determine how records are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
* {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
* <p>
* Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it, so
* using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you
- * to use multiple instances of your topology to process in parallel all of the messages on the topology's source topics.
+ * to use multiple instances of your topology to process in parallel all of the records on the topology's source topics.
* <p>
* When a topology is instantiated, each of its sources are assigned a subset of that topic's partitions. That means that only
- * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will
+ * those processors in that topology instance will consume the records from those partitions. In many cases, Kafka Streams will
* automatically manage these instances, and adjust when new topology instances are added or removed.
* <p>
- * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have
- * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance.
- * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
- * determine to which partition each message should be written.
+ * Some topologies, though, need more control over which records appear in each partition. For example, some topologies that have
+ * stateful processors may want all records within a range of keys to always be delivered to and handled by the same topology instance.
+ * An upstream topology producing records to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
+ * determine to which partition each record should be written.
* <p>
* To do this, create a <code>StreamPartitioner</code> implementation, and when you build your topology specify that custom partitioner
* when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink}
@@ -48,10 +48,10 @@ package org.apache.kafka.streams.processor;
public interface StreamPartitioner<K, V> {
/**
- * Determine the partition number for a message with the given key and value and the current number of partitions.
+ * Determine the partition number for a record with the given key and value and the current number of partitions.
*
- * @param key the key of the message
- * @param value the value of the message
+ * @param key the key of the record
+ * @param value the value of the record
* @param numPartitions the total number of partitions
* @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index fa7c73c..7fc00d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -25,11 +25,13 @@ import java.io.IOException;
import java.nio.ByteBuffer;
/**
- * The task id representation composed as topic group id plus the assigned partition id.
+ * The task ID representation composed as topic group ID plus the assigned partition ID.
*/
public class TaskId implements Comparable<TaskId> {
+ /** The ID of the topic group. */
public final int topicGroupId;
+ /** The ID of the partition. */
public final int partition;
public TaskId(int topicGroupId, int partition) {
@@ -42,7 +44,7 @@ public class TaskId implements Comparable<TaskId> {
}
/**
- * @throws TaskIdFormatException if the string is not a valid TaskId
+ * @throws TaskIdFormatException if the string is not a valid {@link TaskId}
*/
public static TaskId parse(String string) {
int index = string.indexOf('_');
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
index 224d580..c872fa1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
@@ -26,10 +26,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
public interface TimestampExtractor {
/**
- * Extracts a timestamp from a message
+ * Extracts a timestamp from a record.
+ * <p>
+ * Typically, the timestamp represents the milliseconds since midnight, January 1, 1970 UTC.
*
- * @param record ConsumerRecord
- * @return timestamp
+ * @param record a data record
+ * @return the timestamp of the record
*/
long extract(ConsumerRecord<Object, Object> record);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 487d5fe..5425149 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -43,11 +43,11 @@ import java.util.Set;
/**
* A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors,
* and sinks. A {@link SourceNode source} is a node in the graph that consumes one or more Kafka topics and forwards them to
- * its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from upstream nodes,
- * processes that message, and optionally forwarding new messages to one or all of its children. Finally, a {@link SinkNode sink}
- * is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you
+ * its child nodes. A {@link Processor processor} is a node in the graph that receives input records from upstream nodes,
+ * processes that records, and optionally forwarding new records to one or all of its children. Finally, a {@link SinkNode sink}
+ * is a node in the graph that receives records from upstream nodes and writes them to a Kafka topic. This builder allows you
* to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreams}
- * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing messages}.
+ * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing records}.
*/
public class TopologyBuilder {
@@ -193,7 +193,7 @@ public class TopologyBuilder {
public TopologyBuilder() {}
/**
- * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
+ * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
* The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and
* {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
@@ -208,15 +208,15 @@ public class TopologyBuilder {
}
/**
- * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
+ * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
* The source will use the specified key and value deserializers.
*
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
- * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
+ * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source
* should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}
- * @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source
+ * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source
* should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param topics the name of one or more Kafka topics that this source is to consume
@@ -242,14 +242,14 @@ public class TopologyBuilder {
}
/**
- * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
+ * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and
* {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
*
* @param name the unique name of the sink
- * @param topic the name of the Kafka topic to which this sink should write its messages
- * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
+ * @param topic the name of the Kafka topic to which this sink should write its records
+ * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, StreamPartitioner, String...)
@@ -261,22 +261,22 @@ public class TopologyBuilder {
}
/**
- * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic, using
+ * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic, using
* the supplied partitioner.
* The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and
* {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
* <p>
- * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
+ * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among
* the named Kafka topic's partitions. Such control is often useful with topologies that use
* {@link #addStateStore(StateStoreSupplier, String...) state stores}
- * in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
- * messages among partitions using Kafka's default partitioning logic.
+ * in its processors. In most other cases, however, a partitioner needs not be specified and Kafka will automatically distribute
+ * records among partitions using Kafka's default partitioning logic.
*
* @param name the unique name of the sink
- * @param topic the name of the Kafka topic to which this sink should write its messages
- * @param partitioner the function that should be used to determine the partition for each message processed by the sink
- * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
+ * @param topic the name of the Kafka topic to which this sink should write its records
+ * @param partitioner the function that should be used to determine the partition for each record processed by the sink
+ * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
@@ -288,18 +288,18 @@ public class TopologyBuilder {
}
/**
- * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
+ * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the specified key and value serializers.
*
* @param name the unique name of the sink
- * @param topic the name of the Kafka topic to which this sink should write its messages
- * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink
+ * @param topic the name of the Kafka topic to which this sink should write its records
+ * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink
* should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}
- * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink
+ * @param valSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink
* should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}
- * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
+ * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
@@ -311,19 +311,19 @@ public class TopologyBuilder {
}
/**
- * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
+ * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the specified key and value serializers, and the supplied partitioner.
*
* @param name the unique name of the sink
- * @param topic the name of the Kafka topic to which this sink should write its messages
- * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink
+ * @param topic the name of the Kafka topic to which this sink should write its records
+ * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink
* should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}
- * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink
+ * @param valSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink
* should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}
- * @param partitioner the function that should be used to determine the partition for each message processed by the sink
- * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
+ * @param partitioner the function that should be used to determine the partition for each record processed by the sink
+ * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
@@ -354,11 +354,11 @@ public class TopologyBuilder {
}
/**
- * Add a new processor node that receives and processes messages output by one or more parent source or processor node.
- * Any new messages output by this processor will be forwarded to its child processor or sink nodes.
+ * Add a new processor node that receives and processes records output by one or more parent source or processor node.
+ * Any new record output by this processor will be forwarded to its child processor or sink nodes.
* @param name the unique name of the processor node
* @param supplier the supplier used to obtain this node's {@link Processor} instance
- * @param parentNames the name of one or more source or processor nodes whose output messages this processor should receive
+ * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
* and process
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
index 81821ce..305573b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
@@ -25,9 +25,16 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
* Using this extractor effectively provides <i>processing-time</i> semantics.
*
* If you need <i>event-time</i> semantics, use {@link ConsumerRecordTimestampExtractor} with
- * built-in <i>CreateTime</i> timestamp (see KIP-32: Add timestamps to Kafka message for details).
+ * built-in <i>CreateTime</i> or <i>LogAppendTime</i> timestamp (see KIP-32: Add timestamps to Kafka message for details).
*/
public class WallclockTimestampExtractor implements TimestampExtractor {
+
+ /**
+ * Return the current wall clock time as timestamp.
+ *
+ * @param record a data record
+ * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC
+ */
@Override
public long extract(ConsumerRecord<Object, Object> record) {
return System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index eff90e8..d4cb78c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -286,7 +286,7 @@ public class StreamThread extends Thread {
removeStandbyTasks();
// We need to first close the underlying clients before closing the state
- // manager, for example we need to make sure producer's message sends
+ // manager, for example we need to make sure producer's record sends
// have all been acked before the state manager records
// changelog sent offsets
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index 933bf72..b19510c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -24,13 +24,23 @@ import org.apache.kafka.common.serialization.Serializer;
/**
* Factory for creating serializers / deserializers for state stores in Kafka Streams.
*
- * @param <K> key type of serdes
- * @param <V> value type of serdes
+ * @param <K> key type of serde
+ * @param <V> value type of serde
*/
public final class StateSerdes<K, V> {
- public static <K, V> StateSerdes<K, V> withBuiltinTypes(String topic, Class<K> keyClass, Class<V> valueClass) {
- return new StateSerdes<>(topic, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
+ /**
+ * Create a new instance of {@link StateSerdes} for the given state name and key-/value-type classes.
+ *
+ * @param stateName the name of the state
+ * @param keyClass the class of the key type
+ * @param valueClass the class of the value type
+ * @param <K> the key type
+ * @param <V> the value type
+ * @return a new instance of {@link StateSerdes}
+ */
+ public static <K, V> StateSerdes<K, V> withBuiltinTypes(String stateName, Class<K> keyClass, Class<V> valueClass) {
+ return new StateSerdes<>(stateName, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
}
private final String stateName;
@@ -63,46 +73,105 @@ public final class StateSerdes<K, V> {
this.valueSerde = valueSerde;
}
+ /**
+ * Return the key serde.
+ *
+ * @return the key serde
+ */
public Serde<K> keySerde() {
return keySerde;
}
+ /**
+ * Return the value serde.
+ *
+ * @return the value serde
+ */
public Serde<V> valueSerde() {
return valueSerde;
}
+ /**
+ * Return the key deserializer.
+ *
+ * @return the key deserializer
+ */
public Deserializer<K> keyDeserializer() {
return keySerde.deserializer();
}
+ /**
+ * Return the key serializer.
+ *
+ * @return the key serializer
+ */
public Serializer<K> keySerializer() {
return keySerde.serializer();
}
+ /**
+ * Return the value deserializer.
+ *
+ * @return the value deserializer
+ */
public Deserializer<V> valueDeserializer() {
return valueSerde.deserializer();
}
+ /**
+ * Return the value serializer.
+ *
+ * @return the value serializer
+ */
public Serializer<V> valueSerializer() {
return valueSerde.serializer();
}
- public String topic() {
+ /**
+ * Return the name of the state.
+ *
+ * @return the name of the state
+ */
+ public String stateName() {
return stateName;
}
+ /**
+ * Deserialize the key from raw bytes.
+ *
+ * @param rawKey the key as raw bytes
+ * @return the key as typed object
+ */
public K keyFrom(byte[] rawKey) {
return keySerde.deserializer().deserialize(stateName, rawKey);
}
+ /**
+ * Deserialize the value from raw bytes.
+ *
+ * @param rawValue the value as raw bytes
+ * @return the value as typed object
+ */
public V valueFrom(byte[] rawValue) {
return valueSerde.deserializer().deserialize(stateName, rawValue);
}
+ /**
+ * Serialize the given key.
+ *
+ * @param key the key to be serialized
+ * @return the serialized key
+ */
public byte[] rawKey(K key) {
return keySerde.serializer().serialize(stateName, key);
}
+ /**
+ * Serialize the given value.
+ *
+ * @param value the value to be serialized
+ * @return the serialized value
+ */
public byte[] rawValue(V value) {
return valueSerde.serializer().serialize(stateName, value);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index c7a882f..e400cef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.processor.StateStore;
/**
- * A windowed store interface extending {@link StateStore}
+ * A windowed store interface extending {@link StateStore}.
*
* @param <K> Type of keys
* @param <V> Type of values
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 733c1ea..fbe7754 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -213,7 +213,7 @@ public class SmokeTestClient extends SmokeTestUtil {
new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {
@Override
public KeyValue<String, Long> apply(Windowed<String> key, Long value) {
- return new KeyValue<>(key.value() + "@" + key.window().start(), value);
+ return new KeyValue<>(key.key() + "@" + key.window().start(), value);
}
}
).to(stringSerde, longSerde, "wcnt");
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
index c5ded5e..b0d7a0b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -77,7 +77,7 @@ public class SmokeTestUtil {
public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, KeyValue<K, V>> {
public KeyValue<K, V> apply(Windowed<K> winKey, V value) {
- return new KeyValue<K, V>(winKey.value(), value);
+ return new KeyValue<K, V>(winKey.key(), value);
}
}