You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/10/03 14:36:35 UTC
[1/2] kafka git commit: MINOR: fix JavaDocs warnings
Repository: kafka
Updated Branches:
refs/heads/trunk 716330a5b -> 3dcbbf703
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 66ec0d7..1abc5e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -166,7 +166,7 @@ public interface KTable<K, V> {
* (i.e., that would be equivalent to calling {@link KTable#filter(Predicate)}.
* @return a {@code KTable} that contains only those records that satisfy the given predicate
* @see #filterNot(Predicate, Materialized)
- * @deprecated use {@link #filter(Predicate, Materialized)}
+ * @deprecated use {@link #filter(Predicate, Materialized) filter(predicate, Materialized.as(queryableStoreName))}
*/
@Deprecated
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
@@ -203,7 +203,7 @@ public interface KTable<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains only those records that satisfy the given predicate
* @see #filterNot(Predicate, Materialized)
- * @deprecated use {@link #filter(Predicate, Materialized)}
+ * @deprecated use {@link #filter(Predicate, Materialized) filter(predicate, Materialized.as(KeyValueByteStoreSupplier))}
*/
@Deprecated
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -297,7 +297,7 @@ public interface KTable<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
* @see #filter(Predicate, Materialized)
- * @deprecated use {@link #filterNot(Predicate, Materialized)}
+ * @deprecated use {@link #filterNot(Predicate, Materialized) filterNot(predicate, Materialized.as(KeyValueByteStoreSupplier))}
*/
@Deprecated
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -336,7 +336,7 @@ public interface KTable<K, V> {
* (i.e., that would be equivalent to calling {@link KTable#filterNot(Predicate)}.
* @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
* @see #filter(Predicate, Materialized)
- * @deprecated use {@link #filter(Predicate, Materialized)}
+ * @deprecated use {@link #filter(Predicate, Materialized) filterNot(predicate, Materialized.as(queryableStoreName))}
*/
@Deprecated
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
@@ -463,7 +463,7 @@ public interface KTable<K, V> {
* @param <VR> the value type of the result {@code KTable}
*
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
- * @deprecated use {@link #mapValues(ValueMapper, Materialized)}
+ * @deprecated use {@link #mapValues(ValueMapper, Materialized) mapValues(mapper, Materialized.as(queryableStoreName).withValueSerde(valueSerde))}
*/
@Deprecated
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName);
@@ -507,7 +507,7 @@ public interface KTable<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
- * @deprecated use {@link #mapValues(ValueMapper, Materialized)}
+ * @deprecated use {@link #mapValues(ValueMapper, Materialized) mapValues(mapper, Materialized.as(KeyValueByteStoreSupplier).withValueSerde(valueSerde))}
*/
@Deprecated
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
@@ -530,7 +530,8 @@ public interface KTable<K, V> {
* update record.
* @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
* followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a KStream using {@code toStream()} and then use {@link KStream#print()} on the result.
+ * convert to a {@link KStream} using {@link #toStream()} and then use
+ * {@link KStream#print(Printed) print(Printed.toSysOut())} on the result.
*/
@Deprecated
void print();
@@ -551,7 +552,8 @@ public interface KTable<K, V> {
* @param label the name used to label the key/value pairs printed to the console
* @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
* followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a KStream using {@code toStream()} and then use {@link KStream#print(String)} on the result.
+ * convert to a {@link KStream} using {@link #toStream()} and then use
+ * {@link KStream#print(Printed) print(Printed.toSysOut().withLabel(lable))} on the result.
*/
@Deprecated
void print(final String label);
@@ -574,7 +576,8 @@ public interface KTable<K, V> {
* @param valSerde value serde used to deserialize value if type is {@code byte[]}
* @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
* followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a KStream using {@code toStream()} and then use {@link KStream#print(Serde, Serde)} on the result.
+ * convert to a {@link KStream} using {@link #toStream()} and then use
+ * {@link KStream#print(Printed) print(Printed.toSysOut().withKeyValueMapper(...)} on the result.
*/
@Deprecated
void print(final Serde<K> keySerde,
@@ -598,7 +601,8 @@ public interface KTable<K, V> {
* @param label the name used to label the key/value pairs printed to the console
* @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
* followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a KStream using {@code toStream()} and then use {@link KStream#print(Serde, Serde, String)} on the result.
+ * convert to a {@link KStream} using {@link #toStream()} and then use
+ * {@link KStream#print(Printed) print(Printed.toSysOut().withLabel(label).withKeyValueMapper(...)} on the result.
*/
@Deprecated
void print(final Serde<K> keySerde,
@@ -622,7 +626,8 @@ public interface KTable<K, V> {
* @param filePath name of file to write to
* @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
* followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String)}} on the result.
+ * convert to a {@link KStream} using {@link #toStream()} and then use
+ * {@link KStream#print(Printed) print(Printed.toFile(filePath)} on the result.
*/
@Deprecated
void writeAsText(final String filePath);
@@ -644,7 +649,8 @@ public interface KTable<K, V> {
* @param label the name used to label the key/value pairs printed out to the console
* @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
* followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, String)}} on the result.
+ * convert to a {@link KStream} using {@link #toStream()} and then use
+ * {@link KStream#print(Printed) print(Printed.toFile(filePath).withLabel(label)} on the result.
*/
@Deprecated
void writeAsText(final String filePath,
@@ -669,7 +675,8 @@ public interface KTable<K, V> {
* @param valSerde value serde used to deserialize value if type is {@code byte[]}
* @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
* followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, Serde, Serde)}} on the result.
+ * convert to a {@link KStream} using {@link #toStream()} and then use
+ * {@link KStream#print(Printed) print(Printed.toFile(filePath).withKeyValueMapper(...)} on the result.
*/
@Deprecated
void writeAsText(final String filePath,
@@ -695,8 +702,8 @@ public interface KTable<K, V> {
* @param valSerde value serde used to deserialize value if type is {@code byte[]}
* @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
* followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, String, Serde, Serde)}} on the result.
-
+ * convert to a {@link KStream} using {@link #toStream()} and then use
+ * {@link KStream#print(Printed) print(Printed.toFile(filePath).withLabel(label).withKeyValueMapper(...)} on the result.
*/
@Deprecated
void writeAsText(final String filePath,
@@ -714,7 +721,8 @@ public interface KTable<K, V> {
* @param action an action to perform on each record
* @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
* followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
- * convert to a KStream using {@code toStream()} and then use {@link KStream#foreach(ForeachAction)}} on the result.
+ * convert to a {@link KStream} using {@link #toStream()} and then use
+ * {@link KStream#foreach(ForeachAction) foreach(action)} on the result.
*/
@Deprecated
void foreach(final ForeachAction<? super K, ? super V> action);
@@ -763,18 +771,19 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(String) #to(someTopicName)} and
- * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link StreamsBuilder#table(String, Materialized)})
+ * {@link KStreamBuilder#table(String, String)})
* The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
*
* @param topic the topic name
* @param queryableStoreName the state store name used for the result {@code KTable}; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KTable#through(String)()}
+ * alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KTable#through(String)}
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)}
- * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+ * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)} and
+ * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
+ * to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final String topic,
@@ -787,17 +796,18 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(String) #to(someTopicName)} and
- * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link StreamsBuilder#table(String, Materialized)})
+ * {@link KStreamBuilder#table(String, String)})
* The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
*
* @param topic the topic name
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)}
- * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+ * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)} and
+ * and {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier))}
+ * to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final String topic,
@@ -810,15 +820,15 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(String) #to(someTopicName)} and
- * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
+ * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
- * {@link StreamsBuilder#table(String)})
+ * {@link KStreamBuilder#table(String)})
*
* @param topic the topic name
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)}
- * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+ * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)} and
+ * and {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final String topic);
@@ -831,17 +841,18 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
- * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
+ * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
- * {@link StreamsBuilder#table(String)})
+ * {@link KStreamBuilder#table(String)})
*
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link DefaultPartitioner} will be used
* @param topic the topic name
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
- * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+ * @deprecated use {@link #toStream()} followed by
+ * {@link KStream#to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))} and
+ * {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
@@ -855,10 +866,10 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
- * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link StreamsBuilder#table(String, Materialized)})
+ * {@link KStreamBuilder#table(String, String)})
*
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link DefaultPartitioner} will be used
@@ -866,8 +877,10 @@ public interface KTable<K, V> {
* @param queryableStoreName the state store name used for the result {@code KTable}.
* If {@code null} this is the equivalent of {@link KTable#through(StreamPartitioner, String)}
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
- * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+ * @deprecated use {@link #toStream()} followed by
+ * {@link KStream#to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))} and
+ * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
+ * to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
@@ -882,18 +895,20 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
- * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link StreamsBuilder#table(String, Materialized)})
+ * {@link KStreamBuilder#table(String, String)})
*
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link DefaultPartitioner} will be used
* @param topic the topic name
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
- * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+ * @deprecated use {@link #toStream()} followed by
+ * {@link KStream#to(String, Produced) to(topic, Produced.streamPartitioner(partitioner))} and
+ * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier)}
+ * to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
@@ -909,10 +924,10 @@ public interface KTable<K, V> {
* used—otherwise producer's {@link DefaultPartitioner} is used.
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
- * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link KStreamBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link StreamsBuilder#table(String, Materialized)})
+ * {@link KStreamBuilder#table(String, String)})
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@@ -920,13 +935,16 @@ public interface KTable<K, V> {
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name
* @param queryableStoreName the state store name used for the result {@code KTable}.
- * If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, String)()}
+ * If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, String)}
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
- * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+ * @deprecated use {@link #toStream()} followed by
+ * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))} and
+ * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
+ * to read back as a {@code KTable}
*/
@Deprecated
- KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
+ KTable<K, V> through(final Serde<K> keySerde,
+ final Serde<V> valSerde,
final String topic,
final String queryableStoreName);
@@ -939,7 +957,7 @@ public interface KTable<K, V> {
* used—otherwise producer's {@link DefaultPartitioner} is used.
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
- * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
* {@link StreamsBuilder#table(String, Materialized)})
@@ -951,11 +969,14 @@ public interface KTable<K, V> {
* @param topic the topic name
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
- * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+ * @deprecated use {@link #toStream()} followed by
+ * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))} and
+ * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier)}
+ * to read back as a {@code KTable}
*/
@Deprecated
- KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
+ KTable<K, V> through(final Serde<K> keySerde,
+ final Serde<V> valSerde,
final String topic,
final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -968,10 +989,10 @@ public interface KTable<K, V> {
* used—otherwise producer's {@link DefaultPartitioner} is used.
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
- * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
+ * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with an interna; store name (cf.
- * {@link StreamsBuilder#table(String)})
+ * {@link KStreamBuilder#table(String)})
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@@ -979,11 +1000,13 @@ public interface KTable<K, V> {
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
- * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+ * @deprecated use {@link #toStream()} followed by
+ * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde))}
+ * and {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable}
*/
@Deprecated
- KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
+ KTable<K, V> through(final Serde<K> keySerde,
+ final Serde<V> valSerde,
final String topic);
/**
@@ -994,10 +1017,10 @@ public interface KTable<K, V> {
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
* #to(keySerde, valueSerde, partitioner, someTopicName)} and
- * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link StreamsBuilder#table(String, Materialized)})
+ * {@link KStreamBuilder#table(String, String)})
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@@ -1011,8 +1034,10 @@ public interface KTable<K, V> {
* @param queryableStoreName the state store name used for the result {@code KTable}.
* If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, StreamPartitioner, String)()}
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
- * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+ * @deprecated use {@link #toStream()} followed by
+ * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))} and
+ * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(queryableStoreName))}
+ * to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final Serde<K> keySerde,
@@ -1029,10 +1054,10 @@ public interface KTable<K, V> {
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
* #to(keySerde, valueSerde, partitioner, someTopicName)} and
- * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link StreamsBuilder#table(String, Materialized)})
+ * {@link KStreamBuilder#table(String, String)})
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@@ -1045,8 +1070,10 @@ public interface KTable<K, V> {
* @param topic the topic name
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
- * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+ * @deprecated use {@link #toStream()} followed by
+ * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))} and
+ * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(topic, Materialized.as(KeyValueBytesStoreSupplier))}
+ * to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final Serde<K> keySerde,
@@ -1063,10 +1090,10 @@ public interface KTable<K, V> {
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
* #to(keySerde, valueSerde, partitioner, someTopicName)} and
- * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
+ * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
- * {@link StreamsBuilder#table(String)})
+ * {@link KStreamBuilder#table(String)})
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@@ -1078,8 +1105,9 @@ public interface KTable<K, V> {
* be used
* @param topic the topic name
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
- * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable}
+ * @deprecated use {@link #toStream()} followed by
+ * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner))} and
+ * {@link StreamsBuilder#table(String) StreamsBuilder#table(topic)} to read back as a {@code KTable}
*/
@Deprecated
KTable<K, V> through(final Serde<K> keySerde,
@@ -1094,7 +1122,7 @@ public interface KTable<K, V> {
* started).
*
* @param topic the topic name
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)}
+ * @deprecated use {@link #toStream()} followed by {@link KStream#to(String) to(topic)}
*/
@Deprecated
void to(final String topic);
@@ -1108,7 +1136,8 @@ public interface KTable<K, V> {
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link DefaultPartitioner} will be used
* @param topic the topic name
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
+ * @deprecated use {@link #toStream()} followed by
+ * {@link KStream#to(String, Produced) to(topic, Produced.withStreamPartitioner(partitioner)}
*/
@Deprecated
void to(final StreamPartitioner<? super K, ? super V> partitioner,
@@ -1127,7 +1156,8 @@ public interface KTable<K, V> {
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
+ * @deprecated use {@link #toStream()} followed by
+ * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde)}
*/
@Deprecated
void to(final Serde<K> keySerde,
@@ -1149,7 +1179,8 @@ public interface KTable<K, V> {
* {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will
* be used
* @param topic the topic name
- * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)}
+ * @deprecated use {@link #toStream()} followed by
+ * {@link KStream#to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partioner)}
*/
@Deprecated
void to(final Serde<K> keySerde,
@@ -1243,7 +1274,7 @@ public interface KTable<K, V> {
* @param <KR> the key type of the result {@link KGroupedTable}
* @param <VR> the value type of the result {@link KGroupedTable}
* @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable}
- * @deprecated use {@link #groupBy(KeyValueMapper, Serialized)}
+ * @deprecated use {@link #groupBy(KeyValueMapper, Serialized) groupBy(selector, Serialized.with(keySerde, valueSerde)}
*/
@Deprecated
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
@@ -1475,7 +1506,7 @@ public interface KTable<K, V> {
* {@link ValueJoiner}, one for each matched record-pair with the same key
* @see #leftJoin(KTable, ValueJoiner, Materialized)
* @see #outerJoin(KTable, ValueJoiner, Materialized)
- * @deprecated use {@link #join(KTable, ValueJoiner, Materialized)}
+ * @deprecated use {@link #join(KTable, ValueJoiner, Materialized) join(other, joiner, Materialized.as(queryableStoreName).withValueSerde(joinSerde)}
*/
@Deprecated
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
@@ -1554,7 +1585,7 @@ public interface KTable<K, V> {
* {@link ValueJoiner}, one for each matched record-pair with the same key
* @see #leftJoin(KTable, ValueJoiner, Materialized)
* @see #outerJoin(KTable, ValueJoiner, Materialized)
- * @deprecated use {@link #join(KTable, ValueJoiner, Materialized)}
+ * @deprecated use {@link #join(KTable, ValueJoiner, Materialized) join(other, joiner, Materialized.as(KeyValueByteStoreSupplier)}
*/
@Deprecated
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
@@ -1811,7 +1842,7 @@ public interface KTable<K, V> {
* left {@code KTable}
* @see #join(KTable, ValueJoiner, Materialized)
* @see #outerJoin(KTable, ValueJoiner, Materialized)
- * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized)}
+ * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized) leftJoin(other, joiner, Materialized.as(queryableStoreName).withValueSerde(joinSerde)}
*/
@Deprecated
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
@@ -1898,7 +1929,7 @@ public interface KTable<K, V> {
* left {@code KTable}
* @see #join(KTable, ValueJoiner, Materialized)
* @see #outerJoin(KTable, ValueJoiner, Materialized)
- * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized)}
+ * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized) leftJoin(other, joiner, Materialized.as(KeyValueByteStoreSupplier)}
*/
@Deprecated
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
@@ -2153,7 +2184,7 @@ public interface KTable<K, V> {
* both {@code KTable}s
* @see #join(KTable, ValueJoiner, Materialized)
* @see #leftJoin(KTable, ValueJoiner, Materialized)
- * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized)}
+ * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized) outerJoin(other, joiner, Materialized.as(queryableStoreName).withValueSerde(joinSerde)}
*/
@Deprecated
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
@@ -2239,7 +2270,7 @@ public interface KTable<K, V> {
* both {@code KTable}s
* @see #join(KTable, ValueJoiner)
* @see #leftJoin(KTable, ValueJoiner)
- * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized)}
+ * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized) outerJoin(other, joiner, Materialized.as(KeyValueByteStoreSupplier)}
*/
@Deprecated
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
index d3db087..8d2c22a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
@@ -105,7 +105,6 @@ public class Printed<K, V> {
* The example below shows how to customize output data.
* <pre>{@code
* final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
- * @Override
* public String apply(Integer key, String value) {
* return String.format("(%d, %s)", key, value);
* }
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
index e7cc234..1acd587 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
@@ -28,12 +28,12 @@ import org.apache.kafka.streams.KeyValue;
* {@code Reducer} can be used to implement aggregation functions like sum, min, or max.
*
* @param <V> value type
- * @see KGroupedStream#reduce(Reducer, String)
- * @see KGroupedStream#reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, Windows, String)
- * @see KGroupedStream#reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, String)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#reduce(Reducer)
+ * @see KGroupedStream#reduce(Reducer, Materialized)
+ * @see TimeWindowedKStream#reduce(Reducer)
+ * @see TimeWindowedKStream#reduce(Reducer, Materialized)
+ * @see SessionWindowedKStream#reduce(Reducer)
+ * @see SessionWindowedKStream#reduce(Reducer, Materialized)
* @see Aggregator
*/
public interface Reducer<V> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index 3c3ef7e..ddc0371 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -127,13 +127,13 @@ public interface SessionWindowedKStream<K, V> {
* @param initializer the instance of {@link Initializer}
* @param aggregator the instance of {@link Aggregator}
* @param sessionMerger the instance of {@link Merger}
- * @param <T> the value type of the resulting {@link KTable}
+ * @param <VR> the value type of the resulting {@link KTable}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
*/
- <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
- final Aggregator<? super K, ? super V, T> aggregator,
- final Merger<? super K, T> sessionMerger);
+ <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator,
+ final Merger<? super K, VR> sessionMerger);
/**
* Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
@@ -222,10 +222,10 @@ public interface SessionWindowedKStream<K, V> {
* <pre>{@code
* // At the example of a Reducer<Long>
* new Reducer<Long>() {
- * @Override
* public Long apply(Long aggValue, Long currValue) {
* return aggValue + currValue;
* }
+ * }
* }</pre>
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -260,8 +260,8 @@ public interface SessionWindowedKStream<K, V> {
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
* provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
- * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
- * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}
+ * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
+ * @param materializedAs an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 6e06461..693bee0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -58,12 +58,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
* @see TimeWindows
* @see UnlimitedWindows
* @see JoinWindows
- * @see KGroupedStream#count(SessionWindows, String)
- * @see KGroupedStream#count(SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, String)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#windowedBy(SessionWindows)
* @see TimestampExtractor
*/
public final class SessionWindows {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 38362ad..f9090c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -45,12 +45,7 @@ import java.util.Map;
* @see SessionWindows
* @see UnlimitedWindows
* @see JoinWindows
- * @see KGroupedStream#count(Windows, String)
- * @see KGroupedStream#count(Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, Windows, String)
- * @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#windowedBy(Windows)
* @see TimestampExtractor
*/
public final class TimeWindows extends Windows<TimeWindow> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index e116a8b..a3b9338 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -34,12 +34,7 @@ import java.util.Map;
* @see TimeWindows
* @see SessionWindows
* @see JoinWindows
- * @see KGroupedStream#count(Windows, String)
- * @see KGroupedStream#count(Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, Windows, String)
- * @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#windowedBy(Windows)
* @see TimestampExtractor
*/
public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
index 576706e..7728317 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -26,18 +26,8 @@ package org.apache.kafka.streams.kstream;
* Thus, a windowed {@link KTable} has type {@code <Windowed<K>,V>}.
*
* @param <K> type of the key
- * @see KGroupedStream#count(Windows, String)
- * @see KGroupedStream#count(Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#count(SessionWindows, String)
- * @see KGroupedStream#count(SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, Windows, String)
- * @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, String)
- * @see KGroupedStream#reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String)
+ * @see KGroupedStream#windowedBy(Windows)
+ * @see KGroupedStream#windowedBy(SessionWindows)
*/
public class Windowed<K> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
index e6bc112..4d07f9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -33,7 +33,7 @@ public class QueryableStoreTypes {
* A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore}
* @param <K> key type of the store
* @param <V> value type of the store
- * @return {@link KeyValueStoreType}
+ * @return {@link QueryableStoreTypes.KeyValueStoreType}
*/
public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, V>> keyValueStore() {
return new KeyValueStoreType<>();
@@ -43,7 +43,7 @@ public class QueryableStoreTypes {
* A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore}
* @param <K> key type of the store
* @param <V> value type of the store
- * @return {@link WindowStoreType}
+ * @return {@link QueryableStoreTypes.WindowStoreType}
*/
public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStore() {
return new WindowStoreType<>();
@@ -53,7 +53,7 @@ public class QueryableStoreTypes {
* A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}
* @param <K> key type of the store
* @param <V> value type of the store
- * @return {@link SessionStoreType}
+ * @return {@link QueryableStoreTypes.SessionStoreType}
*/
public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, V>> sessionStore() {
return new SessionStoreType<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c0beb9e..05ebd33 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -190,7 +190,7 @@ public class Stores {
*
* @param name the name of the store
* @return the factory that can be used to specify other options or configurations for the store; never null
- * @deprected use {@link #persistentKeyValueStore(String)}, {@link #persistentWindowStore(String, long, int, long, boolean)}
+ * @deprecated use {@link #persistentKeyValueStore(String)}, {@link #persistentWindowStore(String, long, int, long, boolean)}
* {@link #persistentSessionStore(String, long)}, {@link #lruMap(String, int)}, or {@link #inMemoryKeyValueStore(String)}
*/
@Deprecated
[2/2] kafka git commit: MINOR: fix JavaDocs warnings
Posted by da...@apache.org.
MINOR: fix JavaDocs warnings
- add some missing annotations for deprecated methods
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Michael G. Noll <mi...@confluent.io>, Damian Guy <da...@gmail.com>
Closes #4005 from mjsax/minor-fix-javadoc-warnings
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3dcbbf70
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3dcbbf70
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3dcbbf70
Branch: refs/heads/trunk
Commit: 3dcbbf703017985c9e212ad69cc7afdddbf358eb
Parents: 716330a
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue Oct 3 07:35:42 2017 -0700
Committer: Damian Guy <da...@gmail.com>
Committed: Tue Oct 3 07:35:42 2017 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/KafkaStreams.java | 9 +-
.../kafka/streams/TopologyDescription.java | 4 +-
.../kafka/streams/kstream/Aggregator.java | 12 +-
.../kafka/streams/kstream/GlobalKTable.java | 2 +-
.../kafka/streams/kstream/Initializer.java | 12 +-
.../kafka/streams/kstream/JoinWindows.java | 2 +-
.../kafka/streams/kstream/KGroupedStream.java | 124 ++++++------
.../kafka/streams/kstream/KGroupedTable.java | 27 +--
.../apache/kafka/streams/kstream/KStream.java | 88 ++++-----
.../kafka/streams/kstream/KStreamBuilder.java | 15 +-
.../apache/kafka/streams/kstream/KTable.java | 189 +++++++++++--------
.../apache/kafka/streams/kstream/Printed.java | 1 -
.../apache/kafka/streams/kstream/Reducer.java | 12 +-
.../streams/kstream/SessionWindowedKStream.java | 14 +-
.../kafka/streams/kstream/SessionWindows.java | 7 +-
.../kafka/streams/kstream/TimeWindows.java | 7 +-
.../kafka/streams/kstream/UnlimitedWindows.java | 7 +-
.../apache/kafka/streams/kstream/Windowed.java | 14 +-
.../streams/state/QueryableStoreTypes.java | 6 +-
.../org/apache/kafka/streams/state/Stores.java | 2 +-
20 files changed, 288 insertions(+), 266 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 928d0e9..fd9c729 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
@@ -948,10 +949,10 @@ public class KafkaStreams {
* <p>
* This will use the default Kafka Streams partitioner to locate the partition.
* If a {@link StreamPartitioner custom partitioner} has been
- * {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig},
- * {@link KStream#through(StreamPartitioner, String)}, or {@link KTable#through(StreamPartitioner, String, String)},
- * or if the original {@link KTable}'s input {@link StreamsBuilder#table(String, String) topic} is partitioned
- * differently, please use {@link #metadataForKey(String, Object, StreamPartitioner)}.
+ * {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig} or
+ * {@link KStream#through(String, Produced)}, or if the original {@link KTable}'s input
+ * {@link StreamsBuilder#table(String) topic} is partitioned differently, please use
+ * {@link #metadataForKey(String, Object, StreamPartitioner)}.
* <p>
* Note:
* <ul>
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index 1b520c6..01af8bf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -55,9 +55,9 @@ public interface TopologyDescription {
}
/**
- * Represents a {@link Topology#addGlobalStore(org.apache.kafka.streams.processor.StateStoreSupplier, String,
+ * Represents a {@link Topology#addGlobalStore(org.apache.kafka.streams.state.StoreBuilder, String,
* org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String,
- * String, org.apache.kafka.streams.processor.ProcessorSupplier)} global store}.
+ * String, org.apache.kafka.streams.processor.ProcessorSupplier) global store}.
* Adding a global store results in adding a source node and one stateful processor node.
* Note, that all added global stores form a single unit (similar to a {@link Subtopology}) even if different
* global stores are not connected to each other.
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
index 4eec4f5..217a145 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -29,12 +29,12 @@ package org.apache.kafka.streams.kstream;
* @param <V> input value type
* @param <VA> aggregate value type
* @see Initializer
- * @see KGroupedStream#aggregate(Initializer, Aggregator, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Materialized)
+ * @see TimeWindowedKStream#aggregate(Initializer, Aggregator)
+ * @see TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized)
+ * @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger)
+ * @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized)
* @see Reducer
*/
public interface Aggregator<K, V, VA> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index 81aa405..72286c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -61,7 +61,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
* @param <K> Type of primary keys
* @param <V> Type of value changes
* @see KTable
- * @see StreamsBuilder#globalTable(String, String)
+ * @see StreamsBuilder#globalTable(String)
* @see KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)
* @see KStream#leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
index d41c638..1b59c64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
@@ -23,12 +23,12 @@ package org.apache.kafka.streams.kstream;
*
* @param <VA> aggregate value type
* @see Aggregator
- * @see KGroupedStream#aggregate(Initializer, Aggregator, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String)
- * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Materialized)
+ * @see TimeWindowedKStream#aggregate(Initializer, Aggregator)
+ * @see TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized)
+ * @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger)
+ * @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized)
*/
public interface Initializer<VA> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index ef9ed01..863ae95 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -59,7 +59,7 @@ import java.util.Map;
* @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows)
* @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
* @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
- * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
+ * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
* @see TimestampExtractor
*/
public final class JoinWindows extends Windows<Window> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 1c72ebf..17f2db4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -85,7 +85,7 @@ public interface KGroupedStream<K, V> {
* alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count()}.
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
- * @deprecated use {@link #count(Materialized)
+ * @deprecated use {@link #count(Materialized) count(Materialized.as(queryableStoreName))}
*/
@Deprecated
KTable<K, Long> count(final String queryableStoreName);
@@ -146,7 +146,7 @@ public interface KGroupedStream<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
- * @deprecated use {@link #count(Materialized)}
+ * @deprecated use {@link #count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier))}
*/
@Deprecated
KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -229,7 +229,8 @@ public interface KGroupedStream<K, V> {
* alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count(Windows)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
* that represent the latest (rolling) count (i.e., number of records) for each key within a window.
- * @deprecated use {@link #windowedBy(Windows)}
+ * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+ * {@link TimeWindowedKStream#count(Materialized) count(Materialized.as(queryableStoreName))}
*/
@Deprecated
<W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
@@ -264,7 +265,7 @@ public interface KGroupedStream<K, V> {
* @param windows the specification of the aggregation {@link Windows}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
* that represent the latest (rolling) count (i.e., number of records) for each key within a window
- * @deprecated use {@link #windowedBy(Windows)}
+ * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by {@link TimeWindowedKStream#count() count()}
*/
@Deprecated
<W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows);
@@ -306,7 +307,8 @@ public interface KGroupedStream<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
* that represent the latest (rolling) count (i.e., number of records) for each key within a window
- * @deprecated use {@link #windowedBy(Windows)}
+ * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+ * {@link TimeWindowedKStream#count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier))}
*/
@Deprecated
<W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
@@ -344,10 +346,11 @@ public interface KGroupedStream<K, V> {
*
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
* @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
- * alphanumerics, '.', '_' and '-. If {@code null} then this will be equivalent to {@link KGroupedStream#count(SessionWindows)} ()}.
+ * alphanumerics, '.', '_' and '-. If {@code null} then this will be equivalent to {@link KGroupedStream#count(SessionWindows)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
* that represent the latest (rolling) count (i.e., number of records) for each key within a window
- * @deprecated use {@link #windowedBy(SessionWindows)}
+ * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+ * {@link SessionWindowedKStream#count(Materialized) count(Materialized.as(queryableStoreName))}
*/
@Deprecated
KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName);
@@ -371,7 +374,8 @@ public interface KGroupedStream<K, V> {
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
* that represent the latest (rolling) count (i.e., number of records) for each key within a window
- * @deprecated use {@link #windowedBy(SessionWindows)}
+ * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+ * {@link SessionWindowedKStream#count() count()}
*/
@Deprecated
KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows);
@@ -409,7 +413,8 @@ public interface KGroupedStream<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
* that represent the latest (rolling) count (i.e., number of records) for each key within a window
- * @deprecated use {@link #windowedBy(SessionWindows)}
+ * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+ * {@link SessionWindowedKStream#count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier))}
*/
@Deprecated
KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
@@ -419,7 +424,7 @@ public interface KGroupedStream<K, V> {
* Combine the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value
- * (c.f. {@link #aggregate(Initializer, Aggregator, Serde, String)}).
+ * (c.f. {@link #aggregate(Initializer, Aggregator)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* that can be queried using the provided {@code queryableStoreName}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -464,10 +469,10 @@ public interface KGroupedStream<K, V> {
* <pre>{@code
* // At the example of a Reducer<Long>
* new Reducer<Long>() {
- * @Override
* public Long apply(Long aggValue, Long currValue) {
* return aggValue + currValue;
* }
+ * }
* }</pre>
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -506,7 +511,7 @@ public interface KGroupedStream<K, V> {
* alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer)} ()}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
- * @deprectated use {@link #reduce(Reducer, Materialized)}
+ * @deprecated use {@link #reduce(Reducer, Materialized) reduce(reducer, Materialized.as(queryableStoreName))}
*/
@Deprecated
KTable<K, V> reduce(final Reducer<V> reducer,
@@ -527,10 +532,10 @@ public interface KGroupedStream<K, V> {
* <pre>{@code
* // At the example of a Reducer<Long>
* new Reducer<Long>() {
- * @Override
* public Long apply(Long aggValue, Long currValue) {
* return aggValue + currValue;
* }
+ * }
* }</pre>
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -562,7 +567,7 @@ public interface KGroupedStream<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
- * @deprectated use {@link #reduce(Reducer, Materialized)}
+ * @deprecated use {@link #reduce(Reducer, Materialized) reduce(reducer, Materialized.as(KeyValueByteStoreSupplier))}
*/
@Deprecated
KTable<K, V> reduce(final Reducer<V> reducer,
@@ -582,10 +587,10 @@ public interface KGroupedStream<K, V> {
* <pre>{@code
* // At the example of a Reducer<Long>
* new Reducer<Long>() {
- * @Override
* public Long apply(Long aggValue, Long currValue) {
* return aggValue + currValue;
* }
+ * }
* }</pre>
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -638,10 +643,10 @@ public interface KGroupedStream<K, V> {
* <pre>{@code
* // At the example of a Reducer<Long>
* new Reducer<Long>() {
- * @Override
* public Long apply(Long aggValue, Long currValue) {
* return aggValue + currValue;
* }
+ * }
* }</pre>
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -680,10 +685,11 @@ public interface KGroupedStream<K, V> {
* @param reducer a {@link Reducer} that computes a new aggregate result
* @param windows the specification of the aggregation {@link Windows}
* @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, Windows)} ()}.
+ * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, Windows)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
- * @deprecated use {@link #windowedBy(Windows)}
+ * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+ * {@link TimeWindowedKStream#reduce(Reducer, Materialized) reduce(reducer, Materialized.as(queryableStoreName))}
*/
@Deprecated
<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -728,7 +734,8 @@ public interface KGroupedStream<K, V> {
* @param windows the specification of the aggregation {@link Windows}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
- * @deprecated use {@link #windowedBy(Windows)}
+ * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+ * {@link TimeWindowedKStream#reduce(Reducer) reduce(reducer)}
*/
@Deprecated
<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -752,10 +759,10 @@ public interface KGroupedStream<K, V> {
* <pre>{@code
* // At the example of a Reducer<Long>
* new Reducer<Long>() {
- * @Override
* public Long apply(Long aggValue, Long currValue) {
* return aggValue + currValue;
* }
+ * }
* }</pre>
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -790,7 +797,8 @@ public interface KGroupedStream<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
- * @deprecated use {@link #windowedBy(Windows)}
+ * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+ * {@link TimeWindowedKStream#reduce(Reducer, Materialized) reduce(reducer, Materialized.as(KeyValueByteStoreSupplier))}
*/
@Deprecated
<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -813,10 +821,10 @@ public interface KGroupedStream<K, V> {
* <pre>{@code
* // At the example of a Reducer<Long>
* new Reducer<Long>() {
- * @Override
* public Long apply(Long aggValue, Long currValue) {
* return aggValue + currValue;
* }
+ * }
* }</pre>
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -855,10 +863,11 @@ public interface KGroupedStream<K, V> {
* @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
* @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, SessionWindows)} ()}.
+ * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, SessionWindows)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
- * @deprecated use {@link #windowedBy(SessionWindows)}
+ * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+ * {@link SessionWindowedKStream#reduce(Reducer, Materialized) reduce(reducer, Materialized.as(queryableStoreName))}
*/
@Deprecated
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -893,7 +902,8 @@ public interface KGroupedStream<K, V> {
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
- * @deprecated use {@link #windowedBy(SessionWindows)}
+ * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+ * {@link SessionWindowedKStream#reduce(Reducer) reduce(reducer)}
*/
@Deprecated
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -915,10 +925,10 @@ public interface KGroupedStream<K, V> {
* <pre>{@code
* // At the example of a Reducer<Long>
* new Reducer<Long>() {
- * @Override
* public Long apply(Long aggValue, Long currValue) {
* return aggValue + currValue;
* }
+ * }
* }</pre>
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
@@ -959,7 +969,8 @@ public interface KGroupedStream<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
- * @deprecated use {@link #windowedBy(SessionWindows)}
+ * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+ * {@link SessionWindowedKStream#reduce(Reducer, Materialized) reduce(reducer, Materialized.as(KeyValueByteStoreSupplier))}
*/
@Deprecated
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -1016,11 +1027,11 @@ public interface KGroupedStream<K, V> {
* @param aggValueSerde aggregate value serdes for materializing the aggregated table,
* if not specified the default serdes defined in the configs will be used
* @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Serde)} ()} ()}.
+ * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Serde)}.
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
- * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
+ * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as(queryableStoreName).withValueSerde(aggValueSerde))}
*/
@Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
@@ -1031,7 +1042,7 @@ public interface KGroupedStream<K, V> {
/**
* Aggregate the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
- * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+ * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
* allows the result to have a different type than the input values.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* that can be queried using the provided {@code queryableStoreName}.
@@ -1043,7 +1054,7 @@ public interface KGroupedStream<K, V> {
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like
- * count (c.f. {@link #count(String)}).
+ * count (c.f. {@link #count()}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
@@ -1087,7 +1098,7 @@ public interface KGroupedStream<K, V> {
/**
* Aggregate the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
- * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+ * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
* allows the result to have a different type than the input values.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* that can be queried using the provided {@code queryableStoreName}.
@@ -1099,7 +1110,7 @@ public interface KGroupedStream<K, V> {
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like
- * count (c.f. {@link #count(String)}).
+ * count (c.f. {@link #count()}).
* <p>
* The default value serde from config will be used for serializing the result.
* If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
@@ -1127,10 +1138,11 @@ public interface KGroupedStream<K, V> {
*/
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator);
+
/**
* Aggregate the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
- * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+ * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
* allows the result to have a different type than the input values.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* that can be queried using the provided {@code queryableStoreName}.
@@ -1142,7 +1154,7 @@ public interface KGroupedStream<K, V> {
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like
- * count (c.f. {@link #count(String)}).
+ * count (c.f. {@link #count()}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
@@ -1166,14 +1178,13 @@ public interface KGroupedStream<K, V> {
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
- * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
+ * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
*/
@Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Serde<VR> aggValueSerde);
-
/**
* Aggregate the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
@@ -1189,7 +1200,7 @@ public interface KGroupedStream<K, V> {
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator, StateStoreSupplier)} can be used to compute aggregate functions
- * like count (c.f. {@link #count(String)}).
+ * like count (c.f. {@link #count()}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
@@ -1217,7 +1228,7 @@ public interface KGroupedStream<K, V> {
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
- * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
+ * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as(KeyValueByteStoreSupplier))}
*/
@Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
@@ -1243,7 +1254,7 @@ public interface KGroupedStream<K, V> {
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator, Windows, Serde, String)} can be used to compute aggregate
- * functions like count (c.f. {@link #count(String)}).
+ * functions like count (c.f. {@link #count(Windows)}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
@@ -1282,10 +1293,11 @@ public interface KGroupedStream<K, V> {
* if not specified the default serdes defined in the configs will be used
* @param <VR> the value type of the resulting {@link KTable}
* @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Windows, Serde)} ()} ()}.
+ * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Windows, Serde)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
- * @deprecated use {@link #windowedBy(Windows)}
+ * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+ * {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as(queryableStoreName).withValueSerde(aggValueSerde))}
*/
@Deprecated
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
@@ -1313,7 +1325,7 @@ public interface KGroupedStream<K, V> {
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator, Windows, Serde, String)} can be used to compute aggregate
- * functions like count (c.f. {@link #count(String)}).
+ * functions like count (c.f. {@link #count(Windows)}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
@@ -1338,7 +1350,8 @@ public interface KGroupedStream<K, V> {
* @param <VR> the value type of the resulting {@link KTable}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
- * @deprecated use {@link #windowedBy(Windows)}
+ * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+ * {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized)} aggregate(initializer, aggregator, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
*/
@Deprecated
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
@@ -1346,7 +1359,6 @@ public interface KGroupedStream<K, V> {
final Windows<W> windows,
final Serde<VR> aggValueSerde);
-
/**
* Aggregate the values of records in this stream by the grouped key and defined windows.
* Records with {@code null} key or value are ignored.
@@ -1366,7 +1378,7 @@ public interface KGroupedStream<K, V> {
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator, Windows, StateStoreSupplier)} can be used to compute aggregate
- * functions like count (c.f. {@link #count(String)}).
+ * functions like count (c.f. {@link #count(Windows)}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
@@ -1397,7 +1409,8 @@ public interface KGroupedStream<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
- * @deprecated use {@link #windowedBy(Windows)}
+ * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
+ * {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as(KeyValueByteStoreSupplier))}
*/
@Deprecated
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
@@ -1422,7 +1435,7 @@ public interface KGroupedStream<K, V> {
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)} can be used to compute
- * aggregate functions like count (c.f. {@link #count(String)})
+ * aggregate functions like count (c.f. {@link #count(SessionWindows)})
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
@@ -1452,10 +1465,11 @@ public interface KGroupedStream<K, V> {
* if not specified the default serdes defined in the configs will be used
* @param <T> the value type of the resulting {@link KTable}
* @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde)} ()} ()}.
+ * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
- * @deprecated use {@link #windowedBy(SessionWindows)}
+ * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+ * {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.as(queryableStoreName).withValueSerde(aggValueSerde))}
*/
@Deprecated
<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
@@ -1482,7 +1496,7 @@ public interface KGroupedStream<K, V> {
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)} can be used to compute
- * aggregate functions like count (c.f. {@link #count(String)})
+ * aggregate functions like count (c.f. {@link #count(SessionWindows)})
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
@@ -1500,7 +1514,8 @@ public interface KGroupedStream<K, V> {
* @param <T> the value type of the resulting {@link KTable}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
- * @deprecated use {@link #windowedBy(SessionWindows)}
+ * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+ * {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
*/
@Deprecated
<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
@@ -1526,7 +1541,7 @@ public interface KGroupedStream<K, V> {
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
* Thus, {@code #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, StateStoreSupplier)} can be used
- * to compute aggregate functions like count (c.f. {@link #count(String)}).
+ * to compute aggregate functions like count (c.f. {@link #count(SessionWindows)}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
@@ -1559,7 +1574,8 @@ public interface KGroupedStream<K, V> {
* @param <T> the value type of the resulting {@link KTable}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
- * @deprecated use {@link #windowedBy(SessionWindows)}
+ * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
+ * {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.as(KeyValueByteStoreSupplier).withValueSerde(aggValueSerde))}
*/
@Deprecated
<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index f854320..f814eaf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -81,7 +81,7 @@ public interface KGroupedTable<K, V> {
* alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#count()}.
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
- * @deprecated use {@link #count(Materialized)}
+ * @deprecated use {@link #count(Materialized) count(Materialized.as(queryableStoreName))}
*/
@Deprecated
KTable<K, Long> count(final String queryableStoreName);
@@ -192,7 +192,7 @@ public interface KGroupedTable<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
- * @deprecated use {@link #count(Materialized)}
+ * @deprecated use {@link #count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier)}
*/
@Deprecated
KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -264,7 +264,7 @@ public interface KGroupedTable<K, V> {
* '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#reduce(Reducer, Reducer)} ()}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
- * @deprecated use {@link #reduce(Reducer, Reducer, Materialized)}
+ * @deprecated use {@link #reduce(Reducer, Reducer, Materialized) reduce(adder, subtractor, Materialized.as(queryableStoreName))}
*/
@Deprecated
KTable<K, V> reduce(final Reducer<V> adder,
@@ -346,7 +346,7 @@ public interface KGroupedTable<K, V> {
* mapped} to the same key into a new instance of {@link KTable}.
* Records with {@code null} key are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value
- * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Serde, String)}).
+ * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* that can be queried using the provided {@code queryableStoreName}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -462,7 +462,7 @@ public interface KGroupedTable<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
- * @deprecated use {@link #reduce(Reducer, Reducer, Materialized)}
+ * @deprecated use {@link #reduce(Reducer, Reducer, Materialized) reduce(adder, subtractor, Materialized.as(KeyValueByteStoreSupplier))}
*/
@Deprecated
KTable<K, V> reduce(final Reducer<V> adder,
@@ -547,7 +547,7 @@ public interface KGroupedTable<K, V> {
* @param <VR> the value type of the aggregated {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
- * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}
+ * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as(queryableStoreName))}
*/
@Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
@@ -559,7 +559,7 @@ public interface KGroupedTable<K, V> {
* Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
* mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
* Records with {@code null} key are ignored.
- * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+ * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, Materialized) combining via reduce(...)} as it,
* for example, allows the result to have a different type than the input values.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* provided by the given {@code storeSupplier}.
@@ -639,11 +639,10 @@ public interface KGroupedTable<K, V> {
* Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
* mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
* Records with {@code null} key are ignored.
- * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+ * Aggregating is a generalization of {@link #reduce(Reducer, Reducer) combining via reduce(...)} as it,
* for example, allows the result to have a different type than the input values.
* If the result value type does not match the {@link StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value
- * serde} you should use {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator, Serde, String)
- * aggregate(Initializer, Aggregator, Aggregator, Serde, String)}.
+ * serde} you should use {@link #aggregate(Initializer, Aggregator, Aggregator, Serde)}.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* provided by the given {@code storeSupplier}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -784,7 +783,7 @@ public interface KGroupedTable<K, V> {
* @param <VR> the value type of the aggregated {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
- * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}
+ * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as(queryableStoreName).withValueSerde(aggValueSerde))}
*/
@Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
@@ -797,7 +796,7 @@ public interface KGroupedTable<K, V> {
* Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
* mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
* Records with {@code null} key are ignored.
- * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+ * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, Materialized) combining via reduce(...)} as it,
* for example, allows the result to have a different type than the input values.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* that can be queried using the provided {@code queryableStoreName}.
@@ -857,7 +856,9 @@ public interface KGroupedTable<K, V> {
* @param <VR> the value type of the aggregated {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
+ * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
*/
+ @Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor,
@@ -938,7 +939,9 @@ public interface KGroupedTable<K, V> {
* @param <VR> the value type of the aggregated {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
+ * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as(KeyValueByteStoreSupplier))}
*/
+ @Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor,
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index c56a4ed..0d1d201 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@@ -290,7 +291,7 @@ public interface KStream<K, V> {
* {@link Integer} etc. to get meaningful information.
*
* @param label the name used to label the key/value pairs printed to the console
- * @deprecated use {@code print(Printed)}
+ * @deprecated use {@link #print(Printed) print(Printed.toSysOut()}
*/
@Deprecated
void print(final String label);
@@ -308,7 +309,7 @@ public interface KStream<K, V> {
*
* @param keySerde key serde used to deserialize key if type is {@code byte[]},
* @param valSerde value serde used to deserialize value if type is {@code byte[]},
- * @deprecated use {@code print(Printed)}
+ * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withKeyValueMapper(...)}
*/
@Deprecated
void print(final Serde<K> keySerde,
@@ -326,7 +327,7 @@ public interface KStream<K, V> {
* @param keySerde key serde used to deserialize key if type is {@code byte[]},
* @param valSerde value serde used to deserialize value if type is {@code byte[]},
* @param label the name used to label the key/value pairs printed to the console
- * @deprecated use {@code print(Printed)}
+ * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withLabel(label).withKeyValueMapper(...)}
*/
@Deprecated
void print(final Serde<K> keySerde,
@@ -342,7 +343,6 @@ public interface KStream<K, V> {
* The example below shows the way to customize output data.
* <pre>{@code
* final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
- * @Override
* public String apply(Integer key, String value) {
* return String.format("(%d, %s)", key, value);
* }
@@ -352,7 +352,7 @@ public interface KStream<K, V> {
* The KeyValueMapper's mapped value type must be {@code String}.
*
* @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
- * @deprecated use {@code print(Printed)}
+ * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withKeyValueMapper(mapper)}
*/
@Deprecated
void print(final KeyValueMapper<? super K, ? super V, String> mapper);
@@ -366,7 +366,6 @@ public interface KStream<K, V> {
* The example below shows the way to customize output data.
* <pre>{@code
* final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
- * @Override
* public String apply(Integer key, String value) {
* return String.format("(%d, %s)", key, value);
* }
@@ -377,7 +376,7 @@ public interface KStream<K, V> {
*
* @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
* @param label The given name which labels output will be printed.
- * @deprecated use {@code print(Printed)}
+ * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withLabel(label).withKeyValueMapper(mapper)}
*/
@Deprecated
void print(final KeyValueMapper<? super K, ? super V, String> mapper, final String label);
@@ -391,7 +390,6 @@ public interface KStream<K, V> {
* The example below shows the way to customize output data.
* <pre>{@code
* final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
- * @Override
* public String apply(Integer key, String value) {
* return String.format("(%d, %s)", key, value);
* }
@@ -404,9 +402,9 @@ public interface KStream<K, V> {
* {@link Integer} etc. to get meaningful information.
*
* @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
- * @param keySerde a {@link Serde<K>} used to deserialize key if type is {@code byte[]}.
- * @param valSerde a {@link Serde<V>} used to deserialize value if type is {@code byte[]}.
- * @deprecated use {@code print(Printed)}
+ * @param keySerde a {@link Serde} used to deserialize key if type is {@code byte[]}.
+ * @param valSerde a {@link Serde} used to deserialize value if type is {@code byte[]}.
+ * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withKeyValueMapper(mapper)}
*/
@Deprecated
void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde);
@@ -420,7 +418,6 @@ public interface KStream<K, V> {
* The example below shows the way to customize output data.
* <pre>{@code
* final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
- * @Override
* public String apply(Integer key, String value) {
* return String.format("(%d, %s)", key, value);
* }
@@ -433,10 +430,10 @@ public interface KStream<K, V> {
* {@link Integer} etc. to get meaningful information.
*
* @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
- * @param keySerde a {@link Serde<K>} used to deserialize key if type is {@code byte[]}.
- * @param valSerde a {@link Serde<V>} used to deserialize value if type is {@code byte[]}.
+ * @param keySerde a {@link Serde} used to deserialize key if type is {@code byte[]}.
+ * @param valSerde a {@link Serde} used to deserialize value if type is {@code byte[]}.
* @param label The given name which labels output will be printed.
- * @deprecated use {@code print(Printed)}
+ * @deprecated use {@link #print(Printed) print(Printed.toSysOut().withLabel(label).withKeyValueMapper(mapper)}
*/
@Deprecated
void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde, final String label);
@@ -455,7 +452,7 @@ public interface KStream<K, V> {
* Relative order is preserved within each input stream though (ie, records within one input
* stream are processed in order).
*
- * @param a stream which is to be merged into this stream
+ * @param stream a stream which is to be merged into this stream
* @return a merged stream containing all records from this and the provided {@code KStream}
*/
KStream<K, V> merge(final KStream<K, V> stream);
@@ -472,7 +469,7 @@ public interface KStream<K, V> {
* {@link Integer} etc. to get meaningful information.
*
* @param filePath name of the file to write to
- * @deprecated use {@code print(Printed)}
+ * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath)}
*/
@Deprecated
void writeAsText(final String filePath);
@@ -489,7 +486,7 @@ public interface KStream<K, V> {
*
* @param filePath name of the file to write to
* @param label the name used to label the key/value pairs written to the file
- * @deprecated use {@code print(Printed)}
+ * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withLabel(label)}
*/
@Deprecated
void writeAsText(final String filePath,
@@ -509,7 +506,7 @@ public interface KStream<K, V> {
* @param filePath name of the file to write to
* @param keySerde key serde used to deserialize key if type is {@code byte[]},
* @param valSerde value serde used to deserialize value if type is {@code byte[]},
- * @deprecated use {@code print(Printed)}
+ * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withKeyValueMapper(...)}
*/
@Deprecated
void writeAsText(final String filePath,
@@ -530,7 +527,7 @@ public interface KStream<K, V> {
* @param label the name used to label the key/value pairs written to the file
* @param keySerde key serde used to deserialize key if type is {@code byte[]},
* @param valSerde value serde used deserialize value if type is {@code byte[]},
- * @deprecated use {@code print(Printed)}
+ * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withLabel(label).withKeyValueMapper(...)}
*/
@Deprecated
void writeAsText(final String filePath,
@@ -549,7 +546,6 @@ public interface KStream<K, V> {
* The example below shows the way to customize output data.
* <pre>{@code
* final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
- * @Override
* public String apply(Integer key, String value) {
* return String.format("(%d, %s)", key, value);
* }
@@ -560,7 +556,7 @@ public interface KStream<K, V> {
*
* @param filePath path of the file to write to.
* @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
- * @deprecated use {@code print(Printed)}
+ * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withKeyValueMapper(mapper)}
*/
@Deprecated
void writeAsText(final String filePath, final KeyValueMapper<? super K, ? super V, String> mapper);
@@ -576,7 +572,6 @@ public interface KStream<K, V> {
* The example below shows the way to customize output data.
* <pre>{@code
* final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
- * @Override
* public String apply(Integer key, String value) {
* return String.format("(%d, %s)", key, value);
* }
@@ -588,7 +583,7 @@ public interface KStream<K, V> {
* @param filePath path of the file to write to.
* @param label the name used to label records written to file.
* @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
- * @deprecated use {@code print(Printed)}
+ * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withLabel(label).withKeyValueMapper(mapper)}
*/
@Deprecated
void writeAsText(final String filePath, final String label, final KeyValueMapper<? super K, ? super V, String> mapper);
@@ -604,7 +599,6 @@ public interface KStream<K, V> {
* The example below shows the way to customize output data.
* <pre>{@code
* final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
- * @Override
* public String apply(Integer key, String value) {
* return String.format("(%d, %s)", key, value);
* }
@@ -620,7 +614,7 @@ public interface KStream<K, V> {
* @param keySerde key serde used to deserialize key if type is {@code byte[]}.
* @param valSerde value serde used to deserialize value if type is {@code byte[]}.
* @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
- * @deprecated use {@code print(Printed)}
+ * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withKeyValueMapper(mapper)}
*/
@Deprecated
void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, String> mapper);
@@ -636,7 +630,6 @@ public interface KStream<K, V> {
* The example below shows the way to customize output data.
* <pre>{@code
* final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
- * @Override
* public String apply(Integer key, String value) {
* return String.format("(%d, %s)", key, value);
* }
@@ -653,8 +646,9 @@ public interface KStream<K, V> {
* @param keySerde key serde used to deserialize key if type is {@code byte[]}.
* @param valSerde value serde used to deserialize value if type is {@code byte[]}.
* @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
- * @deprecated use {@code print(Printed)}
+ * @deprecated use {@link #print(Printed) print(Printed.toFile(filePath).withLabel(label).withKeyValueMapper(mapper)}
*/
+ @Deprecated
void writeAsText(final String filePath, final String label, final Serde<K> keySerde, final Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, String> mapper);
/**
@@ -704,8 +698,7 @@ public interface KStream<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(String) #to(someTopicName)} and
- * {@link StreamsBuilder#stream(String)
- * StreamsBuilder#stream(someTopicName)}.
+ * {@link StreamsBuilder#stream(String) StreamsBuilder#stream(someTopicName)}.
*
* @param topic the topic name
* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
@@ -725,7 +718,7 @@ public interface KStream<K, V> {
* if not specified producer's {@link DefaultPartitioner} will be used
* @param topic the topic name
* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
- * @deprecated use {@code through(String, Produced)}
+ * @deprecated use {@link #through(String, Produced) through(topic, Produced.withStreamPartitioner(partitioner))}
*/
@Deprecated
KStream<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
@@ -740,7 +733,7 @@ public interface KStream<K, V> {
* used—otherwise producer's {@link DefaultPartitioner} is used.
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valSerde, someTopicName)} and
- * {@link StreamsBuilder#stream(Serde, Serde, String...) StreamsBuilder#stream(keySerde, valSerde, someTopicName)}.
+ * {@link KStreamBuilder#stream(Serde, Serde, String...) KStreamBuilder#stream(keySerde, valSerde, someTopicName)}.
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@@ -748,7 +741,7 @@ public interface KStream<K, V> {
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name
* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
- * @deprecated use {@code through(String, Produced)}
+ * @deprecated use {@link #through(String, Produced) through(topic, Produced.with(keySerde, valSerde))}
*/
@Deprecated
KStream<K, V> through(final Serde<K> keySerde,
@@ -762,8 +755,8 @@ public interface KStream<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String) #to(keySerde, valSerde,
- * StreamPartitioner, someTopicName)} and {@link StreamsBuilder#stream(Serde, Serde, String...)
- * StreamsBuilder#stream(keySerde, valSerde, someTopicName)}.
+ * StreamPartitioner, someTopicName)} and {@link KStreamBuilder#stream(Serde, Serde, String...)
+ * KStreamBuilder#stream(keySerde, valSerde, someTopicName)}.
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@@ -775,7 +768,7 @@ public interface KStream<K, V> {
* be used
* @param topic the topic name
* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
- * @deprecated use {@code through(String, Produced)}
+ * @deprecated use {@link #through(String, Produced) through(topic, Produced.with(keySerde, valSerde, partitioner))}
*/
@Deprecated
KStream<K, V> through(final Serde<K> keySerde,
@@ -791,8 +784,7 @@ public interface KStream<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(String, Produced) to(someTopic, Produced.with(keySerde, valueSerde)}
- * and {@link StreamsBuilder#stream(Serde, Serde, String...)
- * StreamsBuilder#stream(keySerde, valSerde, someTopicName)}.
+ * and {@link StreamsBuilder#stream(String, Consumed) StreamsBuilder#stream(someTopicName, Consumed.with(keySerde, valueSerde))}.
*
* @param topic
* @param produced
@@ -820,7 +812,7 @@ public interface KStream<K, V> {
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link DefaultPartitioner} will be used
* @param topic the topic name
- * @deprecated use {@code to(String, Produced}
+ * @deprecated use {@link #to(String, Produced) to(topic, Produced.withStreamPartitioner(partitioner))}
*/
@Deprecated
void to(final StreamPartitioner<? super K, ? super V> partitioner,
@@ -838,7 +830,7 @@ public interface KStream<K, V> {
* @param valSerde value serde used to send key-value pairs,
* if not specified the default serde defined in the configs will be used
* @param topic the topic name
- * @deprecated use {@code to(String, Produced}
+ * @deprecated use {@link #to(String, Produced) to(topic, Produced.with(keySerde, valSerde))}
*/
@Deprecated
void to(final Serde<K> keySerde,
@@ -860,7 +852,7 @@ public interface KStream<K, V> {
* {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will
* be used
* @param topic the topic name
- * @deprecated use {@code to(String, Produced}
+ * @deprecated use {@link #to(String, Produced) to(topic, Produced.with(keySerde, valSerde, partitioner)}
*/
@Deprecated
void to(final Serde<K> keySerde,
@@ -1159,7 +1151,7 @@ public interface KStream<K, V> {
* @param valSerde value serdes for materializing this stream,
* if not specified the default serdes defined in the configs will be used
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
- * @deprecated use {@code groupByKey(Serialized)}
+ * @deprecated use {@link #groupByKey(Serialized) groupByKey(Serialized.with(keySerde, valSerde))}
*/
@Deprecated
KGroupedStream<K, V> groupByKey(final Serde<K> keySerde,
@@ -1244,7 +1236,7 @@ public interface KStream<K, V> {
* @param <KR> the key type of the result {@link KGroupedStream}
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
* @see #groupByKey()
- * @deprecated use {@code groupBy(KeyValueMapper, Serialized}
+ * @deprecated use {@link #groupBy(KeyValueMapper, Serialized) groupBy(selector, Serialized.with(keySerde, valSerde))}
*/
@Deprecated
<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
@@ -1480,7 +1472,7 @@ public interface KStream<K, V> {
* {@link ValueJoiner}, one for each matched record-pair with the same key and within the joining window intervals
* @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
* @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
- * @deprecated use {@link #join(KStream, ValueJoiner, JoinWindows, Joined)}
+ * @deprecated use {@link #join(KStream, ValueJoiner, JoinWindows, Joined) join(otherStream, joiner, windows, Joined.with(keySerde, thisValueSerde, otherValueSerde))}
*/
@Deprecated
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
@@ -1734,7 +1726,7 @@ public interface KStream<K, V> {
* this {@code KStream} and within the joining window intervals
* @see #join(KStream, ValueJoiner, JoinWindows, Joined)
* @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
- * @deprecated use {@link #leftJoin(KStream, ValueJoiner, JoinWindows, Joined}
+ * @deprecated use {@link #leftJoin(KStream, ValueJoiner, JoinWindows, Joined) leftJoin(otherStream, joiner, windows, Joined.with(keySerde, thisValSerde, otherValueSerde))}
*/
@Deprecated
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
@@ -1988,7 +1980,7 @@ public interface KStream<K, V> {
* both {@code KStream}s and within the joining window intervals
* @see #join(KStream, ValueJoiner, JoinWindows, Joined)
* @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
- * @deprecated use {@link #outerJoin(KStream, ValueJoiner, JoinWindows, Joined)}
+ * @deprecated use {@link #outerJoin(KStream, ValueJoiner, JoinWindows, Joined) outerJoin(otherStream, joiner, windows, Joined.with(keySerde, thisValueSerde, otherValueSerde))}
*/
@Deprecated
<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
@@ -2222,7 +2214,7 @@ public interface KStream<K, V> {
* {@link ValueJoiner}, one for each matched record-pair with the same key
* @see #leftJoin(KTable, ValueJoiner, Joined)
* @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
- * @deprecated use {@link #join(KTable, ValueJoiner, Joined)}
+ * @deprecated use {@link #join(KTable, ValueJoiner, Joined) join(table, joiner, Joined.with(keySerde, valSerde, null))}
*/
@Deprecated
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
@@ -2461,7 +2453,9 @@ public interface KStream<K, V> {
* {@link ValueJoiner}, one output for each input {@code KStream} record
* @see #join(KTable, ValueJoiner, Serde, Serde)
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
+ * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Joined) leftJoin(table, joiner, Joined.with(keySerde, valSerde, null))}
*/
+ @Deprecated
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
final Serde<K> keySerde,
http://git-wip-us.apache.org/repos/asf/kafka/blob/3dcbbf70/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index ab666ba..d4642da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -479,10 +479,11 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
- * offsets are available
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, String)} ()}.
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+ * offsets are available
+ * @param topic the topic name; cannot be {@code null}
+ * @param queryableStoreName the state store name; If {@code null} this is the equivalent of
+ * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, String) table(AutoOffsetReset, String)}.
* @return a {@link KTable} for the specified topic
*/
public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
@@ -791,7 +792,8 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)} ()} ()}.
+ * @param queryableStoreName the state store name; If {@code null} this is the equivalent of
+ * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, Serde, Serde, String) table(AutoOffsetReset, Serde, Serde, String)}
* @return a {@link KTable} for the specified topic
*/
public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
@@ -876,7 +878,8 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)} ()} ()}.
+ * @param queryableStoreName the state store name; If {@code null} this is the equivalent of
+ * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, Serde, Serde, String) table(AutoOffsetReset, Serde, Serde, String)}
* @return a {@link KTable} for the specified topic
*/
public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,