You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/04 21:42:11 UTC
[2/2] kafka git commit: MINOR: add suppress warnings annotations in Streams API
MINOR: add suppress warnings annotations in Streams API
- fixes examples with regard to new API
- fixes `Topology#addGlobalStore` parameters
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>
Closes #4003 from mjsax/minor-deprecated
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/713a67fd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/713a67fd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/713a67fd
Branch: refs/heads/trunk
Commit: 713a67fddaec3fa9cd7cce53dd6fef5ab6e0cdab
Parents: 51c652c
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Oct 4 14:42:07 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 4 14:42:07 2017 -0700
----------------------------------------------------------------------
.../examples/pageview/PageViewTypedDemo.java | 77 +++++++++----------
.../examples/pageview/PageViewUntypedDemo.java | 66 ++++++++--------
.../examples/temperature/TemperatureDemo.java | 54 ++++++-------
.../examples/wordcount/WordCountDemo.java | 29 +++----
.../java/org/apache/kafka/streams/Topology.java | 6 +-
.../kafka/streams/kstream/KGroupedStream.java | 80 +++++++++-----------
.../kafka/streams/kstream/KGroupedTable.java | 7 +-
.../kafka/streams/kstream/KStreamBuilder.java | 30 ++++----
.../apache/kafka/streams/kstream/KTable.java | 23 +++---
.../kstream/internals/AbstractStream.java | 10 +--
.../internals/InternalStreamsBuilder.java | 5 +-
.../kstream/internals/KGroupedStreamImpl.java | 59 ++++++++++-----
.../kstream/internals/KGroupedTableImpl.java | 36 +++++----
.../streams/kstream/internals/KStreamImpl.java | 36 ++++++++-
.../kstream/internals/KStreamKStreamJoin.java | 1 +
.../streams/kstream/internals/KTableImpl.java | 77 ++++++++++++++-----
.../kstream/internals/WindowedSerializer.java | 5 +-
.../streams/processor/TopologyBuilder.java | 1 +
.../processor/internals/AbstractTask.java | 2 +-
.../internals/InternalTopologyBuilder.java | 11 +--
.../streams/processor/internals/QuickUnion.java | 1 +
.../processor/internals/StreamsMetricsImpl.java | 2 +-
.../org/apache/kafka/streams/state/Stores.java | 12 +--
.../state/internals/AbstractStoreSupplier.java | 5 +-
.../InMemoryKeyValueStoreSupplier.java | 1 +
.../InMemoryLRUCacheStoreSupplier.java | 1 +
.../internals/MeteredKeyValueBytesStore.java | 1 +
.../state/internals/MeteredSessionStore.java | 1 +
.../state/internals/MeteredWindowStore.java | 2 +-
.../internals/RocksDBKeyValueStoreSupplier.java | 2 +-
.../internals/RocksDBSessionStoreSupplier.java | 1 +
.../internals/RocksDBWindowStoreSupplier.java | 2 +-
.../RocksDbSessionBytesStoreSupplier.java | 17 +++--
.../RocksDbWindowBytesStoreSupplier.java | 7 +-
.../state/internals/WindowStoreSupplier.java | 6 +-
.../kafka/test/MockStateStoreSupplier.java | 2 +-
36 files changed, 394 insertions(+), 284 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 068eece..101cd23 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.Windowed;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
/**
* Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
@@ -150,45 +151,45 @@ public class PageViewTypedDemo {
Consumed.with(Serdes.String(), userProfileSerde));
KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
- .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
- @Override
- public PageViewByRegion apply(PageView view, UserProfile profile) {
- PageViewByRegion viewByRegion = new PageViewByRegion();
- viewByRegion.user = view.user;
- viewByRegion.page = view.page;
-
- if (profile != null) {
- viewByRegion.region = profile.region;
- } else {
- viewByRegion.region = "UNKNOWN";
- }
- return viewByRegion;
+ .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
+ @Override
+ public PageViewByRegion apply(PageView view, UserProfile profile) {
+ PageViewByRegion viewByRegion = new PageViewByRegion();
+ viewByRegion.user = view.user;
+ viewByRegion.page = view.page;
+
+ if (profile != null) {
+ viewByRegion.region = profile.region;
+ } else {
+ viewByRegion.region = "UNKNOWN";
}
- })
- .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
- @Override
- public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
- return new KeyValue<>(viewRegion.region, viewRegion);
- }
- })
- .groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde))
- .count(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000), "RollingSevenDaysOfPageViewsByRegion")
- // TODO: we can merge ths toStream().map(...) with a single toStream(...)
- .toStream()
- .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
- @Override
- public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
- WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
- wViewByRegion.windowStart = key.window().start();
- wViewByRegion.region = key.key();
-
- RegionCount rCount = new RegionCount();
- rCount.region = key.key();
- rCount.count = value;
-
- return new KeyValue<>(wViewByRegion, rCount);
- }
- });
+ return viewByRegion;
+ }
+ })
+ .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
+ @Override
+ public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
+ return new KeyValue<>(viewRegion.region, viewRegion);
+ }
+ })
+ .groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde))
+ .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(7)).advanceBy(TimeUnit.SECONDS.toMillis(1)))
+ .count()
+ .toStream()
+ .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
+ @Override
+ public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
+ WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
+ wViewByRegion.windowStart = key.window().start();
+ wViewByRegion.region = key.key();
+
+ RegionCount rCount = new RegionCount();
+ rCount.region = key.key();
+ rCount.count = value;
+
+ return new KeyValue<>(wViewByRegion, rCount);
+ }
+ });
// write to the result topic
regionCount.to("streams-pageviewstats-typed-output", Produced.with(wPageViewByRegionSerde, regionCountSerde));
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index c20c077..ae72042 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -87,39 +87,39 @@ public class PageViewUntypedDemo {
});
KStream<JsonNode, JsonNode> regionCount = views
- .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
- @Override
- public JsonNode apply(JsonNode view, String region) {
- ObjectNode jNode = JsonNodeFactory.instance.objectNode();
-
- return jNode.put("user", view.get("user").textValue())
- .put("page", view.get("page").textValue())
- .put("region", region == null ? "UNKNOWN" : region);
- }
- })
- .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
- @Override
- public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
- return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
- }
- })
- .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
- .count(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000), "RollingSevenDaysOfPageViewsByRegion")
- // TODO: we can merge ths toStream().map(...) with a single toStream(...)
- .toStream()
- .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
- @Override
- public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
- ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
- keyNode.put("window-start", key.window().start())
- .put("region", key.key());
-
- ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
- valueNode.put("count", value);
-
- return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
- }
- });
+ .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
+ @Override
+ public JsonNode apply(JsonNode view, String region) {
+ ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+
+ return jNode.put("user", view.get("user").textValue())
+ .put("page", view.get("page").textValue())
+ .put("region", region == null ? "UNKNOWN" : region);
+ }
+ })
+ .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
+ @Override
+ public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
+ return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
+ }
+ })
+ .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
+ .windowedBy(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
+ .count()
+ .toStream()
+ .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
+ @Override
+ public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
+ ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
+ keyNode.put("window-start", key.window().start())
+ .put("region", key.key());
+
+ ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
+ valueNode.put("count", value);
+
+ return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
+ }
+ });
// write to the result topic
regionCount.to("streams-pageviewstats-untyped-output", Produced.with(jsonSerde, jsonSerde));
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
index 2039ca5..ea81dd6 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
@@ -87,38 +88,39 @@ public class TemperatureDemo {
KStream<String, String> source = builder.stream("iot-temperature");
KStream<Windowed<String>, String> max = source
- // temperature values are sent without a key (null), so in order
- // to group and reduce them, a key is needed ("temp" has been chosen)
- .selectKey(new KeyValueMapper<String, String, String>() {
- @Override
- public String apply(String key, String value) {
- return "temp";
- }
- })
- .groupByKey()
- .reduce(new Reducer<String>() {
- @Override
- public String apply(String value1, String value2) {
- if (Integer.parseInt(value1) > Integer.parseInt(value2))
- return value1;
- else
- return value2;
- }
- }, TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
- .toStream()
- .filter(new Predicate<Windowed<String>, String>() {
- @Override
- public boolean test(Windowed<String> key, String value) {
- return Integer.parseInt(value) > TEMPERATURE_THRESHOLD;
- }
- });
+ // temperature values are sent without a key (null), so in order
+ // to group and reduce them, a key is needed ("temp" has been chosen)
+ .selectKey(new KeyValueMapper<String, String, String>() {
+ @Override
+ public String apply(String key, String value) {
+ return "temp";
+ }
+ })
+ .groupByKey()
+ .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
+ .reduce(new Reducer<String>() {
+ @Override
+ public String apply(String value1, String value2) {
+ if (Integer.parseInt(value1) > Integer.parseInt(value2))
+ return value1;
+ else
+ return value2;
+ }
+ })
+ .toStream()
+ .filter(new Predicate<Windowed<String>, String>() {
+ @Override
+ public boolean test(Windowed<String> key, String value) {
+ return Integer.parseInt(value) > TEMPERATURE_THRESHOLD;
+ }
+ });
WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(Serdes.String().serializer());
WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer(), TEMPERATURE_WINDOW_SIZE);
Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
// need to override key serde to Windowed<String> type
- max.to(windowedSerde, Serdes.String(), "iot-temperature-max");
+ max.to("iot-temperature-max", Produced.with(windowedSerde, Serdes.String()));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 5689d50..7535315 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.util.Arrays;
@@ -63,22 +64,22 @@ public class WordCountDemo {
KStream<String, String> source = builder.stream("streams-plaintext-input");
KTable<String, Long> counts = source
- .flatMapValues(new ValueMapper<String, Iterable<String>>() {
- @Override
- public Iterable<String> apply(String value) {
- return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
- }
- })
- .groupBy(new KeyValueMapper<String, String, String>() {
- @Override
- public String apply(String key, String value) {
- return value;
- }
- })
- .count("Counts");
+ .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+ @Override
+ public Iterable<String> apply(String value) {
+ return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
+ }
+ })
+ .groupBy(new KeyValueMapper<String, String, String>() {
+ @Override
+ public String apply(String key, String value) {
+ return value;
+ }
+ })
+ .count();
// need to override value serde to Long type
- counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
+ counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/Topology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 1409b97..3b1ac6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -32,7 +32,6 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import java.util.regex.Pattern;
@@ -573,6 +572,7 @@ public class Topology {
* @return itself
* @throws TopologyException if the processor of state is already registered
*/
+ @SuppressWarnings("unchecked")
public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
final String sourceName,
final Deserializer keyDeserializer,
@@ -609,7 +609,8 @@ public class Topology {
* @return itself
* @throws TopologyException if the processor of state is already registered
*/
- public synchronized Topology addGlobalStore(final KeyValueStoreBuilder storeBuilder,
+ @SuppressWarnings("unchecked")
+ public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
final String sourceName,
final TimestampExtractor timestampExtractor,
final Deserializer keyDeserializer,
@@ -641,6 +642,7 @@ public class Topology {
*
* @return a description of the topology.
*/
+
public synchronized TopologyDescription describe() {
return internalTopologyBuilder.describe();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 17f2db4..b3945f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.SessionStore;
@@ -132,7 +131,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* String queryableStoreName = storeSupplier.name();
@@ -149,7 +148,7 @@ public interface KGroupedStream<K, V> {
* @deprecated use {@link #count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier))}
*/
@Deprecated
- KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
+ KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Count the number of records in this stream by the grouped key.
@@ -290,7 +289,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* String queryableStoreName = storeSupplier.name();
@@ -312,7 +311,7 @@ public interface KGroupedStream<K, V> {
*/
@Deprecated
<W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier);
/**
@@ -333,10 +332,8 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
- * Sting queryableStoreName = storeSupplier.name();
* ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>ReadOnlySessionStore<String, Long>);
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
@@ -398,7 +395,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* Sting queryableStoreName = storeSupplier.name();
@@ -418,7 +415,7 @@ public interface KGroupedStream<K, V> {
*/
@Deprecated
KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
- final StateStoreSupplier<SessionStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier);
/**
* Combine the values of records in this stream by the grouped key.
@@ -522,7 +519,7 @@ public interface KGroupedStream<K, V> {
* Combine the value of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value
- * (c.f. {@link #aggregate(Initializer, Aggregator, StateStoreSupplier)}).
+ * (c.f. {@link #aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* provided by the given {@code storeSupplier}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -540,8 +537,8 @@ public interface KGroupedStream<K, V> {
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
* value as-is.
- * Thus, {@code reduce(Reducer, StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or
- * max.
+ * Thus, {@code reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to compute
+ * aggregate functions like sum, min, or max.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
@@ -552,7 +549,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* String queryableStoreName = storeSupplier.name();
@@ -571,7 +568,7 @@ public interface KGroupedStream<K, V> {
*/
@Deprecated
KTable<K, V> reduce(final Reducer<V> reducer,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Combine the value of records in this stream by the grouped key.
@@ -595,7 +592,7 @@ public interface KGroupedStream<K, V> {
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
* value as-is.
- * Thus, {@code reduce(Reducer, StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or
+ * Thus, {@code reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or
* max.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
@@ -767,8 +764,8 @@ public interface KGroupedStream<K, V> {
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
* value as-is.
- * Thus, {@code reduce(Reducer, Windows, StateStoreSupplier)} can be used to compute aggregate functions like sum,
- * min, or max.
+ * Thus, {@code reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to
+ * compute aggregate functions like sum, min, or max.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
@@ -779,7 +776,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* Sting queryableStoreName = storeSupplier.name();
@@ -803,7 +800,7 @@ public interface KGroupedStream<K, V> {
@Deprecated
<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier);
/**
* Combine values of this stream by the grouped key into {@link SessionWindows}.
@@ -841,10 +838,8 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
- * Sting queryableStoreName = storeSupplier.name();
* ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>ReadOnlySessionStore<String, Long>);
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
@@ -933,8 +928,8 @@ public interface KGroupedStream<K, V> {
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
* value as-is.
- * Thus, {@code reduce(Reducer, SessionWindows, StateStoreSupplier)} can be used to compute aggregate functions like
- * sum, min, or max.
+ * Thus, {@code reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used
+ * to compute aggregate functions like sum, min, or max.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
@@ -945,7 +940,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* Sting queryableStoreName = storeSupplier.name();
@@ -975,7 +970,7 @@ public interface KGroupedStream<K, V> {
@Deprecated
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows,
- final StateStoreSupplier<SessionStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier);
/**
@@ -1188,8 +1183,8 @@ public interface KGroupedStream<K, V> {
/**
* Aggregate the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
- * Aggregating is a generalization of {@link #reduce(Reducer, StateStoreSupplier) combining via reduce(...)} as it,
- * for example, allows the result to have a different type than the input values.
+ * Aggregating is a generalization of {@link #reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * combining via reduce(...)} as it, for example, allows the result to have a different type than the input values.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* provided by the given {@code storeSupplier}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -1199,8 +1194,8 @@ public interface KGroupedStream<K, V> {
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
- * Thus, {@code aggregate(Initializer, Aggregator, StateStoreSupplier)} can be used to compute aggregate functions
- * like count (c.f. {@link #count()}).
+ * Thus, {@code aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)} can be
+ * used to compute aggregate functions like count (c.f. {@link #count()}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
@@ -1211,7 +1206,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double
* Sting queryableStoreName = storeSupplier.name();
@@ -1233,7 +1228,7 @@ public interface KGroupedStream<K, V> {
@Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Aggregate the values of records in this stream by the grouped key and defined windows.
@@ -1362,8 +1357,9 @@ public interface KGroupedStream<K, V> {
/**
* Aggregate the values of records in this stream by the grouped key and defined windows.
* Records with {@code null} key or value are ignored.
- * Aggregating is a generalization of {@link #reduce(Reducer, Windows, StateStoreSupplier) combining via
- * reduce(...)} as it, for example, allows the result to have a different type than the input values.
+ * Aggregating is a generalization of
+ * {@link #reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) combining via reduce(...)}
+ * as it, for example, allows the result to have a different type than the input values.
* The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
* {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
* The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
@@ -1377,8 +1373,8 @@ public interface KGroupedStream<K, V> {
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
- * Thus, {@code aggregate(Initializer, Aggregator, Windows, StateStoreSupplier)} can be used to compute aggregate
- * functions like count (c.f. {@link #count(Windows)}).
+ * Thus, {@code aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)}
+ * can be used to compute aggregate functions like count (c.f. {@link #count(Windows)}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
@@ -1389,7 +1385,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type Long
* String queryableStoreName = storeSupplier.name();
@@ -1416,7 +1412,7 @@ public interface KGroupedStream<K, V> {
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier);
/**
* Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
@@ -1446,10 +1442,8 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local {@link SessionStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type double
- * Sting queryableStoreName = storeSupplier.name();
* ReadOnlySessionStore<String, Long> sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
@@ -1540,8 +1534,8 @@ public interface KGroupedStream<K, V> {
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
- * Thus, {@code #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, StateStoreSupplier)} can be used
- * to compute aggregate functions like count (c.f. {@link #count(SessionWindows)}).
+ * Thus, {@code #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, org.apache.kafka.streams.processor.StateStoreSupplier)}
+ * can be used to compute aggregate functions like count (c.f. {@link #count(SessionWindows)}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
@@ -1552,7 +1546,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local {@link SessionStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type double
* String queryableStoreName = storeSupplier.name();
@@ -1583,7 +1577,7 @@ public interface KGroupedStream<K, V> {
final Merger<? super K, T> sessionMerger,
final SessionWindows sessionWindows,
final Serde<T> aggValueSerde,
- final StateStoreSupplier<SessionStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier);
/**
* Create a new {@link TimeWindowedKStream} instance that can be used to perform windowed aggregations.
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index f814eaf..d0a38cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
@@ -195,7 +194,7 @@ public interface KGroupedTable<K, V> {
* @deprecated use {@link #count(Materialized) count(Materialized.as(KeyValueBytesStoreSupplier))}
*/
@Deprecated
- KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
+ KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
@@ -467,7 +466,7 @@ public interface KGroupedTable<K, V> {
@Deprecated
KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
@@ -945,6 +944,6 @@ public interface KGroupedTable<K, V> {
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index d4642da..6b51c86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
@@ -431,7 +430,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
* @return a {@link KTable} for the specified topic
*/
public <K, V> KTable<K, V> table(final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
return table(null, null, null, null, topic, storeSupplier);
}
@@ -525,7 +524,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
*/
public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
return table(offsetReset, null, null, null, topic, storeSupplier);
}
@@ -702,7 +701,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
public <K, V> KTable<K, V> table(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
return table(null, null, keySerde, valSerde, topic, storeSupplier);
}
@@ -737,7 +736,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
final Serde<V> valSerde,
final TimestampExtractor timestampExtractor,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier,
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
final boolean isQueryable) {
try {
final String source = newName(KStreamImpl.SOURCE_NAME);
@@ -882,6 +881,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
* {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, Serde, Serde, String) table(AutoOffsetReset, Serde, Serde, String)}
* @return a {@link KTable} for the specified topic
*/
+ @SuppressWarnings("unchecked")
public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
final TimestampExtractor timestampExtractor,
final Serde<K> keySerde,
@@ -889,12 +889,13 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
final String topic,
final String queryableStoreName) {
final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
- final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName,
- keySerde,
- valSerde,
- false,
- Collections.<String, String>emptyMap(),
- true);
+ final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(
+ internalStoreName,
+ keySerde,
+ valSerde,
+ false,
+ Collections.<String, String>emptyMap(),
+ true);
return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, queryableStoreName != null);
}
@@ -965,7 +966,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, true);
}
@@ -1096,7 +1097,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
}
@@ -1172,7 +1173,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
final Serde<V> valSerde,
final TimestampExtractor timestampExtractor,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
try {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
final String sourceName = newName(KStreamImpl.SOURCE_NAME);
@@ -1224,6 +1225,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
* @param streams the {@link KStream}s to be merged
* @return a {@link KStream} containing all records of the given streams
*/
+ @SuppressWarnings("unchecked")
public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
Objects.requireNonNull(streams, "streams can't be null");
if (streams.length <= 1) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 1abc5e7..33e56aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -206,7 +205,8 @@ public interface KTable<K, V> {
* @deprecated use {@link #filter(Predicate, Materialized) filter(predicate, Materialized.as(KeyValueBytesStoreSupplier))}
*/
@Deprecated
- KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
+ KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Create a new {@code KTable} that consists of all records of this {@code KTable} which do <em>not</em> satisfy the
@@ -300,7 +300,8 @@ public interface KTable<K, V> {
* @deprecated use {@link #filterNot(Predicate, Materialized) filterNot(predicate, Materialized.as(KeyValueBytesStoreSupplier))}
*/
@Deprecated
- KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
+ KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Create a new {@code KTable} that consists of all records of this {@code KTable} which do <em>not</em> satisfy the
@@ -512,7 +513,7 @@ public interface KTable<K, V> {
@Deprecated
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Serde<VR> valueSerde,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
@@ -811,7 +812,7 @@ public interface KTable<K, V> {
*/
@Deprecated
KTable<K, V> through(final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
@@ -913,7 +914,7 @@ public interface KTable<K, V> {
@Deprecated
KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
@@ -978,7 +979,7 @@ public interface KTable<K, V> {
KTable<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
@@ -1080,7 +1081,7 @@ public interface KTable<K, V> {
final Serde<V> valSerde,
final StreamPartitioner<? super K, ? super V> partitioner,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
@@ -1590,7 +1591,7 @@ public interface KTable<K, V> {
@Deprecated
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
@@ -1934,7 +1935,7 @@ public interface KTable<K, V> {
@Deprecated
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
@@ -2275,7 +2276,7 @@ public interface KTable<K, V> {
@Deprecated
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Get the name of the local state store that can be used to query this {@code KTable}.
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index b5de562..26e404e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
@@ -80,8 +79,8 @@ public abstract class AbstractStream<K> {
};
}
- @SuppressWarnings("unchecked")
- static <T, K> StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde,
+ @SuppressWarnings({"unchecked", "deprecation"})
+ static <T, K> org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde,
final Serde<T> aggValueSerde,
final String storeName) {
Objects.requireNonNull(storeName, "storeName can't be null");
@@ -89,8 +88,8 @@ public abstract class AbstractStream<K> {
return storeFactory(keySerde, aggValueSerde, storeName).build();
}
- @SuppressWarnings("unchecked")
- static <W extends Window, T, K> StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde,
+ @SuppressWarnings({"unchecked", "deprecation"})
+ static <W extends Window, T, K> org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde,
final Serde<T> aggValSerde,
final Windows<W> windows,
final String storeName) {
@@ -101,6 +100,7 @@ public abstract class AbstractStream<K> {
.build();
}
+ @SuppressWarnings("deprecation")
static <T, K> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<K> keySerde,
final Serde<T> aggValueSerde,
final String storeName) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 357a70c..0df1524 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -70,9 +69,10 @@ public class InternalStreamsBuilder {
return new KStreamImpl<>(this, name, Collections.singleton(name), false);
}
+ @SuppressWarnings("deprecation")
public <K, V> KTable<K, V> table(final String topic,
final ConsumedInternal<K, V> consumed,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
final String source = newName(KStreamImpl.SOURCE_NAME);
final String name = newName(KTableImpl.SOURCE_NAME);
@@ -132,6 +132,7 @@ public class InternalStreamsBuilder {
consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeName, isQueryable);
}
+ @SuppressWarnings("unchecked")
public <K, V> GlobalKTable<K, V> globalTable(final String topic,
final ConsumedInternal<K, V> consumed,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 64dfd19..dafaa62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -28,11 +28,10 @@ import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -78,6 +77,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
} // no need for else {} since isQueryable is true by default
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> reduce(final Reducer<V> reducer,
final String queryableStoreName) {
@@ -91,9 +91,10 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return reduce(reducer, (String) null);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> reduce(final Reducer<V> reducer,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(reducer, "reducer can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return doAggregate(
@@ -115,7 +116,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
materializedInternal);
}
-
+ @SuppressWarnings("deprecation")
@Override
public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows,
@@ -124,17 +125,18 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return reduce(reducer, windows, windowedStore(keySerde, valSerde, windows, getOrCreateName(queryableStoreName, REDUCE_NAME)));
}
+ @SuppressWarnings("deprecation")
@Override
public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows) {
return windowedBy(windows).reduce(reducer);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Override
public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier) {
Objects.requireNonNull(reducer, "reducer can't be null");
Objects.requireNonNull(windows, "windows can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -145,6 +147,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
);
}
+ @SuppressWarnings("deprecation")
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -176,7 +179,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
@Override
- public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator) {
+ public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
final String storeName = builder.newStoreName(AGGREGATE_NAME);
@@ -189,6 +193,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
+ @SuppressWarnings("deprecation")
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -196,10 +201,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return aggregate(initializer, aggregator, aggValueSerde, null);
}
+ @SuppressWarnings("deprecation")
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -209,6 +215,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
storeSupplier);
}
+ @SuppressWarnings("deprecation")
@Override
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -219,6 +226,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return aggregate(initializer, aggregator, windows, windowedStore(keySerde, aggValueSerde, windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
}
+ @SuppressWarnings("deprecation")
@Override
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -230,12 +238,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
.withValueSerde(aggValueSerde));
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Override
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
final Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
Objects.requireNonNull(windows, "windows can't be null");
@@ -247,6 +255,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, Long> count(final String queryableStoreName) {
determineIsQueryable(queryableStoreName);
@@ -258,8 +267,9 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return count((String) null);
}
+ @SuppressWarnings("deprecation")
@Override
- public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ public KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, storeSupplier);
}
@@ -274,6 +284,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, materialized);
}
+ @SuppressWarnings("deprecation")
@Override
public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
final String queryableStoreName) {
@@ -281,14 +292,16 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return count(windows, windowedStore(keySerde, Serdes.Long(), windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
}
+ @SuppressWarnings("deprecation")
@Override
public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows) {
return windowedBy(windows).count();
}
+ @SuppressWarnings("deprecation")
@Override
public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier) {
return aggregate(
aggregateBuilder.countInitializer,
aggregateBuilder.countAggregator,
@@ -296,7 +309,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
storeSupplier);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Override
public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -316,6 +329,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
+ @SuppressWarnings("deprecation")
@Override
public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -330,14 +344,14 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
.withValueSerde(aggValueSerde));
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Override
public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
final Merger<? super K, T> sessionMerger,
final SessionWindows sessionWindows,
final Serde<T> aggValueSerde,
- final StateStoreSupplier<SessionStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
@@ -373,7 +387,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
aggregateBuilder);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName) {
Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized = Materialized.<K, Long, SessionStore<Bytes, byte[]>>as(getOrCreateName(queryableStoreName, AGGREGATE_NAME))
.withKeySerde(keySerde)
@@ -381,13 +395,15 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return windowedBy(sessionWindows).count(materialized);
}
+ @SuppressWarnings("deprecation")
public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows) {
return windowedBy(sessionWindows).count();
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
- final StateStoreSupplier<SessionStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier) {
Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
final Merger<K, Long> sessionMerger = new Merger<K, Long>() {
@@ -406,7 +422,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows,
@@ -418,6 +434,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
.sessionWindowed(sessionWindows.maintainMs()).build());
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows) {
@@ -425,10 +442,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return windowedBy(sessionWindows).reduce(reducer);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows,
- final StateStoreSupplier<SessionStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier) {
Objects.requireNonNull(reducer, "reducer can't be null");
Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -471,10 +489,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
+ @SuppressWarnings("deprecation")
private <T> KTable<K, T> doAggregate(
final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
final String functionName,
- final StateStoreSupplier storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier) {
final String aggFunctionName = builder.newName(functionName);
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index e69d4f9..507944a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -21,15 +21,14 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedTable;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Collections;
@@ -71,11 +70,11 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
}
};
- public KGroupedTableImpl(final InternalStreamsBuilder builder,
- final String name,
- final String sourceName,
- final Serde<? extends K> keySerde,
- final Serde<? extends V> valSerde) {
+ KGroupedTableImpl(final InternalStreamsBuilder builder,
+ final String name,
+ final String sourceName,
+ final Serde<? extends K> keySerde,
+ final Serde<? extends V> valSerde) {
super(builder, name, Collections.singleton(sourceName));
this.keySerde = keySerde;
this.valSerde = valSerde;
@@ -88,6 +87,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
} // no need for else {} since isQueryable is true by default
}
+ @SuppressWarnings("deprecation")
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> adder,
@@ -98,6 +98,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return aggregate(initializer, adder, subtractor, keyValueStore(keySerde, aggValueSerde, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
}
+ @SuppressWarnings("deprecation")
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> adder,
@@ -106,6 +107,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return aggregate(initializer, adder, subtractor, aggValueSerde, null);
}
+ @SuppressWarnings("deprecation")
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> adder,
@@ -122,11 +124,12 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return aggregate(initializer, adder, subtractor, (String) null);
}
+ @SuppressWarnings("deprecation")
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> adder,
final Aggregator<? super K, ? super V, T> subtractor,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(adder, "adder can't be null");
Objects.requireNonNull(subtractor, "subtractor can't be null");
@@ -135,9 +138,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return doAggregate(aggregateSupplier, AGGREGATE_NAME, storeSupplier);
}
+ @SuppressWarnings("deprecation")
private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
final String functionName,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
final String sinkName = builder.newName(KStreamImpl.SINK_NAME);
final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME);
final String funcName = builder.newName(functionName);
@@ -194,6 +198,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), materialized.storeName(), isQueryable);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor,
@@ -223,10 +228,11 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return reduce(adder, subtractor, (String) null);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(adder, "adder can't be null");
Objects.requireNonNull(subtractor, "subtractor can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -234,6 +240,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return doAggregate(aggregateSupplier, REDUCE_NAME, storeSupplier);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, Long> count(final String queryableStoreName) {
determineIsQueryable(queryableStoreName);
@@ -271,8 +278,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return count((String) null);
}
+ @SuppressWarnings("deprecation")
@Override
- public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ public KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
return this.aggregate(
countInitializer,
countAdder,