Posted to commits@kafka.apache.org by mj...@apache.org on 2020/05/08 06:14:49 UTC
[kafka] branch trunk updated: KAFKA-9290: Update IQ related JavaDocs (#8114)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 133c2ed KAFKA-9290: Update IQ related JavaDocs (#8114)
133c2ed is described below
commit 133c2ed58ad3c70df75882728213a86013e17e95
Author: high.lee <ye...@daum.net>
AuthorDate: Fri May 8 15:14:17 2020 +0900
KAFKA-9290: Update IQ related JavaDocs (#8114)
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
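Note on the updated examples: a store obtained with timestampedKeyValueStore() returns ValueAndTimestamp<V> from get(key) rather than a plain V, and get(key) returns null for a key that is not present, so callers have to unwrap the value and guard against the missing case. The sketch below illustrates that pattern under stated assumptions: the nested ValueAndTimestamp class and the ReadOnlyStore interface are hypothetical stand-ins that mirror only the value()/timestamp() and get(key) shape of the Kafka Streams types, so the sketch compiles without Kafka on the classpath, and valueFor is an illustrative helper, not part of the Kafka API.

```java
import java.util.Optional;

final class TimestampedLookupSketch {

    // Hypothetical stand-in for org.apache.kafka.streams.state.ValueAndTimestamp.
    static final class ValueAndTimestamp<V> {
        private final V value;
        private final long timestamp;

        ValueAndTimestamp(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        V value() { return value; }
        long timestamp() { return timestamp; }
    }

    // Hypothetical stand-in for the get(key) part of ReadOnlyKeyValueStore.
    interface ReadOnlyStore<K, V> {
        V get(K key); // returns null when the key is not in the local store
    }

    // Illustrative helper: unwraps the plain value, or returns an empty
    // Optional when the key is absent from the store.
    static <K, V> Optional<V> valueFor(ReadOnlyStore<K, ValueAndTimestamp<V>> store, K key) {
        ValueAndTimestamp<V> entry = store.get(key);
        return entry == null ? Optional.empty() : Optional.ofNullable(entry.value());
    }
}
```

With the real API the store would come from KafkaStreams#store(...) as in the JavaDoc examples in this commit. One point worth verifying against the QueryableStoreTypes JavaDoc for your Kafka version: timestampedKeyValueStore() appears to be declared with type parameters <K, V> (key and plain value type), so an explicit type witness of <K, V> rather than <K, ValueAndTimestamp<V>> may be what actually compiles.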
.../org/apache/kafka/streams/StreamsBuilder.java | 24 +++++-----
.../kafka/streams/kstream/CogroupedKStream.java | 34 +++++++-------
.../apache/kafka/streams/kstream/GlobalKTable.java | 2 +-
.../kafka/streams/kstream/KGroupedStream.java | 50 ++++++++++-----------
.../kafka/streams/kstream/KGroupedTable.java | 48 ++++++++++----------
.../org/apache/kafka/streams/kstream/KTable.java | 26 +++++------
.../kstream/TimeWindowedCogroupedKStream.java | 16 +++----
.../kafka/streams/kstream/TimeWindowedKStream.java | 52 +++++++++++-----------
8 files changed, 126 insertions(+), 126 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 997fe64..f3b08da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -208,13 +208,13 @@ public class StreamsBuilder {
* streamBuilder.table(topic, Consumed.with(Serde.String(), Serde.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>as(storeName))
* }
* </pre>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
+ * K key = "some-key";
+ * ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -393,13 +393,13 @@ public class StreamsBuilder {
* streamBuilder.globalTable(topic, Consumed.with(Serde.String(), Serde.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>as(storeName))
* }
* </pre>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key);
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
+ * K key = "some-key";
+ * ValueAndTimestamp<V> valueForKey = localStore.get(key);
* }</pre>
* Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
* regardless of the specified value in {@link StreamsConfig} or {@link Consumed}.
@@ -435,13 +435,13 @@ public class StreamsBuilder {
* However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key);
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
+ * K key = "some-key";
+ * ValueAndTimestamp<V> valueForKey = localStore.get(key);
* }</pre>
* Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
* regardless of the specified value in {@link StreamsConfig}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
index 53dd318..f9b6fb4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
@@ -79,14 +79,14 @@ public interface CogroupedKStream<K, VOut> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
- * KeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VOut>> timestampedKeyValueStore());
+ * K key = "some-key";
+ * ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
* the value of the key on a parallel running instance of your Kafka Streams application.
@@ -128,14 +128,14 @@ public interface CogroupedKStream<K, VOut> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
- * KeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VOut>> timestampedKeyValueStore());
+ * K key = "some-key";
+ * ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
* the value of the key on a parallel running instance of your Kafka Streams application.
@@ -178,14 +178,14 @@ public interface CogroupedKStream<K, VOut> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>
* KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
- * KeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VOut>> timestampedKeyValueStore());
+ * K key = "some-key";
+ * ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
* the value of the key on a parallel running instance of your Kafka Streams application.
@@ -230,14 +230,14 @@ public interface CogroupedKStream<K, VOut> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(StoreQueryParameters)} KafkaStreams#store(...)}:
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>
* KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
- * KeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VOut>> timestampedKeyValueStore());
+ * K key = "some-key";
+ * ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
* the value of the key on a parallel running instance of your Kafka Streams application.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index ba64abe..73efbc3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -49,7 +49,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
* final KafkaStreams streams = ...;
* streams.start()
* ...
- * ReadOnlyKeyValueStore view = streams.store("g1-store", QueryableStoreTypes.keyValueStore());
+ * ReadOnlyKeyValueStore view = streams.store("g1-store", QueryableStoreTypes.timestampedKeyValueStore());
* view.get(key); // can be done on any key, as all keys are present
*}</pre>
* Note that in contrast to {@link KTable} a {@code GlobalKTable}'s state holds a full copy of the underlying topic,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index bfcbacb..1fcfac9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -111,14 +111,14 @@ public interface KGroupedStream<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}.
* <pre>{@code
* KafkaStreams streams = ... // counting words
* String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-word";
- * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>>timestampedKeyValueStore());
+ * K key = "some-word";
+ * ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -156,14 +156,14 @@ public interface KGroupedStream<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}.
* <pre>{@code
* KafkaStreams streams = ... // counting words
* String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-word";
- * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>>timestampedKeyValueStore());
+ * K key = "some-word";
+ * ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -258,14 +258,14 @@ public interface KGroupedStream<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}.
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
+ * K key = "some-key";
+ * ValueAndTimestamp<V> reduceForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -321,14 +321,14 @@ public interface KGroupedStream<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}.
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
+ * K key = "some-key";
+ * ValueAndTimestamp<V> reduceForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -424,14 +424,14 @@ public interface KGroupedStream<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VR>>timestampedKeyValueStore());
+ * K key = "some-key";
+ * ValueAndTimestamp<VR> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -482,14 +482,14 @@ public interface KGroupedStream<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(StoreQueryParameters)} KafkaStreams#store(...)}:
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, ValueAndTimestamp<VR>>timestampedKeyValueStore());
+ * K key = "some-key";
+ * ValueAndTimestamp<VR> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 628f458..23cd676 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -54,13 +54,13 @@ public interface KGroupedTable<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-word";
- * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>> timestampedKeyValueStore());
+ * K key = "some-word";
+ * ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -97,13 +97,13 @@ public interface KGroupedTable<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-word";
- * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>> timestampedKeyValueStore());
+ * K key = "some-word";
+ * ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -225,13 +225,13 @@ public interface KGroupedTable<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-word";
- * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>> timestampedKeyValueStore());
+ * K key = "some-word";
+ * ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -298,13 +298,13 @@ public interface KGroupedTable<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-word";
- * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>> timestampedKeyValueStore());
+ * K key = "some-word";
+ * ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -436,13 +436,13 @@ public interface KGroupedTable<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-word";
- * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VR>> timestampedKeyValueStore());
+ * K key = "some-word";
+ * ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -520,13 +520,13 @@ public interface KGroupedTable<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-word";
- * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VR>> timestampedKeyValueStore());
+ * K key = "some-word";
+ * ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 1733c20..b7f073b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -53,7 +53,7 @@ import java.util.function.Function;
* streams.start()
* ...
* final String queryableStoreName = table.queryableStoreName(); // returns null if KTable is not queryable
- * ReadOnlyKeyValueStore view = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
+ * ReadOnlyKeyValueStore view = streams.store(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* view.get(key);
*}</pre>
*<p>
@@ -130,13 +130,13 @@ public interface KTable<K, V> {
* Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record
* is forwarded.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // filtering words
- * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
* K key = "some-word";
- * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -169,13 +169,13 @@ public interface KTable<K, V> {
* Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record
* is forwarded.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // filtering words
- * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
* K key = "some-word";
- * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -255,13 +255,13 @@ public interface KTable<K, V> {
* Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
* forwarded.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // filtering words
- * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
* K key = "some-word";
- * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -293,13 +293,13 @@ public interface KTable<K, V> {
* Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
* forwarded.
* <p>
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // filtering words
- * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
* K key = "some-word";
- * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
index fd292c5..d99f3fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
@@ -155,17 +155,17 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link WindowStore} it must be obtained via
+ * To query the local {@link ReadOnlyWindowStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+ * ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedWindowStore());
*
- * String key = "some-word";
+ * K key = "some-word";
* long fromTime = ...;
* long toTime = ...;
- * WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+ * WindowStoreIterator<ValueAndTimestamp<V>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -211,17 +211,17 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link WindowStore} it must be obtained via
+ * To query the local {@link ReadOnlyWindowStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+ * ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedWindowStore());
*
- * String key = "some-word";
+ * K key = "some-word";
* long fromTime = ...;
* long toTime = ...;
- * WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+ * WindowStoreIterator<ValueAndTimestamp<V>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 1ee4f1f..71b7bca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -125,17 +125,17 @@ public interface TimeWindowedKStream<K, V> {
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link WindowStore} it must be obtained via
- * {@link KafkaStreams#store(StoreQueryParameters)} KafkaStreams#store(...)}:
+ * To query the local {@link ReadOnlyWindowStore} it must be obtained via
+ * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+ * ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>>timestampedWindowStore());
*
- * String key = "some-word";
+ * K key = "some-word";
* long fromTime = ...;
* long toTime = ...;
- * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+ * WindowStoreIterator<ValueAndTimestamp<Long>> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -173,17 +173,17 @@ public interface TimeWindowedKStream<K, V> {
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}
* <p>
- * To query the local {@link WindowStore} it must be obtained via
+ * To query the local {@link ReadOnlyWindowStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+ * ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<Long>>timestampedWindowStore());
*
- * String key = "some-word";
+ * K key = "some-word";
* long fromTime = ...;
* long toTime = ...;
- * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+ * WindowStoreIterator<ValueAndTimestamp<Long>> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -321,17 +321,17 @@ public interface TimeWindowedKStream<K, V> {
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link WindowStore} it must be obtained via
+ * To query the local {@link ReadOnlyWindowStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+ * ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VR>>timestampedWindowStore());
*
- * String key = "some-word";
+ * K key = "some-word";
* long fromTime = ...;
* long toTime = ...;
- * WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+ * WindowStoreIterator<ValueAndTimestamp<VR>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -381,17 +381,17 @@ public interface TimeWindowedKStream<K, V> {
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}
* <p>
- * To query the local {@link WindowStore} it must be obtained via
+ * To query the local {@link ReadOnlyWindowStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+ * ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<VR>>timestampedWindowStore());
*
- * String key = "some-word";
+ * K key = "some-word";
* long fromTime = ...;
* long toTime = ...;
- * WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+ * WindowStoreIterator<ValueAndTimestamp<VR>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -538,17 +538,17 @@ public interface TimeWindowedKStream<K, V> {
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link WindowStore} it must be obtained via
+ * To query the local {@link ReadOnlyWindowStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+ * ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedWindowStore());
*
- * String key = "some-word";
+ * K key = "some-word";
* long fromTime = ...;
* long toTime = ...;
- * WindowStoreIterator<Long> reduceStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+ * WindowStoreIterator<ValueAndTimestamp<V>> reduceStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -600,17 +600,17 @@ public interface TimeWindowedKStream<K, V> {
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link WindowStore} it must be obtained via
- * {@link KafkaStreams#store(StoreQueryParameters)} KafkaStreams#store(...)}:
+ * To query the local {@link ReadOnlyWindowStore} it must be obtained via
+ * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+ * ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedWindowStore());
*
- * String key = "some-word";
+ * K key = "some-word";
* long fromTime = ...;
* long toTime = ...;
- * WindowStoreIterator<Long> reduceStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+ * WindowStoreIterator<ValueAndTimestamp<V>> reduceStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.