You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/17 20:52:54 UTC
kafka git commit: KAFKA-3825: Allow users to specify different types of state stores in Streams DSL
Repository: kafka
Updated Branches:
refs/heads/trunk 2daa10d77 -> eaf0e4af3
KAFKA-3825: Allow users to specify different types of state stores in Streams DSL
Author: Jeyhun Karimov <je...@gmail.com>
Reviewers: Damian Guy, Guozhang Wang
Closes #1588 from jeyhunkarimov/KAFKA-3825
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eaf0e4af
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eaf0e4af
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eaf0e4af
Branch: refs/heads/trunk
Commit: eaf0e4af341818b335d17400b0a02be87a7fff9b
Parents: 2daa10d
Author: Jeyhun Karimov <je...@gmail.com>
Authored: Thu Nov 17 12:52:50 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 17 12:52:50 2016 -0800
----------------------------------------------------------------------
.../kafka/streams/kstream/KGroupedStream.java | 88 +++++++++++
.../kafka/streams/kstream/KGroupedTable.java | 47 ++++++
.../kstream/internals/AbstractStream.java | 38 +++++
.../kstream/internals/KGroupedStreamImpl.java | 148 +++++++++++--------
.../kstream/internals/KGroupedTableImpl.java | 77 +++++-----
.../kstream/internals/KStreamAggregate.java | 5 +-
.../kstream/internals/KStreamReduce.java | 7 +-
.../internals/KStreamWindowAggregate.java | 9 +-
.../kstream/internals/KStreamWindowReduce.java | 7 +-
.../kstream/internals/KTableAggregate.java | 5 +-
.../streams/kstream/internals/KTableReduce.java | 7 +-
.../streams/kstream/internals/KTableSource.java | 7 +-
.../kstream/internals/TupleForwarder.java | 51 +++++++
.../streams/processor/StateStoreSupplier.java | 11 +-
.../state/internals/AbstractStoreSupplier.java | 3 +-
.../InMemoryKeyValueStoreSupplier.java | 4 +-
.../InMemoryLRUCacheStoreSupplier.java | 13 +-
.../internals/RocksDBKeyValueStoreSupplier.java | 15 +-
.../internals/RocksDBWindowStoreSupplier.java | 6 +-
.../internals/KGroupedStreamImplTest.java | 25 +++-
.../internals/KGroupedTableImplTest.java | 19 ++-
21 files changed, 442 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 16b55d9..f47c904 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -17,6 +17,9 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStore;
/**
* {@link KGroupedStream} is an abstraction of a <i>grouped record stream</i> of key-value pairs
@@ -49,6 +52,17 @@ public interface KGroupedStream<K, V> {
KTable<K, V> reduce(Reducer<V> reducer,
final String storeName);
+ /**
+ * Combine values of this stream by the grouped key into a new instance of an ever-updating
+ * {@link KTable}. The resulting {@link KTable} will be materialized in a state
+ * store provided by the {@link StateStoreSupplier}.
+ *
+ * @param reducer the instance of {@link Reducer}
+ * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+ * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
+ */
+ KTable<K, V> reduce(final Reducer<V> reducer,
+ final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
@@ -69,6 +83,23 @@ public interface KGroupedStream<K, V> {
final String storeName);
/**
+ * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+ * The resulting {@link KTable} will be materialized in a state
+ * store provided by the {@link StateStoreSupplier}.
+ *
+ * @param reducer the instance of {@link Reducer}
+ * @param windows the specification of the aggregation {@link Windows}
+ * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+ * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+ * where each table contains records with unmodified keys and values
+ * that represent the latest (rolling) aggregate for each key within that window
+ */
+ <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
+ Windows<W> windows,
+ final StateStoreSupplier<WindowStore> storeSupplier);
+
+
+ /**
* Aggregate values of this stream by key into a new instance of a {@link KTable}.
* The resulting {@link KTable} will be materialized in a local state
* store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
@@ -90,6 +121,21 @@ public interface KGroupedStream<K, V> {
final String storeName);
/**
+ * Aggregate values of this stream by key into a new instance of a {@link KTable}.
+ * The resulting {@link KTable} will be materialized in a state
+ * store provided by the {@link StateStoreSupplier}.
+ *
+ * @param initializer the instance of {@link Initializer}
+ * @param aggregator the instance of {@link Aggregator}
+ * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+ * @param <T> the value type of the resulting {@link KTable}
+ * @return a {@link KTable} that represents the latest (rolling) aggregate for each key
+ */
+ <T> KTable<K, T> aggregate(Initializer<T> initializer,
+ Aggregator<K, V, T> aggregator,
+ final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+ /**
* Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
* The resulting {@link KTable} will be materialized in a local state
* store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
@@ -113,6 +159,24 @@ public interface KGroupedStream<K, V> {
Serde<T> aggValueSerde,
final String storeName);
+ /**
+ * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+ * The resulting {@link KTable} will be materialized in a state
+ * store provided by the {@link StateStoreSupplier}.
+ *
+ * @param initializer the instance of {@link Initializer}
+ * @param aggregator the instance of {@link Aggregator}
+ * @param windows the specification of the aggregation {@link Windows}
+ * @param <T> the value type of the resulting {@link KTable}
+ * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+ * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+ * where each table contains records with unmodified keys and values with type {@code T}
+ * that represent the latest (rolling) aggregate for each key within that window
+ */
+ <W extends Window, T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer,
+ Aggregator<K, V, T> aggregator,
+ Windows<W> windows,
+ final StateStoreSupplier<WindowStore> storeSupplier);
/**
* Count number of records of this stream by key into a new instance of a {@link KTable}.
@@ -127,6 +191,16 @@ public interface KGroupedStream<K, V> {
*/
KTable<K, Long> count(final String storeName);
+ /**
+ * Count number of records of this stream by key into a new instance of a {@link KTable}.
+ * The resulting {@link KTable} will be materialized in a state
+ * store provided by the {@link StateStoreSupplier}.
+ *
+ * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+ *
+ * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
+ */
+ KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
@@ -143,4 +217,18 @@ public interface KGroupedStream<K, V> {
*/
<W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows, final String storeName);
+ /**
+ * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+ * The resulting {@link KTable} will be materialized in a state
+ * store provided by the {@link StateStoreSupplier}.
+ *
+ * @param windows the specification of the aggregation {@link Windows}
+ * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+ * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+ * where each table contains records with unmodified keys and values
+ * that represent the latest (rolling) count (i.e., number of records) for each key within that window
+ */
+ <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows,
+ final StateStoreSupplier<WindowStore> storeSupplier);
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 3ba4f22..c587538 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
/**
* {@link KGroupedTable} is an abstraction of a <i>grouped changelog stream</i> from a primary-keyed table,
@@ -51,6 +53,21 @@ public interface KGroupedTable<K, V> {
String storeName);
/**
+ * Combine updating values of this stream by the selected key into a new instance of {@link KTable}.
+ * The resulting {@link KTable} will be materialized in a state
+ * store provided by the {@link StateStoreSupplier}.
+ *
+ * @param adder the instance of {@link Reducer} for addition
+ * @param subtractor the instance of {@link Reducer} for subtraction
+ * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+ * @return a {@link KTable} with the same key and value types as this {@link KGroupedTable},
+ * containing aggregated values for each key
+ */
+ KTable<K, V> reduce(Reducer<V> adder,
+ Reducer<V> subtractor,
+ final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+ /**
* Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}.
* The resulting {@link KTable} will be materialized in a local state
* store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
@@ -95,6 +112,25 @@ public interface KGroupedTable<K, V> {
String storeName);
/**
+ * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}
+ * using default serializers and deserializers.
+ * The resulting {@link KTable} will be materialized in a state
+ * store provided by the {@link StateStoreSupplier}.
+ *
+ * @param initializer the instance of {@link Initializer}
+ * @param adder the instance of {@link Aggregator} for addition
+ * @param subtractor the instance of {@link Aggregator} for subtraction
+ * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+ * @param <T> the value type of the aggregated {@link KTable}
+ * @return a {@link KTable} with the same key type and aggregated value type {@code T},
+ * containing aggregated values for each key
+ */
+ <T> KTable<K, T> aggregate(Initializer<T> initializer,
+ Aggregator<K, V, T> adder,
+ Aggregator<K, V, T> subtractor,
+ final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+ /**
* Count number of records of this stream by the selected key into a new instance of {@link KTable}.
* The resulting {@link KTable} will be materialized in a local state
* store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
@@ -107,4 +143,15 @@ public interface KGroupedTable<K, V> {
*/
KTable<K, Long> count(String storeName);
+ /**
+ * Count number of records of this stream by the selected key into a new instance of {@link KTable}.
+ * The resulting {@link KTable} will be materialized in a state
+ * store provided by the {@link StateStoreSupplier}.
+ *
+ * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+ * @return a {@link KTable} with the same key type as this {@link KGroupedTable} and {@link Long} values,
+ * containing the number of values for each key
+ */
+ KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 2f5b160..31a3dc6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -17,10 +17,18 @@
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
import java.util.HashSet;
+import java.util.Objects;
import java.util.Set;
public abstract class AbstractStream<K> {
@@ -64,4 +72,34 @@ public abstract class AbstractStream<K> {
};
}
+ @SuppressWarnings("unchecked")
+ public static <T, K> StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde,
+ final Serde<T> aggValueSerde,
+ final String storeName) {
+ Objects.requireNonNull(storeName, "storeName can't be null");
+ return storeFactory(keySerde, aggValueSerde, storeName).build();
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <W extends Window, T, K> StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde,
+ final Serde<T> aggValSerde,
+ final Windows<W> windows,
+ final String storeName) {
+ Objects.requireNonNull(storeName, "storeName can't be null");
+ return storeFactory(keySerde, aggValSerde, storeName)
+ .windowed(windows.size(), windows.maintainMs(), windows.segments, false)
+ .build();
+ }
+ @SuppressWarnings("unchecked")
+ public static <T, K> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<K> keySerde,
+ final Serde<T> aggValueSerde,
+ final String storeName) {
+ return Stores.create(storeName)
+ .withKeys(keySerde)
+ .withValues(aggValueSerde)
+ .persistent()
+ .enableCaching();
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 9bc66e8..e50b6dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -16,17 +16,18 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStore;
import java.util.Collections;
import java.util.Objects;
@@ -56,27 +57,41 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
@Override
public KTable<K, V> reduce(final Reducer<V> reducer,
final String storeName) {
+ return reduce(reducer, keyValueStore(keySerde, valSerde, storeName));
+ }
+
+ @Override
+ public KTable<K, V> reduce(final Reducer<V> reducer,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(reducer, "reducer can't be null");
- Objects.requireNonNull(storeName, "storeName can't be null");
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return doAggregate(
- new KStreamReduce<K, V>(storeName, reducer),
- REDUCE_NAME,
- keyValueStore(valSerde, storeName));
+ new KStreamReduce<K, V>(storeSupplier.name(), reducer),
+ REDUCE_NAME,
+ storeSupplier);
}
@SuppressWarnings("unchecked")
@Override
- public <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
- Windows<W> windows,
+ public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+ final Windows<W> windows,
final String storeName) {
+ return reduce(reducer, windows, windowedStore(keySerde, valSerde, windows, storeName));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+ final Windows<W> windows,
+ final StateStoreSupplier<WindowStore> storeSupplier) {
Objects.requireNonNull(reducer, "reducer can't be null");
Objects.requireNonNull(windows, "windows can't be null");
- Objects.requireNonNull(storeName, "storeName can't be null");
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return (KTable<Windowed<K>, V>) doAggregate(
- new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
- REDUCE_NAME,
- windowedStore(valSerde, windows, storeName)
+ new KStreamWindowReduce<K, V, W>(windows, storeSupplier.name(), reducer),
+ REDUCE_NAME,
+ storeSupplier
);
}
@@ -85,13 +100,20 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
final Aggregator<K, V, T> aggregator,
final Serde<T> aggValueSerde,
final String storeName) {
+ return aggregate(initializer, aggregator, keyValueStore(keySerde, aggValueSerde, storeName));
+ }
+
+ @Override
+ public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+ final Aggregator<K, V, T> aggregator,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
- Objects.requireNonNull(storeName, "storeName can't be null");
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return doAggregate(
- new KStreamAggregate<>(storeName, initializer, aggregator),
- AGGREGATE_NAME,
- keyValueStore(aggValueSerde, storeName));
+ new KStreamAggregate<>(storeSupplier.name(), initializer, aggregator),
+ AGGREGATE_NAME,
+ storeSupplier);
}
@SuppressWarnings("unchecked")
@@ -101,34 +123,33 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
final Windows<W> windows,
final Serde<T> aggValueSerde,
final String storeName) {
+ return aggregate(initializer, aggregator, windows, windowedStore(keySerde, aggValueSerde, windows, storeName));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+ final Aggregator<K, V, T> aggregator,
+ final Windows<W> windows,
+ final StateStoreSupplier<WindowStore> storeSupplier) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
Objects.requireNonNull(windows, "windows can't be null");
- Objects.requireNonNull(storeName, "storeName can't be null");
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
return (KTable<Windowed<K>, T>) doAggregate(
- new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
- AGGREGATE_NAME,
- windowedStore(aggValueSerde, windows, storeName)
+ new KStreamWindowAggregate<>(windows, storeSupplier.name(), initializer, aggregator),
+ AGGREGATE_NAME,
+ storeSupplier
);
}
@Override
public KTable<K, Long> count(final String storeName) {
- return aggregate(new Initializer<Long>() {
- @Override
- public Long apply() {
- return 0L;
- }
- }, new Aggregator<K, V, Long>() {
- @Override
- public Long apply(K aggKey, V value, Long aggregate) {
- return aggregate + 1;
- }
- }, Serdes.Long(), storeName);
+ return count(keyValueStore(keySerde, Serdes.Long(), storeName));
}
@Override
- public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows, final String storeName) {
+ public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
return aggregate(new Initializer<Long>() {
@Override
public Long apply() {
@@ -139,36 +160,39 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
public Long apply(K aggKey, V value, Long aggregate) {
return aggregate + 1;
}
- }, windows, Serdes.Long(), storeName);
+ }, storeSupplier);
}
- private <T> StateStoreSupplier keyValueStore(final Serde<T> aggValueSerde, final String name) {
- return storeFactory(aggValueSerde, name).build();
+ @Override
+ public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
+ final String storeName) {
+ return count(windows, windowedStore(keySerde, Serdes.Long(), windows, storeName));
}
-
- private <W extends Window, T> StateStoreSupplier windowedStore(final Serde<T> aggValSerde,
- final Windows<W> windows,
- final String storeName) {
- return storeFactory(aggValSerde, storeName)
- .windowed(windows.size(), windows.maintainMs(), windows.segments, false)
- .build();
-
+ @Override
+ public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
+ final StateStoreSupplier<WindowStore> storeSupplier) {
+ return aggregate(
+ new Initializer<Long>() {
+ @Override
+ public Long apply() {
+ return 0L;
+ }
+ }, new Aggregator<K, V, Long>() {
+ @Override
+ public Long apply(K aggKey, V value, Long aggregate) {
+ return aggregate + 1;
+ }
+ },
+ windows,
+ storeSupplier);
}
- private <T> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<T> aggValueSerde,
- final String storeName) {
- return Stores.create(storeName)
- .withKeys(keySerde)
- .withValues(aggValueSerde)
- .persistent()
- .enableCaching();
- }
private <T> KTable<K, T> doAggregate(
- final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
- final String functionName,
- final StateStoreSupplier storeSupplier) {
+ final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
+ final String functionName,
+ final StateStoreSupplier storeSupplier) {
final String aggFunctionName = topology.newName(functionName);
@@ -178,11 +202,11 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
topology.addStateStore(storeSupplier, aggFunctionName);
return new KTableImpl<>(topology,
- aggFunctionName,
- aggregateSupplier,
- sourceName.equals(this.name) ? sourceNodes
- : Collections.singleton(sourceName),
- storeSupplier.name());
+ aggFunctionName,
+ aggregateSupplier,
+ sourceName.equals(this.name) ? sourceNodes
+ : Collections.singleton(sourceName),
+ storeSupplier.name());
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 7aa2531..4ca69d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -21,22 +21,22 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.Initializer;
-import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Collections;
import java.util.Objects;
/**
* The implementation class of {@link KGroupedTable}.
- *
+ *
* @param <K> the key type
* @param <V> the value type
*/
@@ -65,33 +65,39 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
Aggregator<K, V, T> subtractor,
Serde<T> aggValueSerde,
String storeName) {
-
- Objects.requireNonNull(initializer, "initializer can't be null");
- Objects.requireNonNull(adder, "adder can't be null");
- Objects.requireNonNull(subtractor, "subtractor can't be null");
- Objects.requireNonNull(storeName, "storeName can't be null");
- ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(storeName, initializer, adder, subtractor);
- return doAggregate(aggregateSupplier, aggValueSerde, AGGREGATE_NAME, storeName);
+ return aggregate(initializer, adder, subtractor, keyValueStore(keySerde, aggValueSerde, storeName));
}
+
@Override
public <T> KTable<K, T> aggregate(Initializer<T> initializer,
- Aggregator<K, V, T> adder,
- Aggregator<K, V, T> subtractor,
- String storeName) {
-
+ Aggregator<K, V, T> adder,
+ Aggregator<K, V, T> subtractor,
+ String storeName) {
return aggregate(initializer, adder, subtractor, null, storeName);
}
+ @Override
+ public <T> KTable<K, T> aggregate(Initializer<T> initializer,
+ Aggregator<K, V, T> adder,
+ Aggregator<K, V, T> subtractor,
+ StateStoreSupplier<KeyValueStore> storeSupplier) {
+ Objects.requireNonNull(initializer, "initializer can't be null");
+ Objects.requireNonNull(adder, "adder can't be null");
+ Objects.requireNonNull(subtractor, "subtractor can't be null");
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(storeSupplier.name(), initializer, adder, subtractor);
+ return doAggregate(aggregateSupplier, AGGREGATE_NAME, storeSupplier);
+ }
+
private <T> KTable<K, T> doAggregate(ProcessorSupplier<K, Change<V>> aggregateSupplier,
- Serde<T> aggValueSerde,
String functionName,
- String storeName) {
+ StateStoreSupplier<KeyValueStore> storeSupplier) {
String sinkName = topology.newName(KStreamImpl.SINK_NAME);
String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
String funcName = topology.newName(functionName);
- String topic = storeName + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
+ String topic = storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
@@ -101,13 +107,6 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
- StateStoreSupplier aggregateStore = Stores.create(storeName)
- .withKeys(keySerde)
- .withValues(aggValueSerde)
- .persistent()
- .enableCaching()
- .build();
-
// send the aggregate key-value pairs to the intermediate topic for partitioning
topology.addInternalTopic(topic);
topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);
@@ -117,25 +116,37 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
// aggregate the values with the aggregator and local store
topology.addProcessor(funcName, aggregateSupplier, sourceName);
- topology.addStateStore(aggregateStore, funcName);
+ topology.addStateStore(storeSupplier, funcName);
// return the KTable representation with the intermediate topic as the sources
- return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeName);
+ return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name());
}
@Override
public KTable<K, V> reduce(Reducer<V> adder,
Reducer<V> subtractor,
String storeName) {
+ return reduce(adder, subtractor, keyValueStore(keySerde, valSerde, storeName));
+ }
+
+ @Override
+ public KTable<K, V> reduce(Reducer<V> adder,
+ Reducer<V> subtractor,
+ StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(adder, "adder can't be null");
Objects.requireNonNull(subtractor, "subtractor can't be null");
- Objects.requireNonNull(storeName, "storeName can't be null");
- ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(storeName, adder, subtractor);
- return doAggregate(aggregateSupplier, valSerde, REDUCE_NAME, storeName);
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(storeSupplier.name(), adder, subtractor);
+ return doAggregate(aggregateSupplier, REDUCE_NAME, storeSupplier);
}
@Override
public KTable<K, Long> count(String storeName) {
+ return count(keyValueStore(keySerde, Serdes.Long(), storeName));
+ }
+
+ @Override
+ public KTable<K, Long> count(StateStoreSupplier<KeyValueStore> storeSupplier) {
return this.aggregate(
new Initializer<Long>() {
@Override
@@ -154,7 +165,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return aggregate - 1L;
}
},
- Serdes.Long(), storeName);
+ storeSupplier);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index d596d5e..5bbda1f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.CachedStateStore;
public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, K, V, T> {
@@ -53,13 +52,14 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
private KeyValueStore<K, T> store;
+ private TupleForwarder<K, T> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
super.init(context);
store = (KeyValueStore<K, T>) context.getStateStore(storeName);
- ((CachedStateStore) store).setFlushListener(new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
+ tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
}
@@ -82,6 +82,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
// update the store with the new value
store.put(key, newAgg);
+ tupleForwarder.maybeForward(key, newAgg, oldAgg, sendOldValues);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 1408169..9af4368 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.CachedStateStore;
public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, V> {
@@ -49,6 +48,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
private class KStreamReduceProcessor extends AbstractProcessor<K, V> {
private KeyValueStore<K, V> store;
+ private TupleForwarder<K, V> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
@@ -56,7 +56,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
super.init(context);
store = (KeyValueStore<K, V>) context.getStateStore(storeName);
- ((CachedStateStore) store).setFlushListener(new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
+ tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
}
@@ -77,10 +77,9 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
newAgg = reducer.apply(newAgg, value);
}
}
-
// update the store with the new value
store.put(key, newAgg);
-
+ tupleForwarder.maybeForward(key, newAgg, oldAgg, sendOldValues);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 55b0916..d74a399 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -20,15 +20,14 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.state.internals.CachedStateStore;
import java.util.Map;
@@ -61,6 +60,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> {
private WindowStore<K, T> windowStore;
+ private TupleForwarder<Windowed<K>, T> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
@@ -68,7 +68,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
super.init(context);
windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
- ((CachedStateStore) windowStore).setFlushListener(new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues));
+ tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues));
}
@Override
@@ -110,7 +110,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
// update the store with the new value
windowStore.put(key, newAgg, window.start());
-
+ tupleForwarder.maybeForward(new Windowed<>(key, window), newAgg, oldAgg, sendOldValues);
matchedWindows.remove(entry.key);
}
}
@@ -121,6 +121,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
T oldAgg = initializer.apply();
T newAgg = aggregator.apply(key, value, oldAgg);
windowStore.put(key, newAgg, windowStartMs);
+ tupleForwarder.maybeForward(new Windowed<>(key, matchedWindows.get(windowStartMs)), newAgg, oldAgg, sendOldValues);
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 0b93468..5ee02e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.state.internals.CachedStateStore;
import java.util.Map;
@@ -58,13 +57,14 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
private class KStreamWindowReduceProcessor extends AbstractProcessor<K, V> {
private WindowStore<K, V> windowStore;
+ private TupleForwarder<Windowed<K>, V> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
super.init(context);
windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
- ((CachedStateStore) windowStore).setFlushListener(new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues));
+ tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues));
}
@Override
@@ -108,7 +108,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
// update the store with the new value
windowStore.put(key, newAgg, window.start());
-
+ tupleForwarder.maybeForward(new Windowed<>(key, window), newAgg, oldAgg, sendOldValues);
matchedWindows.remove(entry.key);
}
}
@@ -117,6 +117,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
// create the new window for the rest of unmatched window that do not exist yet
for (long windowStartMs : matchedWindows.keySet()) {
windowStore.put(key, value, windowStartMs);
+ tupleForwarder.maybeForward(new Windowed<>(key, matchedWindows.get(windowStartMs)), value, null, false);
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 2ef4709..fd04fb3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.CachedStateStore;
public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> {
@@ -55,13 +54,14 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> {
private KeyValueStore<K, T> store;
+ private TupleForwarder<K, T> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
store = (KeyValueStore<K, T>) context.getStateStore(storeName);
- ((CachedStateStore) store).setFlushListener(new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
+ tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
}
/**
@@ -92,6 +92,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
// update the store with the new value
store.put(key, newAgg);
+ tupleForwarder.maybeForward(key, newAgg, oldAgg, sendOldValues);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 8c2e5f9..7b29d1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.CachedStateStore;
public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
@@ -52,14 +51,14 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
private class KTableReduceProcessor extends AbstractProcessor<K, Change<V>> {
private KeyValueStore<K, V> store;
+ private TupleForwarder<K, V> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
super.init(context);
-
store = (KeyValueStore<K, V>) context.getStateStore(storeName);
- ((CachedStateStore) store).setFlushListener(new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
+ tupleForwarder = new TupleForwarder<K, V>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
}
/**
@@ -90,7 +89,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
// update the store with the new value
store.put(key, newAgg);
-
+ tupleForwarder.maybeForward(key, newAgg, oldAgg, sendOldValues);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 20a80f4..7b777d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.CachedStateStore;
public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
@@ -47,13 +46,14 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
private class KTableSourceProcessor extends AbstractProcessor<K, V> {
private KeyValueStore<K, V> store;
+ private TupleForwarder<K, V> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
super.init(context);
store = (KeyValueStore<K, V>) context.getStateStore(storeName);
- ((CachedStateStore) store).setFlushListener(new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
+ tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
}
@Override
@@ -61,8 +61,9 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
// the keys should never be null
if (key == null)
throw new StreamsException("Record key for the source KTable from store name " + storeName + " should not be null.");
-
+ V oldValue = store.get(key);
store.put(key, value);
+ tupleForwarder.maybeForward(key, value, oldValue, sendOldValues);
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
new file mode 100644
index 0000000..02609d7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
+
+
+/**
+ * Forwards updated values downstream on behalf of a processor, depending on the kind of store it writes to.
+ * <p>
+ * If the store is a {@link CachedStateStore}, the given flush listener is registered on it and
+ * {@link #maybeForward} does nothing; presumably the listener then emits records when the cache
+ * flushes (that path is not part of this class). For any other store, {@link #maybeForward}
+ * forwards the record immediately through the {@link ProcessorContext}.
+ *
+ * @param <K> key type of the forwarded records
+ * @param <V> value type wrapped in the forwarded {@code Change}
+ */
+class TupleForwarder<K, V> {
+ // true when the store passed to the constructor is a CachedStateStore
+ private final boolean cached;
+ private final ProcessorContext context;
+ /**
+ * Records whether {@code store} is cached and, if it is, registers {@code flushListener} on it.
+ *
+ * @param store the state store the owning processor writes to
+ * @param context processor context used for direct forwarding
+ * @param flushListener listener installed on the store only when it is a {@link CachedStateStore}
+ */
+ @SuppressWarnings("unchecked")
+ public TupleForwarder(final StateStore store,
+ final ProcessorContext context,
+ final ForwardingCacheFlushListener flushListener) {
+ this.cached = store instanceof CachedStateStore;
+ this.context = context;
+ if (this.cached) {
+ ((CachedStateStore) store).setFlushListener(flushListener);
+ }
+ }
+
+ /**
+ * Forwards {@code key} with a {@code Change} of the new and old value, but only when the store is not cached.
+ *
+ * @param key key of the updated record
+ * @param newValue value after the update
+ * @param oldValue value before the update; dropped (sent as null) unless {@code sendOldValues} is true
+ * @param sendOldValues whether the old value should be included in the forwarded {@code Change}
+ */
+ public void maybeForward(final K key,
+ final V newValue,
+ final V oldValue,
+ final boolean sendOldValues) {
+ // cached stores are skipped here: the flush listener registered in the constructor is left to do the forwarding
+ if (!cached) {
+ if (sendOldValues) {
+ context.forward(key, new Change<>(newValue, oldValue));
+ } else {
+ context.forward(key, new Change<>(newValue, null));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
index d3b0a1b..e4051ca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
@@ -21,8 +21,10 @@ import java.util.Map;
/**
* A state store supplier which can create one or more {@link StateStore} instances.
+ *
+ * @param <T> State store type
*/
-public interface StateStoreSupplier {
+public interface StateStoreSupplier<T extends StateStore> {
/**
* Return the name of this state store supplier.
@@ -34,14 +36,15 @@ public interface StateStoreSupplier {
/**
* Return a new {@link StateStore} instance.
*
- * @return a new {@link StateStore} instance
+ * @return a new {@link StateStore} instance of type T
*/
- StateStore get();
+ T get();
/**
* Returns a Map containing any log configs that will be used when creating the changelog for the {@link StateStore}
- *
+ * <p>
* Note: any unrecognized configs will be ignored by the Kafka brokers.
+ *
* @return Map containing any log configs to be used when creating the changelog for the {@link StateStore}
* If {@code loggingEnabled} returns false, this function will always return an empty map
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
index 39a33a0..3ad44ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
@@ -18,12 +18,13 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import java.util.Map;
-public abstract class AbstractStoreSupplier<K, V> implements StateStoreSupplier {
+public abstract class AbstractStoreSupplier<K, V, T extends StateStore> implements StateStoreSupplier<T> {
protected final String name;
protected final Serde<K> keySerde;
protected final Serde<V> valueSerde;
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index c05ebb2..d09630d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -45,7 +45,7 @@ import java.util.TreeMap;
*
* @see org.apache.kafka.streams.state.Stores#create(String)
*/
-public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> {
+public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {
@@ -56,7 +56,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K
super(name, keySerde, valueSerde, time, logged, logConfig);
}
- public StateStore get() {
+ public KeyValueStore get() {
MemoryStore<K, V> store = new MemoryStore<>(name, keySerde, valueSerde);
return new MeteredKeyValueStore<>(logged ? store.enableLogging() : store, "in-memory-state", time);
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index 45bcca3..c2b56fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -1,12 +1,12 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,11 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Map;
@@ -29,7 +30,7 @@ import java.util.Map;
* @param <V> The value type
*
*/
-public class InMemoryLRUCacheStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> {
+public class InMemoryLRUCacheStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
private final int capacity;
@@ -42,7 +43,7 @@ public class InMemoryLRUCacheStoreSupplier<K, V> extends AbstractStoreSupplier<K
this.capacity = capacity;
}
- public StateStore get() {
+ public KeyValueStore get() {
MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<>(name, capacity, keySerde, valueSerde);
return new MeteredKeyValueStore<>(logged ? cache.enableLogging() : cache, "in-memory-lru-state", time);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index 68a4429..164b352 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Map;
@@ -30,11 +30,10 @@ import java.util.Map;
*
* @param <K> the type of keys
* @param <V> the type of values
- *
* @see org.apache.kafka.streams.state.Stores#create(String)
*/
-public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> {
+public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
private final boolean enableCaching;
@@ -47,7 +46,7 @@ public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K,
this.enableCaching = enableCaching;
}
- public StateStore get() {
+ public KeyValueStore get() {
if (!enableCaching) {
RocksDBStore<K, V> store = new RocksDBStore<>(name, keySerde, valueSerde);
return new MeteredKeyValueStore<>(logged ? store.enableLogging() : store, "rocksdb-state", time);
@@ -55,9 +54,9 @@ public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K,
final RocksDBStore<Bytes, byte[]> store = new RocksDBStore<>(name, Serdes.Bytes(), Serdes.ByteArray());
return new CachingKeyValueStore<>(new MeteredKeyValueStore<>(logged ? store.enableLogging() : store,
- "rocksdb-state",
- time),
- keySerde,
- valueSerde);
+ "rocksdb-state",
+ time),
+ keySerde,
+ valueSerde);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index eb16bba..49d8882 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.WindowStore;
import java.util.Map;
@@ -34,7 +34,7 @@ import java.util.Map;
* @see org.apache.kafka.streams.state.Stores#create(String)
*/
-public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V> {
+public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, WindowStore> {
private final long retentionPeriod;
private final boolean retainDuplicates;
@@ -59,7 +59,7 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V
return name;
}
- public StateStore get() {
+ public WindowStore get() {
if (!enableCaching) {
final RocksDBWindowStore<K, V> rocksDbStore = new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde);
return new MeteredWindowStore<>(logged ? rocksDbStore.enableLogging() : rocksDbStore, "rocksdb-window", time);
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index a95d1fb..292f229 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
@@ -45,7 +46,14 @@ public class KGroupedStreamImplTest {
@Test(expected = NullPointerException.class)
public void shouldNotHaveNullStoreNameOnReduce() throws Exception {
- groupedStream.reduce(MockReducer.STRING_ADDER, null);
+ String storeName = null;
+ groupedStream.reduce(MockReducer.STRING_ADDER, storeName);
+ }
+
+ // Expects a NullPointerException when reduce is given a null state store supplier.
+ @Test(expected = NullPointerException.class)
+ public void shouldNotHaveNullStoreSupplierOnReduce() throws Exception {
+ // reduce is also called with a String store name in this class, so the null is held in a
+ // typed local to make the StateStoreSupplier overload the one under test
+ StateStoreSupplier storeSupplier = null;
+ groupedStream.reduce(MockReducer.STRING_ADDER, storeSupplier);
}
@Test(expected = NullPointerException.class)
@@ -60,7 +68,8 @@ public class KGroupedStreamImplTest {
@Test(expected = NullPointerException.class)
public void shouldNotHaveNullStoreNameWithWindowedReduce() throws Exception {
- groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), null);
+ String storeName = null;
+ groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), storeName);
}
@Test(expected = NullPointerException.class)
@@ -75,7 +84,8 @@ public class KGroupedStreamImplTest {
@Test(expected = NullPointerException.class)
public void shouldNotHaveNullStoreNameOnAggregate() throws Exception {
- groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, Serdes.String(), null);
+ String storeName = null;
+ groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, Serdes.String(), storeName);
}
@Test(expected = NullPointerException.class)
@@ -95,6 +105,13 @@ public class KGroupedStreamImplTest {
@Test(expected = NullPointerException.class)
public void shouldNotHaveNullStoreNameOnWindowedAggregate() throws Exception {
- groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, TimeWindows.of(10), Serdes.String(), null);
+ String storeName = null;
+ groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, TimeWindows.of(10), Serdes.String(), storeName);
+ }
+
+ // Expects a NullPointerException when the windowed aggregate is given a null state store supplier.
+ @Test(expected = NullPointerException.class)
+ public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() throws Exception {
+ // typed local makes explicit that the null stands in for the store supplier argument
+ StateStoreSupplier storeSupplier = null;
+ groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, TimeWindows.of(10), storeSupplier);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf0e4af/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 9b3a90b..85e2073 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
@@ -50,7 +51,8 @@ public class KGroupedTableImplTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullStoreNameOnAggregate() throws Exception {
- groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, null);
+ String storeName = null;
+ groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, storeName);
}
@Test(expected = NullPointerException.class)
@@ -80,7 +82,14 @@ public class KGroupedTableImplTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullStoreNameOnReduce() throws Exception {
- groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, null);
+ String storeName = null;
+ groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, storeName);
+ }
+
+ // Expects a NullPointerException when reduce is given a null state store supplier.
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullStoreSupplierOnReduce() throws Exception {
+ // Typed local selects the StateStoreSupplier overload of reduce; a bare null could equally
+ // match the overload that takes a String store name.
+ StateStoreSupplier storeSupplier = null;
+ groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, storeSupplier);
}
@Test