You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/04 21:42:10 UTC
[1/2] kafka git commit: MINOR: add suppress warnings annotations in Streams API
Repository: kafka
Updated Branches:
refs/heads/trunk 51c652c40 -> 713a67fdd
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ae3808e..cbbe848 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -183,23 +183,27 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired);
}
-
+
+ @SuppressWarnings("deprecation")
@Override
public void print() {
print(defaultKeyValueMapper, null, null, this.name);
}
+ @SuppressWarnings("deprecation")
@Override
public void print(final String label) {
print(defaultKeyValueMapper, null, null, label);
}
+ @SuppressWarnings("deprecation")
@Override
public void print(final Serde<K> keySerde,
final Serde<V> valSerde) {
print(defaultKeyValueMapper, keySerde, valSerde, this.name);
}
+ @SuppressWarnings("deprecation")
@Override
public void print(final Serde<K> keySerde,
final Serde<V> valSerde,
@@ -207,17 +211,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
print(defaultKeyValueMapper, keySerde, valSerde, label);
}
+ @SuppressWarnings("deprecation")
@Override
public void print(final KeyValueMapper<? super K, ? super V, String> mapper) {
print(mapper, null, null, this.name);
}
+ @SuppressWarnings("deprecation")
@Override
public void print(final KeyValueMapper<? super K, ? super V, String> mapper,
final String label) {
print(mapper, null, null, label);
}
+ @SuppressWarnings("deprecation")
@Override
public void print(final KeyValueMapper<? super K, ? super V, String> mapper,
final Serde<K> keySerde,
@@ -225,6 +232,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
print(mapper, keySerde, valSerde, this.name);
}
+ @SuppressWarnings("deprecation")
@Override
public void print(final KeyValueMapper<? super K, ? super V, String> mapper,
final Serde<K> keySerde,
@@ -243,17 +251,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
builder.internalTopologyBuilder.addProcessor(name, printedInternal.build(this.name), this.name);
}
+ @SuppressWarnings("deprecation")
@Override
public void writeAsText(final String filePath) {
writeAsText(filePath, this.name, null, null, defaultKeyValueMapper);
}
+ @SuppressWarnings("deprecation")
@Override
public void writeAsText(final String filePath,
final String label) {
writeAsText(filePath, label, null, null, defaultKeyValueMapper);
}
+ @SuppressWarnings("deprecation")
@Override
public void writeAsText(final String filePath,
final Serde<K> keySerde,
@@ -261,6 +272,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
writeAsText(filePath, this.name, keySerde, valSerde, defaultKeyValueMapper);
}
+ @SuppressWarnings("deprecation")
@Override
public void writeAsText(final String filePath,
final String label,
@@ -269,12 +281,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
writeAsText(filePath, label, keySerde, valSerde, defaultKeyValueMapper);
}
+ @SuppressWarnings("deprecation")
@Override
public void writeAsText(final String filePath,
final KeyValueMapper<? super K, ? super V, String> mapper) {
writeAsText(filePath, this.name, null, null, mapper);
}
+ @SuppressWarnings("deprecation")
@Override
public void writeAsText(final String filePath,
final String label,
@@ -282,6 +296,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
writeAsText(filePath, label, null, null, mapper);
}
+ @SuppressWarnings("deprecation")
@Override
public void writeAsText(final String filePath,
final Serde<K> keySerde,
@@ -290,6 +305,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
writeAsText(filePath, this.name, keySerde, valSerde, mapper);
}
+ @SuppressWarnings("deprecation")
@Override
public void writeAsText(final String filePath,
final String label,
@@ -368,6 +384,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
return new KStreamImpl<>(builder, name, allSourceNodes, requireRepartitioning);
}
+ @SuppressWarnings("deprecation")
@Override
public KStream<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
@@ -404,6 +421,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
return new KStreamImpl<>(builder, name, sourceNodes, repartitionRequired);
}
+ @SuppressWarnings("deprecation")
@Override
public KStream<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
@@ -411,6 +429,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
return through(topic, Produced.with(keySerde, valSerde));
}
+ @SuppressWarnings("deprecation")
@Override
public KStream<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic) {
@@ -427,12 +446,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
to(topic, Produced.<K, V>with(null, null, null));
}
+ @SuppressWarnings("deprecation")
@Override
public void to(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic) {
to(topic, Produced.streamPartitioner(partitioner));
}
+ @SuppressWarnings("deprecation")
@Override
public void to(final Serde<K> keySerde,
final Serde<V> valSerde,
@@ -440,7 +461,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
to(topic, Produced.with(keySerde, valSerde));
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Override
public void to(final Serde<K> keySerde,
final Serde<V> valSerde,
@@ -459,6 +480,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
+ @SuppressWarnings("unchecked")
private void to(final String topic, final ProducedInternal<K, V> produced) {
final String name = builder.newName(SINK_NAME);
final Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer();
@@ -513,6 +535,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
}
+ @SuppressWarnings("deprecation")
@Override
public <V1, R> KStream<K, R> join(final KStream<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -540,6 +563,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
new KStreamImplJoin(false, false));
}
+ @SuppressWarnings("deprecation")
@Override
public <V1, R> KStream<K, R> outerJoin(final KStream<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -641,6 +665,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
return sourceName;
}
+ @SuppressWarnings("deprecation")
@Override
public <V1, R> KStream<K, R> leftJoin(final KStream<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -696,6 +721,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
}
+ @SuppressWarnings("deprecation")
@Override
public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -732,6 +758,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
return new KStreamImpl<>(builder, name, sourceNodes, false);
}
+ @SuppressWarnings("unchecked")
private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
final boolean leftJoin) {
@@ -768,6 +795,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
}
+ @SuppressWarnings("deprecation")
+ @Override
public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
final Serde<K> keySerde,
@@ -795,6 +824,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
true);
}
+ @SuppressWarnings("deprecation")
@Override
public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super K, ? super V, K1> selector,
final Serde<K1> keySerde,
@@ -820,6 +850,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
+ @SuppressWarnings("deprecation")
@Override
public KGroupedStream<K, V> groupByKey(final Serde<K> keySerde,
final Serde<V> valSerde) {
@@ -849,6 +880,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
this.rightOuter = rightOuter;
}
+ @SuppressWarnings("unchecked")
public <K1, R, V1, V2> KStream<K1, R> join(final KStream<K1, V1> lhs,
final KStream<K1, V2> other,
final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index b322415..4f26767 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -51,6 +51,7 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
private WindowStore<K, V2> otherWindow;
+ @SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
super.init(context);
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index a42db0b..db8de1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -33,7 +33,6 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -142,8 +141,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return this.queryableStoreName;
}
+ @SuppressWarnings("deprecation")
private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
- final StateStoreSupplier<KeyValueStore> storeSupplier,
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
final boolean isFilterNot) {
Objects.requireNonNull(predicate, "predicate can't be null");
String name = builder.newName(FILTER_NAME);
@@ -196,19 +196,21 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return doFilter(predicate, new MaterializedInternal<>(materialized), false);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
final String queryableStoreName) {
- StateStoreSupplier<KeyValueStore> storeSupplier = null;
+ org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier = null;
if (queryableStoreName != null) {
storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName);
}
return doFilter(predicate, storeSupplier, false);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return doFilter(predicate, storeSupplier, false);
}
@@ -226,26 +228,29 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return doFilter(predicate, new MaterializedInternal<>(materialized), true);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
final String queryableStoreName) {
- StateStoreSupplier<KeyValueStore> storeSupplier = null;
+ org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier = null;
if (queryableStoreName != null) {
storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName);
}
return doFilter(predicate, storeSupplier, true);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return doFilter(predicate, storeSupplier, true);
}
+ @SuppressWarnings("deprecation")
private <V1> KTable<K, V1> doMapValues(final ValueMapper<? super V, ? extends V1> mapper,
final Serde<V1> valueSerde,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(mapper);
String name = builder.newName(MAPVALUES_NAME);
String internalStoreName = null;
@@ -284,21 +289,23 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, true);
}
+ @SuppressWarnings("deprecation")
@Override
public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
final Serde<V1> valueSerde,
final String queryableStoreName) {
- StateStoreSupplier<KeyValueStore> storeSupplier = null;
+ org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier = null;
if (queryableStoreName != null) {
storeSupplier = keyValueStore(this.keySerde, valueSerde, queryableStoreName);
}
return doMapValues(mapper, valueSerde, storeSupplier);
}
+ @SuppressWarnings("deprecation")
@Override
public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
final Serde<V1> valueSerde,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return doMapValues(mapper, valueSerde, storeSupplier);
}
@@ -322,7 +329,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
print(keySerde, valSerde, this.name);
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Override
public void print(final Serde<K> keySerde,
final Serde<V> valSerde,
@@ -356,7 +363,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
/**
* @throws TopologyException if file is not found
*/
- @SuppressWarnings("deprecation")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Override
public void writeAsText(final String filePath,
final String label,
@@ -390,6 +397,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
@@ -408,12 +416,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
queryableStoreName != null));
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
final StreamPartitioner<? super K, ? super V> partitioner,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
to(keySerde, valSerde, partitioner, topic);
@@ -421,6 +430,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return builder.table(topic, consumed, storeSupplier);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
@@ -428,6 +438,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final String topic) {
return through(keySerde, valSerde, partitioner, topic, (String) null);
}
+
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
@@ -436,15 +448,17 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return through(keySerde, valSerde, null, topic, queryableStoreName);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return through(keySerde, valSerde, null, topic, storeSupplier);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
@@ -452,6 +466,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return through(keySerde, valSerde, null, topic, (String) null);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic,
@@ -459,49 +474,57 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return through(null, null, partitioner, topic, queryableStoreName);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return through(null, null, partitioner, topic, storeSupplier);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic) {
return through(null, null, partitioner, topic, (String) null);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> through(final String topic,
final String queryableStoreName) {
return through(null, null, null, topic, queryableStoreName);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> through(final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return through(null, null, null, topic, storeSupplier);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> through(final String topic) {
return through(null, null, null, topic, (String) null);
}
+ @SuppressWarnings("deprecation")
@Override
public void to(final String topic) {
to(null, null, null, topic);
}
+ @SuppressWarnings("deprecation")
@Override
public void to(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic) {
to(null, null, partitioner, topic);
}
+ @SuppressWarnings("deprecation")
@Override
public void to(final Serde<K> keySerde,
final Serde<V> valSerde,
@@ -509,6 +532,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
this.toStream().to(keySerde, valSerde, null, topic);
}
+ @SuppressWarnings("deprecation")
@Override
public void to(final Serde<K> keySerde,
final Serde<V> valSerde,
@@ -552,6 +576,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return doJoin(other, joiner, new MaterializedInternal<>(materialized), false, false);
}
+ @SuppressWarnings("deprecation")
@Override
public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -560,10 +585,11 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return doJoin(other, joiner, false, false, joinSerde, queryableStoreName);
}
+ @SuppressWarnings("deprecation")
@Override
public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return doJoin(other, joiner, false, false, storeSupplier);
}
@@ -581,6 +607,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return doJoin(other, joiner, new MaterializedInternal<>(materialized), true, true);
}
+ @SuppressWarnings("deprecation")
@Override
public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -589,10 +616,11 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return doJoin(other, joiner, true, true, joinSerde, queryableStoreName);
}
+ @SuppressWarnings("deprecation")
@Override
public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return doJoin(other, joiner, true, true, storeSupplier);
}
@@ -614,6 +642,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
false);
}
+ @SuppressWarnings("deprecation")
@Override
public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -622,15 +651,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return doJoin(other, joiner, true, false, joinSerde, queryableStoreName);
}
+ @SuppressWarnings("deprecation")
@Override
public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return doJoin(other, joiner, true, false, storeSupplier);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
final boolean leftOuter,
@@ -640,16 +670,18 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
- final StateStoreSupplier storeSupplier = queryableStoreName == null ? null : keyValueStore(this.keySerde, joinSerde, queryableStoreName);
+ final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier
+ = queryableStoreName == null ? null : keyValueStore(this.keySerde, joinSerde, queryableStoreName);
return doJoin(other, joiner, leftOuter, rightOuter, storeSupplier);
}
+ @SuppressWarnings({"unchecked", "deprecation"})
private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
final boolean leftOuter,
final boolean rightOuter,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
final String joinMergeName = builder.newName(MERGE_NAME);
@@ -668,6 +700,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return result;
}
+ @SuppressWarnings("unchecked")
private <VO, VR> KTable<K, VR> doJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
@@ -692,6 +725,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return result;
}
+ @SuppressWarnings("unchecked")
private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
final boolean leftOuter,
@@ -740,6 +774,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return new KTableImpl<>(builder, joinMergeName, joinMerge, allSourceNodes, internalQueryableName, internalQueryableName != null);
}
+ @SuppressWarnings("deprecation")
@Override
public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector,
final Serde<K1> keySerde,
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
index ffb3697..b4e5d44 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
@@ -43,6 +43,7 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
// Default constructor needed by Kafka
public WindowedSerializer() {}
+ @SuppressWarnings("unchecked")
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
if (inner == null) {
@@ -76,12 +77,12 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
inner.close();
}
- public byte[] serializeBaseKey(String topic, Windowed<T> data) {
+ byte[] serializeBaseKey(String topic, Windowed<T> data) {
return inner.serialize(topic, data.key());
}
// Only for testing
- public Serializer<T> innerSerializer() {
+ Serializer<T> innerSerializer() {
return inner;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index ad0f236..66dfa27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -50,6 +50,7 @@ import java.util.regex.Pattern;
*
* @deprecated use {@link Topology} instead
*/
+@SuppressWarnings("unchecked")
@Deprecated
public class TopologyBuilder {
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index c24686e..52465ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -46,7 +46,7 @@ public abstract class AbstractTask implements Task {
final ProcessorTopology topology;
final ProcessorStateManager stateMgr;
final Set<TopicPartition> partitions;
- final Consumer consumer;
+ final Consumer<byte[], byte[]> consumer;
final String logPrefix;
final boolean eosEnabled;
final Logger log;
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 06405ef..f2cbf51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
@@ -179,10 +178,10 @@ public class InternalTopologyBuilder {
private static class StateStoreSupplierFactory extends AbstractStateStoreFactory {
@SuppressWarnings("deprecation")
- private final StateStoreSupplier supplier;
+ private final org.apache.kafka.streams.processor.StateStoreSupplier supplier;
@SuppressWarnings("deprecation")
- StateStoreSupplierFactory(final StateStoreSupplier<?> supplier) {
+ StateStoreSupplierFactory(final org.apache.kafka.streams.processor.StateStoreSupplier<?> supplier) {
super(supplier.name(),
supplier.loggingEnabled(),
supplier instanceof WindowStoreSupplier,
@@ -196,6 +195,7 @@ public class InternalTopologyBuilder {
return supplier.get();
}
+ @SuppressWarnings("deprecation")
@Override
public long retentionPeriod() {
if (!isWindowStore()) {
@@ -498,7 +498,7 @@ public class InternalTopologyBuilder {
}
@SuppressWarnings("deprecation")
- public final void addStateStore(final StateStoreSupplier supplier,
+ public final void addStateStore(final org.apache.kafka.streams.processor.StateStoreSupplier supplier,
final String... processorNames) {
Objects.requireNonNull(supplier, "supplier can't be null");
if (stateFactories.containsKey(supplier.name())) {
@@ -531,7 +531,7 @@ public class InternalTopologyBuilder {
}
@SuppressWarnings("deprecation")
- public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+ public final void addGlobalStore(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
final String sourceName,
final TimestampExtractor timestampExtractor,
final Deserializer keyDeserializer,
@@ -925,6 +925,7 @@ public class InternalTopologyBuilder {
return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), storeToChangelogTopic, new ArrayList<>(globalStateStores.values()));
}
+ @SuppressWarnings("unchecked")
private void buildSinkNode(final Map<String, ProcessorNode> processorMap,
final Map<String, SinkNode> topicSinkMap,
final SinkNodeFactory sinkNodeFactory,
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
index b395d42..47cd61d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
@@ -52,6 +52,7 @@ public class QuickUnion<T> {
return current;
}
+ @SuppressWarnings("unchecked")
public void unite(T id1, T... idList) {
for (T id2 : idList) {
unitePair(id1, id2);
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index b254bb8..03bbceb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -112,7 +112,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
private Map<String, String> constructTags(final String scopeName, final String entityName, final String... tags) {
- List<String> updatedTagList = new ArrayList(Arrays.asList(tags));
+ List<String> updatedTagList = new ArrayList<>(Arrays.asList(tags));
updatedTagList.add(scopeName + "-id");
updatedTagList.add(entityName);
return tagMap(updatedTagList.toArray(new String[updatedTagList.size()]));
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 0173c1d..c9c44af 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
@@ -271,7 +270,7 @@ public class Stores {
}
@Override
- public StateStoreSupplier build() {
+ public org.apache.kafka.streams.processor.StateStoreSupplier build() {
log.trace("Defining InMemory Store name={} capacity={} logged={}", name, capacity, logged);
if (capacity < Integer.MAX_VALUE) {
return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged, logConfig);
@@ -335,7 +334,7 @@ public class Stores {
}
@Override
- public StateStoreSupplier build() {
+ public org.apache.kafka.streams.processor.StateStoreSupplier build() {
log.trace("Defining RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged);
if (sessionWindows) {
return new RocksDBSessionStoreSupplier<>(name, retentionPeriod, keySerde, valueSerde, logged, logConfig, cachingEnabled);
@@ -535,6 +534,7 @@ public class Stores {
* @param <K> the type of keys
* @param <V> the type of values
*/
+ @Deprecated
public interface InMemoryKeyValueFactory<K, V> {
/**
* Limits the in-memory key-value store to hold a maximum number of entries. The default is {@link Integer#MAX_VALUE}, which is
@@ -567,7 +567,7 @@ public class Stores {
* Return the instance of StateStoreSupplier of new key-value store.
* @return the state store supplier; never null
*/
- StateStoreSupplier build();
+ org.apache.kafka.streams.processor.StateStoreSupplier build();
}
/**
@@ -576,6 +576,7 @@ public class Stores {
* @param <K> the type of keys
* @param <V> the type of values
*/
+ @Deprecated
public interface PersistentKeyValueFactory<K, V> {
/**
@@ -614,11 +615,12 @@ public class Stores {
* @return the factory to create a persistent key-value store
*/
PersistentKeyValueFactory<K, V> enableCaching();
+
/**
* Return the instance of StateStoreSupplier of new key-value store.
* @return the key-value store; never null
*/
- StateStoreSupplier build();
+ org.apache.kafka.streams.processor.StateStoreSupplier build();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
index 92e8ce0..10d0fe2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
@@ -19,12 +19,11 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import java.util.Map;
-
-abstract class AbstractStoreSupplier<K, V, T extends StateStore> implements StateStoreSupplier<T> {
+@Deprecated
+abstract class AbstractStoreSupplier<K, V, T extends StateStore> implements org.apache.kafka.streams.processor.StateStoreSupplier<T> {
protected final String name;
protected final Serde<K> keySerde;
protected final Serde<V> valueSerde;
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 6d1e6dd..f955421 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -34,6 +34,7 @@ import java.util.Map;
*
* @see org.apache.kafka.streams.state.Stores#create(String)
*/
+@Deprecated
public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index c93bacb..0f897ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -29,6 +29,7 @@ import java.util.Map;
* @param <V> The value type
*
*/
+@Deprecated
public class InMemoryLRUCacheStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
private final int capacity;
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
index 7ac8bab..a6ff8d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
@@ -95,6 +95,7 @@ public class MeteredKeyValueBytesStore<K, V> extends WrappedStateStore.AbstractS
}, time);
}
+ @SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context, StateStore root) {
this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 8c10987..8d9065c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -57,6 +57,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
this.time = time;
}
+ @SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context, final StateStore root) {
//noinspection unchecked
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 49d8050..20c7c43 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -57,7 +57,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
this.valueSerde = valueSerde;
}
-
+ @SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context, final StateStore root) {
this.context = context;
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index d629c1c..4b233f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -29,7 +29,7 @@ import java.util.Map;
* @param <V> the type of values
* @see org.apache.kafka.streams.state.Stores#create(String)
*/
-
+@Deprecated
public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
private final KeyValueStoreBuilder<K, V> builder;
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
index f5432dc..1552f7d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
@@ -30,6 +30,7 @@ import java.util.Map;
*
* @see org.apache.kafka.streams.state.Stores#create(String)
*/
+@Deprecated
public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, SessionStore> implements WindowStoreSupplier<SessionStore> {
static final int NUM_SEGMENTS = 3;
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index b899f5e..2a82f79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -30,7 +30,7 @@ import java.util.Map;
*
* @see org.apache.kafka.streams.state.Stores#create(String)
*/
-
+@Deprecated
public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, WindowStore> implements WindowStoreSupplier<WindowStore> {
public static final int MIN_SEGMENTS = 2;
private final long retentionPeriod;
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index ffeb7d8..b9b7181 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -21,8 +21,6 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
-import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS;
-
public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSupplier {
private final String name;
private final long retentionPeriod;
@@ -38,12 +36,14 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
return name;
}
+ @SuppressWarnings("deprecation")
@Override
public SessionStore<Bytes, byte[]> get() {
- final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name,
- retentionPeriod,
- NUM_SEGMENTS,
- new SessionKeySchema());
+ final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(
+ name,
+ retentionPeriod,
+ org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS,
+ new SessionKeySchema());
return new RocksDBSessionStore<>(segmented, Serdes.Bytes(), Serdes.ByteArray());
}
@@ -52,8 +52,11 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
return "rocksdb-session";
}
+ @SuppressWarnings("deprecation")
@Override
public long segmentIntervalMs() {
- return Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS);
+ return Segments.segmentInterval(
+ retentionPeriod,
+ org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index a0500b6..e873435 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -20,8 +20,6 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
-import static org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS;
-
public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier {
private final String name;
private final long retentionPeriod;
@@ -29,13 +27,14 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier
private final long windowSize;
private final boolean retainDuplicates;
+ @SuppressWarnings("deprecation")
public RocksDbWindowBytesStoreSupplier(final String name,
final long retentionPeriod,
final int segments,
final long windowSize,
final boolean retainDuplicates) {
- if (segments < MIN_SEGMENTS) {
- throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
+ if (segments < org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS) {
+ throw new IllegalArgumentException("numSegments must be >= " + org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS);
}
this.name = name;
this.retentionPeriod = retentionPeriod;
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
index ad24c25..3495352 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
@@ -17,14 +17,14 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
/**
- * A windowed state store supplier that extends the {@link StateStoreSupplier} interface.
+ * A windowed state store supplier that extends the {@link org.apache.kafka.streams.processor.StateStoreSupplier} interface.
*
* @param <T> State store type
*/
-public interface WindowStoreSupplier<T extends StateStore> extends StateStoreSupplier<T> {
+@Deprecated
+public interface WindowStoreSupplier<T extends StateStore> extends org.apache.kafka.streams.processor.StateStoreSupplier<T> {
// window retention period in milli-second
long retentionPeriod();
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
index a4b3118..100a11c 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
-@SuppressWarnings("deprecation")
+@Deprecated
public class MockStateStoreSupplier implements StateStoreSupplier {
private String name;
private boolean persistent;
[2/2] kafka git commit: MINOR: add suppress warnings annotations in
Streams API
Posted by gu...@apache.org.
MINOR: add suppress warnings annotations in Streams API
- fixes examples with regard to new API
- fixes `Topology#addGlobalStore` parameters
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>
Closes #4003 from mjsax/minor-deprecated
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/713a67fd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/713a67fd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/713a67fd
Branch: refs/heads/trunk
Commit: 713a67fddaec3fa9cd7cce53dd6fef5ab6e0cdab
Parents: 51c652c
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Oct 4 14:42:07 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 4 14:42:07 2017 -0700
----------------------------------------------------------------------
.../examples/pageview/PageViewTypedDemo.java | 77 +++++++++----------
.../examples/pageview/PageViewUntypedDemo.java | 66 ++++++++--------
.../examples/temperature/TemperatureDemo.java | 54 ++++++-------
.../examples/wordcount/WordCountDemo.java | 29 +++----
.../java/org/apache/kafka/streams/Topology.java | 6 +-
.../kafka/streams/kstream/KGroupedStream.java | 80 +++++++++-----------
.../kafka/streams/kstream/KGroupedTable.java | 7 +-
.../kafka/streams/kstream/KStreamBuilder.java | 30 ++++----
.../apache/kafka/streams/kstream/KTable.java | 23 +++---
.../kstream/internals/AbstractStream.java | 10 +--
.../internals/InternalStreamsBuilder.java | 5 +-
.../kstream/internals/KGroupedStreamImpl.java | 59 ++++++++++-----
.../kstream/internals/KGroupedTableImpl.java | 36 +++++----
.../streams/kstream/internals/KStreamImpl.java | 36 ++++++++-
.../kstream/internals/KStreamKStreamJoin.java | 1 +
.../streams/kstream/internals/KTableImpl.java | 77 ++++++++++++++-----
.../kstream/internals/WindowedSerializer.java | 5 +-
.../streams/processor/TopologyBuilder.java | 1 +
.../processor/internals/AbstractTask.java | 2 +-
.../internals/InternalTopologyBuilder.java | 11 +--
.../streams/processor/internals/QuickUnion.java | 1 +
.../processor/internals/StreamsMetricsImpl.java | 2 +-
.../org/apache/kafka/streams/state/Stores.java | 12 +--
.../state/internals/AbstractStoreSupplier.java | 5 +-
.../InMemoryKeyValueStoreSupplier.java | 1 +
.../InMemoryLRUCacheStoreSupplier.java | 1 +
.../internals/MeteredKeyValueBytesStore.java | 1 +
.../state/internals/MeteredSessionStore.java | 1 +
.../state/internals/MeteredWindowStore.java | 2 +-
.../internals/RocksDBKeyValueStoreSupplier.java | 2 +-
.../internals/RocksDBSessionStoreSupplier.java | 1 +
.../internals/RocksDBWindowStoreSupplier.java | 2 +-
.../RocksDbSessionBytesStoreSupplier.java | 17 +++--
.../RocksDbWindowBytesStoreSupplier.java | 7 +-
.../state/internals/WindowStoreSupplier.java | 6 +-
.../kafka/test/MockStateStoreSupplier.java | 2 +-
36 files changed, 394 insertions(+), 284 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 068eece..101cd23 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.Windowed;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
/**
* Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
@@ -150,45 +151,45 @@ public class PageViewTypedDemo {
Consumed.with(Serdes.String(), userProfileSerde));
KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
- .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
- @Override
- public PageViewByRegion apply(PageView view, UserProfile profile) {
- PageViewByRegion viewByRegion = new PageViewByRegion();
- viewByRegion.user = view.user;
- viewByRegion.page = view.page;
-
- if (profile != null) {
- viewByRegion.region = profile.region;
- } else {
- viewByRegion.region = "UNKNOWN";
- }
- return viewByRegion;
+ .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
+ @Override
+ public PageViewByRegion apply(PageView view, UserProfile profile) {
+ PageViewByRegion viewByRegion = new PageViewByRegion();
+ viewByRegion.user = view.user;
+ viewByRegion.page = view.page;
+
+ if (profile != null) {
+ viewByRegion.region = profile.region;
+ } else {
+ viewByRegion.region = "UNKNOWN";
}
- })
- .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
- @Override
- public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
- return new KeyValue<>(viewRegion.region, viewRegion);
- }
- })
- .groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde))
- .count(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000), "RollingSevenDaysOfPageViewsByRegion")
- // TODO: we can merge ths toStream().map(...) with a single toStream(...)
- .toStream()
- .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
- @Override
- public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
- WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
- wViewByRegion.windowStart = key.window().start();
- wViewByRegion.region = key.key();
-
- RegionCount rCount = new RegionCount();
- rCount.region = key.key();
- rCount.count = value;
-
- return new KeyValue<>(wViewByRegion, rCount);
- }
- });
+ return viewByRegion;
+ }
+ })
+ .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
+ @Override
+ public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
+ return new KeyValue<>(viewRegion.region, viewRegion);
+ }
+ })
+ .groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde))
+ .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(7)).advanceBy(TimeUnit.SECONDS.toMillis(1)))
+ .count()
+ .toStream()
+ .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
+ @Override
+ public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
+ WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
+ wViewByRegion.windowStart = key.window().start();
+ wViewByRegion.region = key.key();
+
+ RegionCount rCount = new RegionCount();
+ rCount.region = key.key();
+ rCount.count = value;
+
+ return new KeyValue<>(wViewByRegion, rCount);
+ }
+ });
// write to the result topic
regionCount.to("streams-pageviewstats-typed-output", Produced.with(wPageViewByRegionSerde, regionCountSerde));
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index c20c077..ae72042 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -87,39 +87,39 @@ public class PageViewUntypedDemo {
});
KStream<JsonNode, JsonNode> regionCount = views
- .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
- @Override
- public JsonNode apply(JsonNode view, String region) {
- ObjectNode jNode = JsonNodeFactory.instance.objectNode();
-
- return jNode.put("user", view.get("user").textValue())
- .put("page", view.get("page").textValue())
- .put("region", region == null ? "UNKNOWN" : region);
- }
- })
- .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
- @Override
- public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
- return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
- }
- })
- .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
- .count(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000), "RollingSevenDaysOfPageViewsByRegion")
- // TODO: we can merge ths toStream().map(...) with a single toStream(...)
- .toStream()
- .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
- @Override
- public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
- ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
- keyNode.put("window-start", key.window().start())
- .put("region", key.key());
-
- ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
- valueNode.put("count", value);
-
- return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
- }
- });
+ .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
+ @Override
+ public JsonNode apply(JsonNode view, String region) {
+ ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+
+ return jNode.put("user", view.get("user").textValue())
+ .put("page", view.get("page").textValue())
+ .put("region", region == null ? "UNKNOWN" : region);
+ }
+ })
+ .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
+ @Override
+ public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
+ return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
+ }
+ })
+ .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
+ .windowedBy(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
+ .count()
+ .toStream()
+ .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
+ @Override
+ public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
+ ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
+ keyNode.put("window-start", key.window().start())
+ .put("region", key.key());
+
+ ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
+ valueNode.put("count", value);
+
+ return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
+ }
+ });
// write to the result topic
regionCount.to("streams-pageviewstats-untyped-output", Produced.with(jsonSerde, jsonSerde));
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
index 2039ca5..ea81dd6 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
@@ -87,38 +88,39 @@ public class TemperatureDemo {
KStream<String, String> source = builder.stream("iot-temperature");
KStream<Windowed<String>, String> max = source
- // temperature values are sent without a key (null), so in order
- // to group and reduce them, a key is needed ("temp" has been chosen)
- .selectKey(new KeyValueMapper<String, String, String>() {
- @Override
- public String apply(String key, String value) {
- return "temp";
- }
- })
- .groupByKey()
- .reduce(new Reducer<String>() {
- @Override
- public String apply(String value1, String value2) {
- if (Integer.parseInt(value1) > Integer.parseInt(value2))
- return value1;
- else
- return value2;
- }
- }, TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
- .toStream()
- .filter(new Predicate<Windowed<String>, String>() {
- @Override
- public boolean test(Windowed<String> key, String value) {
- return Integer.parseInt(value) > TEMPERATURE_THRESHOLD;
- }
- });
+ // temperature values are sent without a key (null), so in order
+ // to group and reduce them, a key is needed ("temp" has been chosen)
+ .selectKey(new KeyValueMapper<String, String, String>() {
+ @Override
+ public String apply(String key, String value) {
+ return "temp";
+ }
+ })
+ .groupByKey()
+ .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
+ .reduce(new Reducer<String>() {
+ @Override
+ public String apply(String value1, String value2) {
+ if (Integer.parseInt(value1) > Integer.parseInt(value2))
+ return value1;
+ else
+ return value2;
+ }
+ })
+ .toStream()
+ .filter(new Predicate<Windowed<String>, String>() {
+ @Override
+ public boolean test(Windowed<String> key, String value) {
+ return Integer.parseInt(value) > TEMPERATURE_THRESHOLD;
+ }
+ });
WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(Serdes.String().serializer());
WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer(), TEMPERATURE_WINDOW_SIZE);
Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
// need to override key serde to Windowed<String> type
- max.to(windowedSerde, Serdes.String(), "iot-temperature-max");
+ max.to("iot-temperature-max", Produced.with(windowedSerde, Serdes.String()));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 5689d50..7535315 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.util.Arrays;
@@ -63,22 +64,22 @@ public class WordCountDemo {
KStream<String, String> source = builder.stream("streams-plaintext-input");
KTable<String, Long> counts = source
- .flatMapValues(new ValueMapper<String, Iterable<String>>() {
- @Override
- public Iterable<String> apply(String value) {
- return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
- }
- })
- .groupBy(new KeyValueMapper<String, String, String>() {
- @Override
- public String apply(String key, String value) {
- return value;
- }
- })
- .count("Counts");
+ .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+ @Override
+ public Iterable<String> apply(String value) {
+ return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
+ }
+ })
+ .groupBy(new KeyValueMapper<String, String, String>() {
+ @Override
+ public String apply(String key, String value) {
+ return value;
+ }
+ })
+ .count();
// need to override value serde to Long type
- counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
+ counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/Topology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 1409b97..3b1ac6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -32,7 +32,6 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import java.util.regex.Pattern;
@@ -573,6 +572,7 @@ public class Topology {
* @return itself
* @throws TopologyException if the processor of state is already registered
*/
+ @SuppressWarnings("unchecked")
public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
final String sourceName,
final Deserializer keyDeserializer,
@@ -609,7 +609,8 @@ public class Topology {
* @return itself
* @throws TopologyException if the processor of state is already registered
*/
- public synchronized Topology addGlobalStore(final KeyValueStoreBuilder storeBuilder,
+ @SuppressWarnings("unchecked")
+ public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
final String sourceName,
final TimestampExtractor timestampExtractor,
final Deserializer keyDeserializer,
@@ -641,6 +642,7 @@ public class Topology {
*
* @return a description of the topology.
*/
+
public synchronized TopologyDescription describe() {
return internalTopologyBuilder.describe();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 17f2db4..b3945f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.SessionStore;
@@ -132,7 +131,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* String queryableStoreName = storeSupplier.name();
@@ -149,7 +148,7 @@ public interface KGroupedStream<K, V> {
* @deprecated use {@link #count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier))}
*/
@Deprecated
- KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
+ KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Count the number of records in this stream by the grouped key.
@@ -290,7 +289,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // counting words
* String queryableStoreName = storeSupplier.name();
@@ -312,7 +311,7 @@ public interface KGroupedStream<K, V> {
*/
@Deprecated
<W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier);
/**
@@ -333,10 +332,8 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
- * Sting queryableStoreName = storeSupplier.name();
* ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
@@ -398,7 +395,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* String queryableStoreName = storeSupplier.name();
@@ -418,7 +415,7 @@ public interface KGroupedStream<K, V> {
*/
@Deprecated
KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
- final StateStoreSupplier<SessionStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier);
/**
* Combine the values of records in this stream by the grouped key.
@@ -522,7 +519,7 @@ public interface KGroupedStream<K, V> {
* Combine the value of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value
- * (c.f. {@link #aggregate(Initializer, Aggregator, StateStoreSupplier)}).
+ * (c.f. {@link #aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* provided by the given {@code storeSupplier}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -540,8 +537,8 @@ public interface KGroupedStream<K, V> {
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
* value as-is.
- * Thus, {@code reduce(Reducer, StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or
- * max.
+ * Thus, {@code reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to compute
+ * aggregate functions like sum, min, or max.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
@@ -552,7 +549,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* String queryableStoreName = storeSupplier.name();
@@ -571,7 +568,7 @@ public interface KGroupedStream<K, V> {
*/
@Deprecated
KTable<K, V> reduce(final Reducer<V> reducer,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Combine the value of records in this stream by the grouped key.
@@ -595,7 +592,7 @@ public interface KGroupedStream<K, V> {
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
* value as-is.
- * Thus, {@code reduce(Reducer, StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or
+ * Thus, {@code reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or
* max.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
@@ -767,8 +764,8 @@ public interface KGroupedStream<K, V> {
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
* value as-is.
- * Thus, {@code reduce(Reducer, Windows, StateStoreSupplier)} can be used to compute aggregate functions like sum,
- * min, or max.
+ * Thus, {@code reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to
+ * compute aggregate functions like sum, min, or max.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
@@ -779,7 +776,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* String queryableStoreName = storeSupplier.name();
@@ -803,7 +800,7 @@ public interface KGroupedStream<K, V> {
@Deprecated
<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier);
/**
* Combine values of this stream by the grouped key into {@link SessionWindows}.
@@ -841,10 +838,8 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
- * Sting queryableStoreName = storeSupplier.name();
* ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
@@ -933,8 +928,8 @@ public interface KGroupedStream<K, V> {
* <p>
* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
* value as-is.
- * Thus, {@code reduce(Reducer, SessionWindows, StateStoreSupplier)} can be used to compute aggregate functions like
- * sum, min, or max.
+ * Thus, {@code reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used
+ * to compute aggregate functions like sum, min, or max.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
@@ -945,7 +940,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* String queryableStoreName = storeSupplier.name();
@@ -975,7 +970,7 @@ public interface KGroupedStream<K, V> {
@Deprecated
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows,
- final StateStoreSupplier<SessionStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier);
/**
@@ -1188,8 +1183,8 @@ public interface KGroupedStream<K, V> {
/**
* Aggregate the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
- * Aggregating is a generalization of {@link #reduce(Reducer, StateStoreSupplier) combining via reduce(...)} as it,
- * for example, allows the result to have a different type than the input values.
+ * Aggregating is a generalization of {@link #reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * combining via reduce(...)} as it, for example, allows the result to have a different type than the input values.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
* provided by the given {@code storeSupplier}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
@@ -1199,8 +1194,8 @@ public interface KGroupedStream<K, V> {
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
- * Thus, {@code aggregate(Initializer, Aggregator, StateStoreSupplier)} can be used to compute aggregate functions
- * like count (c.f. {@link #count()}).
+ * Thus, {@code aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)} can be
+ * used to compute aggregate functions like count (c.f. {@link #count()}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same key.
@@ -1211,7 +1206,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = storeSupplier.name();
@@ -1233,7 +1228,7 @@ public interface KGroupedStream<K, V> {
@Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Aggregate the values of records in this stream by the grouped key and defined windows.
@@ -1362,8 +1357,9 @@ public interface KGroupedStream<K, V> {
/**
* Aggregate the values of records in this stream by the grouped key and defined windows.
* Records with {@code null} key or value are ignored.
- * Aggregating is a generalization of {@link #reduce(Reducer, Windows, StateStoreSupplier) combining via
- * reduce(...)} as it, for example, allows the result to have a different type than the input values.
+ * Aggregating is a generalization of
+ * {@link #reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) combining via reduce(...)}
+ * as it, for example, allows the result to have a different type than the input values.
* The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
* {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
* The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
@@ -1377,8 +1373,8 @@ public interface KGroupedStream<K, V> {
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
- * Thus, {@code aggregate(Initializer, Aggregator, Windows, StateStoreSupplier)} can be used to compute aggregate
- * functions like count (c.f. {@link #count(Windows)}).
+ * Thus, {@code aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)}
+ * can be used to compute aggregate functions like count (c.f. {@link #count(Windows)}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
@@ -1389,7 +1385,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type Long
* String queryableStoreName = storeSupplier.name();
@@ -1416,7 +1412,7 @@ public interface KGroupedStream<K, V> {
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier);
/**
* Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
@@ -1446,10 +1442,8 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local {@link SessionStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type double
- * Sting queryableStoreName = storeSupplier.name();
* ReadOnlySessionStore<String, Long> sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
@@ -1540,8 +1534,8 @@ public interface KGroupedStream<K, V> {
* The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
* aggregate (or for the very first record using the intermediate aggregation result provided via the
* {@link Initializer}) and the record's value.
- * Thus, {@code #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, StateStoreSupplier)} can be used
- * to compute aggregate functions like count (c.f. {@link #count(SessionWindows)}).
+ * Thus, {@code #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, org.apache.kafka.streams.processor.StateStoreSupplier)}
+ * can be used to compute aggregate functions like count (c.f. {@link #count(SessionWindows)}).
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
* the same window and key.
@@ -1552,7 +1546,7 @@ public interface KGroupedStream<K, V> {
* <p>
* To query the local {@link SessionStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type double
* String queryableStoreName = storeSupplier.name();
@@ -1583,7 +1577,7 @@ public interface KGroupedStream<K, V> {
final Merger<? super K, T> sessionMerger,
final SessionWindows sessionWindows,
final Serde<T> aggValueSerde,
- final StateStoreSupplier<SessionStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier);
/**
* Create a new {@link TimeWindowedKStream} instance that can be used to perform windowed aggregations.
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index f814eaf..d0a38cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
@@ -195,7 +194,7 @@ public interface KGroupedTable<K, V> {
* @deprecated use {@link #count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier))}
*/
@Deprecated
- KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
+ KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
@@ -467,7 +466,7 @@ public interface KGroupedTable<K, V> {
@Deprecated
KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
@@ -945,6 +944,6 @@ public interface KGroupedTable<K, V> {
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index d4642da..6b51c86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
@@ -431,7 +430,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
* @return a {@link KTable} for the specified topic
*/
public <K, V> KTable<K, V> table(final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
return table(null, null, null, null, topic, storeSupplier);
}
@@ -525,7 +524,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
*/
public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
return table(offsetReset, null, null, null, topic, storeSupplier);
}
@@ -702,7 +701,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
public <K, V> KTable<K, V> table(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
return table(null, null, keySerde, valSerde, topic, storeSupplier);
}
@@ -737,7 +736,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
final Serde<V> valSerde,
final TimestampExtractor timestampExtractor,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier,
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
final boolean isQueryable) {
try {
final String source = newName(KStreamImpl.SOURCE_NAME);
@@ -882,6 +881,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
* {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, Serde, Serde, String) table(AutoOffsetReset, Serde, Serde, String)}
* @return a {@link KTable} for the specified topic
*/
+ @SuppressWarnings("unchecked")
public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
final TimestampExtractor timestampExtractor,
final Serde<K> keySerde,
@@ -889,12 +889,13 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
final String topic,
final String queryableStoreName) {
final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
- final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName,
- keySerde,
- valSerde,
- false,
- Collections.<String, String>emptyMap(),
- true);
+ final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(
+ internalStoreName,
+ keySerde,
+ valSerde,
+ false,
+ Collections.<String, String>emptyMap(),
+ true);
return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, queryableStoreName != null);
}
@@ -965,7 +966,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, true);
}
@@ -1096,7 +1097,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
}
@@ -1172,7 +1173,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
final Serde<V> valSerde,
final TimestampExtractor timestampExtractor,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
try {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
final String sourceName = newName(KStreamImpl.SOURCE_NAME);
@@ -1224,6 +1225,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
* @param streams the {@link KStream}s to be merged
* @return a {@link KStream} containing all records of the given streams
*/
+ @SuppressWarnings("unchecked")
public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
Objects.requireNonNull(streams, "streams can't be null");
if (streams.length <= 1) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 1abc5e7..33e56aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -206,7 +205,8 @@ public interface KTable<K, V> {
* @deprecated use {@link #filter(Predicate, Materialized) filter(predicate, Materialized.as(KeyValueBytesStoreSupplier))}
*/
@Deprecated
- KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
+ KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Create a new {@code KTable} that consists of all records of this {@code KTable} which do <em>not</em> satisfy the
@@ -300,7 +300,8 @@ public interface KTable<K, V> {
* @deprecated use {@link #filterNot(Predicate, Materialized) filterNot(predicate, Materialized.as(KeyValueBytesStoreSupplier))}
*/
@Deprecated
- KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
+ KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Create a new {@code KTable} that consists of all records of this {@code KTable} which do <em>not</em> satisfy the
@@ -512,7 +513,7 @@ public interface KTable<K, V> {
@Deprecated
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Serde<VR> valueSerde,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
@@ -811,7 +812,7 @@ public interface KTable<K, V> {
*/
@Deprecated
KTable<K, V> through(final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Materialize this changelog stream to a topic and create a new {@code KTable} from the topic using default
@@ -913,7 +914,7 @@ public interface KTable<K, V> {
@Deprecated
KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Materialize this changelog stream to a topic and create a new {@code KTable} from the topic.
@@ -978,7 +979,7 @@ public interface KTable<K, V> {
KTable<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Materialize this changelog stream to a topic and create a new {@code KTable} from the topic.
@@ -1080,7 +1081,7 @@ public interface KTable<K, V> {
final Serde<V> valSerde,
final StreamPartitioner<? super K, ? super V> partitioner,
final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Materialize this changelog stream to a topic and create a new {@code KTable} from the topic using a customizable
@@ -1590,7 +1591,7 @@ public interface KTable<K, V> {
@Deprecated
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
@@ -1934,7 +1935,7 @@ public interface KTable<K, V> {
@Deprecated
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
@@ -2275,7 +2276,7 @@ public interface KTable<K, V> {
@Deprecated
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Get the name of the local state store that can be used to query this {@code KTable}.
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index b5de562..26e404e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
@@ -80,8 +79,8 @@ public abstract class AbstractStream<K> {
};
}
- @SuppressWarnings("unchecked")
- static <T, K> StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde,
+ @SuppressWarnings({"unchecked", "deprecation"})
+ static <T, K> org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde,
final Serde<T> aggValueSerde,
final String storeName) {
Objects.requireNonNull(storeName, "storeName can't be null");
@@ -89,8 +88,8 @@ public abstract class AbstractStream<K> {
return storeFactory(keySerde, aggValueSerde, storeName).build();
}
- @SuppressWarnings("unchecked")
- static <W extends Window, T, K> StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde,
+ @SuppressWarnings({"unchecked", "deprecation"})
+ static <W extends Window, T, K> org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde,
final Serde<T> aggValSerde,
final Windows<W> windows,
final String storeName) {
@@ -101,6 +100,7 @@ public abstract class AbstractStream<K> {
.build();
}
+ @SuppressWarnings("deprecation")
static <T, K> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<K> keySerde,
final Serde<T> aggValueSerde,
final String storeName) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 357a70c..0df1524 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -70,9 +69,10 @@ public class InternalStreamsBuilder {
return new KStreamImpl<>(this, name, Collections.singleton(name), false);
}
+ @SuppressWarnings("deprecation")
public <K, V> KTable<K, V> table(final String topic,
final ConsumedInternal<K, V> consumed,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
final String source = newName(KStreamImpl.SOURCE_NAME);
final String name = newName(KTableImpl.SOURCE_NAME);
@@ -132,6 +132,7 @@ public class InternalStreamsBuilder {
consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeName, isQueryable);
}
+ @SuppressWarnings("unchecked")
public <K, V> GlobalKTable<K, V> globalTable(final String topic,
final ConsumedInternal<K, V> consumed,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 64dfd19..dafaa62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -28,11 +28,10 @@ import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -78,6 +77,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
} // no need for else {} since isQueryable is true by default
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> reduce(final Reducer<V> reducer,
final String queryableStoreName) {
@@ -91,9 +91,10 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return reduce(reducer, (String) null);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> reduce(final Reducer<V> reducer,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(reducer, "reducer can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return doAggregate(
@@ -115,7 +116,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
materializedInternal);
}
-
+ @SuppressWarnings("deprecation")
@Override
public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows,
@@ -124,17 +125,18 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return reduce(reducer, windows, windowedStore(keySerde, valSerde, windows, getOrCreateName(queryableStoreName, REDUCE_NAME)));
}
+ @SuppressWarnings("deprecation")
@Override
public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows) {
return windowedBy(windows).reduce(reducer);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Override
public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier) {
Objects.requireNonNull(reducer, "reducer can't be null");
Objects.requireNonNull(windows, "windows can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -145,6 +147,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
);
}
+ @SuppressWarnings("deprecation")
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -176,7 +179,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
@Override
- public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator) {
+ public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
final String storeName = builder.newStoreName(AGGREGATE_NAME);
@@ -189,6 +193,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
+ @SuppressWarnings("deprecation")
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -196,10 +201,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return aggregate(initializer, aggregator, aggValueSerde, null);
}
+ @SuppressWarnings("deprecation")
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -209,6 +215,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
storeSupplier);
}
+ @SuppressWarnings("deprecation")
@Override
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -219,6 +226,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return aggregate(initializer, aggregator, windows, windowedStore(keySerde, aggValueSerde, windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
}
+ @SuppressWarnings("deprecation")
@Override
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -230,12 +238,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
.withValueSerde(aggValueSerde));
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Override
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
final Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
Objects.requireNonNull(windows, "windows can't be null");
@@ -247,6 +255,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, Long> count(final String queryableStoreName) {
determineIsQueryable(queryableStoreName);
@@ -258,8 +267,9 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return count((String) null);
}
+ @SuppressWarnings("deprecation")
@Override
- public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ public KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, storeSupplier);
}
@@ -274,6 +284,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, materialized);
}
+ @SuppressWarnings("deprecation")
@Override
public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
final String queryableStoreName) {
@@ -281,14 +292,16 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return count(windows, windowedStore(keySerde, Serdes.Long(), windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
}
+ @SuppressWarnings("deprecation")
@Override
public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows) {
return windowedBy(windows).count();
}
+ @SuppressWarnings("deprecation")
@Override
public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier) {
return aggregate(
aggregateBuilder.countInitializer,
aggregateBuilder.countAggregator,
@@ -296,7 +309,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
storeSupplier);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Override
public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -316,6 +329,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
+ @SuppressWarnings("deprecation")
@Override
public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -330,14 +344,14 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
.withValueSerde(aggValueSerde));
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Override
public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
final Merger<? super K, T> sessionMerger,
final SessionWindows sessionWindows,
final Serde<T> aggValueSerde,
- final StateStoreSupplier<SessionStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
@@ -373,7 +387,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
aggregateBuilder);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName) {
Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized = Materialized.<K, Long, SessionStore<Bytes, byte[]>>as(getOrCreateName(queryableStoreName, AGGREGATE_NAME))
.withKeySerde(keySerde)
@@ -381,13 +395,15 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return windowedBy(sessionWindows).count(materialized);
}
+ @SuppressWarnings("deprecation")
public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows) {
return windowedBy(sessionWindows).count();
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
- final StateStoreSupplier<SessionStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier) {
Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
final Merger<K, Long> sessionMerger = new Merger<K, Long>() {
@@ -406,7 +422,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows,
@@ -418,6 +434,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
.sessionWindowed(sessionWindows.maintainMs()).build());
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows) {
@@ -425,10 +442,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return windowedBy(sessionWindows).reduce(reducer);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows,
- final StateStoreSupplier<SessionStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier) {
Objects.requireNonNull(reducer, "reducer can't be null");
Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -471,10 +489,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
+ @SuppressWarnings("deprecation")
private <T> KTable<K, T> doAggregate(
final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
final String functionName,
- final StateStoreSupplier storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier) {
final String aggFunctionName = builder.newName(functionName);
http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index e69d4f9..507944a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -21,15 +21,14 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedTable;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Collections;
@@ -71,11 +70,11 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
}
};
- public KGroupedTableImpl(final InternalStreamsBuilder builder,
- final String name,
- final String sourceName,
- final Serde<? extends K> keySerde,
- final Serde<? extends V> valSerde) {
+ KGroupedTableImpl(final InternalStreamsBuilder builder,
+ final String name,
+ final String sourceName,
+ final Serde<? extends K> keySerde,
+ final Serde<? extends V> valSerde) {
super(builder, name, Collections.singleton(sourceName));
this.keySerde = keySerde;
this.valSerde = valSerde;
@@ -88,6 +87,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
} // no need for else {} since isQueryable is true by default
}
+ @SuppressWarnings("deprecation")
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> adder,
@@ -98,6 +98,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return aggregate(initializer, adder, subtractor, keyValueStore(keySerde, aggValueSerde, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
}
+ @SuppressWarnings("deprecation")
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> adder,
@@ -106,6 +107,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return aggregate(initializer, adder, subtractor, aggValueSerde, null);
}
+ @SuppressWarnings("deprecation")
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> adder,
@@ -122,11 +124,12 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return aggregate(initializer, adder, subtractor, (String) null);
}
+ @SuppressWarnings("deprecation")
@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> adder,
final Aggregator<? super K, ? super V, T> subtractor,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(adder, "adder can't be null");
Objects.requireNonNull(subtractor, "subtractor can't be null");
@@ -135,9 +138,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return doAggregate(aggregateSupplier, AGGREGATE_NAME, storeSupplier);
}
+ @SuppressWarnings("deprecation")
private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
final String functionName,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
final String sinkName = builder.newName(KStreamImpl.SINK_NAME);
final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME);
final String funcName = builder.newName(functionName);
@@ -194,6 +198,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), materialized.storeName(), isQueryable);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor,
@@ -223,10 +228,11 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return reduce(adder, subtractor, (String) null);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(adder, "adder can't be null");
Objects.requireNonNull(subtractor, "subtractor can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -234,6 +240,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return doAggregate(aggregateSupplier, REDUCE_NAME, storeSupplier);
}
+ @SuppressWarnings("deprecation")
@Override
public KTable<K, Long> count(final String queryableStoreName) {
determineIsQueryable(queryableStoreName);
@@ -271,8 +278,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return count((String) null);
}
+ @SuppressWarnings("deprecation")
@Override
- public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ public KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
return this.aggregate(
countInitializer,
countAdder,