You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/03 23:15:59 UTC
[2/5] kafka git commit: KAFKA-5045: Clarify on KTable APIs for
queryable stores
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index cc6a126..6ed3e84 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Aggregator;
@@ -47,6 +46,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
private final Serde<K> keySerde;
private final Serde<V> valSerde;
private final boolean repartitionRequired;
+ private boolean isQueryable = true;
KGroupedStreamImpl(final KStreamBuilder topology,
final String name,
@@ -58,12 +58,26 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
this.keySerde = keySerde;
this.valSerde = valSerde;
this.repartitionRequired = repartitionRequired;
+ this.isQueryable = true;
+ }
+
+ private void determineIsQueryable(final String queryableStoreName) {
+ if (queryableStoreName == null) {
+ isQueryable = false;
+ } // no need for else {} since isQueryable is true by default
}
@Override
public KTable<K, V> reduce(final Reducer<V> reducer,
- final String storeName) {
- return reduce(reducer, keyValueStore(keySerde, valSerde, storeName));
+ final String queryableStoreName) {
+ determineIsQueryable(queryableStoreName);
+ return reduce(reducer, keyValueStore(keySerde, valSerde, getOrCreateName(queryableStoreName, REDUCE_NAME)));
+ }
+
+ @Override
+ public KTable<K, V> reduce(final Reducer<V> reducer) {
+ determineIsQueryable(null);
+ return reduce(reducer, (String) null);
}
@Override
@@ -82,8 +96,16 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
@Override
public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows,
- final String storeName) {
- return reduce(reducer, windows, windowedStore(keySerde, valSerde, windows, storeName));
+ final String queryableStoreName) {
+ determineIsQueryable(queryableStoreName);
+ return reduce(reducer, windows, windowedStore(keySerde, valSerde, windows, getOrCreateName(queryableStoreName, REDUCE_NAME)));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+ final Windows<W> windows) {
+ return reduce(reducer, windows, (String) null);
}
@SuppressWarnings("unchecked")
@@ -105,8 +127,16 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
final Serde<T> aggValueSerde,
- final String storeName) {
- return aggregate(initializer, aggregator, keyValueStore(keySerde, aggValueSerde, storeName));
+ final String queryableStoreName) {
+ determineIsQueryable(queryableStoreName);
+ return aggregate(initializer, aggregator, keyValueStore(keySerde, aggValueSerde, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
+ }
+
+ @Override
+ public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+ final Aggregator<? super K, ? super V, T> aggregator,
+ final Serde<T> aggValueSerde) {
+ return aggregate(initializer, aggregator, aggValueSerde, null);
}
@Override
@@ -128,8 +158,18 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
final Aggregator<? super K, ? super V, T> aggregator,
final Windows<W> windows,
final Serde<T> aggValueSerde,
- final String storeName) {
- return aggregate(initializer, aggregator, windows, windowedStore(keySerde, aggValueSerde, windows, storeName));
+ final String queryableStoreName) {
+ determineIsQueryable(queryableStoreName);
+ return aggregate(initializer, aggregator, windows, windowedStore(keySerde, aggValueSerde, windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+ final Aggregator<? super K, ? super V, T> aggregator,
+ final Windows<W> windows,
+ final Serde<T> aggValueSerde) {
+ return aggregate(initializer, aggregator, windows, aggValueSerde, null);
}
@SuppressWarnings("unchecked")
@@ -150,8 +190,14 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
@Override
- public KTable<K, Long> count(final String storeName) {
- return count(keyValueStore(keySerde, Serdes.Long(), storeName));
+ public KTable<K, Long> count(final String queryableStoreName) {
+ determineIsQueryable(queryableStoreName);
+ return count(keyValueStore(keySerde, Serdes.Long(), getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
+ }
+
+ @Override
+ public KTable<K, Long> count() {
+ return count((String) null);
}
@Override
@@ -171,8 +217,14 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
@Override
public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
- final String storeName) {
- return count(windows, windowedStore(keySerde, Serdes.Long(), windows, storeName));
+ final String queryableStoreName) {
+ determineIsQueryable(queryableStoreName);
+ return count(windows, windowedStore(keySerde, Serdes.Long(), windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
+ }
+
+ @Override
+ public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows) {
+ return count(windows, (String) null);
}
@Override
@@ -201,15 +253,14 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
final Merger<? super K, T> sessionMerger,
final SessionWindows sessionWindows,
final Serde<T> aggValueSerde,
- final String storeName) {
- Objects.requireNonNull(storeName, "storeName can't be null");
- Topic.validate(storeName);
+ final String queryableStoreName) {
+ determineIsQueryable(queryableStoreName);
return aggregate(initializer,
aggregator,
sessionMerger,
sessionWindows,
aggValueSerde,
- storeFactory(keySerde, aggValueSerde, storeName)
+ storeFactory(keySerde, aggValueSerde, getOrCreateName(queryableStoreName, AGGREGATE_NAME))
.sessionWindowed(sessionWindows.maintainMs()).build());
@@ -221,6 +272,16 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
final Aggregator<? super K, ? super V, T> aggregator,
final Merger<? super K, T> sessionMerger,
final SessionWindows sessionWindows,
+ final Serde<T> aggValueSerde) {
+ return aggregate(initializer, aggregator, sessionMerger, sessionWindows, aggValueSerde, (String) null);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+ final Aggregator<? super K, ? super V, T> aggregator,
+ final Merger<? super K, T> sessionMerger,
+ final SessionWindows sessionWindows,
final Serde<T> aggValueSerde,
final StateStoreSupplier<SessionStore> storeSupplier) {
Objects.requireNonNull(initializer, "initializer can't be null");
@@ -237,14 +298,17 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
@SuppressWarnings("unchecked")
- public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String storeName) {
- Objects.requireNonNull(storeName, "storeName can't be null");
- Topic.validate(storeName);
+ public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName) {
+ determineIsQueryable(queryableStoreName);
return count(sessionWindows,
- storeFactory(keySerde, Serdes.Long(), storeName)
+ storeFactory(keySerde, Serdes.Long(), getOrCreateName(queryableStoreName, AGGREGATE_NAME))
.sessionWindowed(sessionWindows.maintainMs()).build());
}
+ public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows) {
+ return count(sessionWindows, (String) null);
+ }
+
@SuppressWarnings("unchecked")
@Override
public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
@@ -278,15 +342,22 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows,
- final String storeName) {
+ final String queryableStoreName) {
+ determineIsQueryable(queryableStoreName);
- Objects.requireNonNull(storeName, "storeName can't be null");
- Topic.validate(storeName);
return reduce(reducer, sessionWindows,
- storeFactory(keySerde, valSerde, storeName)
+ storeFactory(keySerde, valSerde, getOrCreateName(queryableStoreName, AGGREGATE_NAME))
.sessionWindowed(sessionWindows.maintainMs()).build());
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+ final SessionWindows sessionWindows) {
+
+ return reduce(reducer, sessionWindows, (String) null);
+ }
+
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows,
@@ -339,16 +410,17 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
aggregateSupplier,
sourceName.equals(this.name) ? sourceNodes
: Collections.singleton(sourceName),
- storeSupplier.name());
+ storeSupplier.name(),
+ isQueryable);
}
/**
* @return the new sourceName if repartitioned. Otherwise the name of this stream
*/
- private String repartitionIfRequired(final String storeName) {
+ private String repartitionIfRequired(final String queryableStoreName) {
if (!repartitionRequired) {
return this.name;
}
- return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, storeName);
+ return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, queryableStoreName);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 08a4c5d..7e62727 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -48,41 +47,64 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
protected final Serde<? extends K> keySerde;
protected final Serde<? extends V> valSerde;
+ private boolean isQueryable = true;
- public KGroupedTableImpl(KStreamBuilder topology,
- String name,
- String sourceName,
- Serde<? extends K> keySerde,
- Serde<? extends V> valSerde) {
+ public KGroupedTableImpl(final KStreamBuilder topology,
+ final String name,
+ final String sourceName,
+ final Serde<? extends K> keySerde,
+ final Serde<? extends V> valSerde) {
super(topology, name, Collections.singleton(sourceName));
this.keySerde = keySerde;
this.valSerde = valSerde;
+ this.isQueryable = true;
+ }
+
+ private void determineIsQueryable(final String queryableStoreName) {
+ if (queryableStoreName == null) {
+ isQueryable = false;
+ } // no need for else {} since isQueryable is true by default
+ }
+
+ @Override
+ public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+ final Aggregator<? super K, ? super V, T> adder,
+ final Aggregator<? super K, ? super V, T> subtractor,
+ final Serde<T> aggValueSerde,
+ final String queryableStoreName) {
+ determineIsQueryable(queryableStoreName);
+ return aggregate(initializer, adder, subtractor, keyValueStore(keySerde, aggValueSerde, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
+ }
+
+ @Override
+ public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+ final Aggregator<? super K, ? super V, T> adder,
+ final Aggregator<? super K, ? super V, T> subtractor,
+ final Serde<T> aggValueSerde) {
+ return aggregate(initializer, adder, subtractor, aggValueSerde, (String) null);
}
@Override
- public <T> KTable<K, T> aggregate(Initializer<T> initializer,
- Aggregator<? super K, ? super V, T> adder,
- Aggregator<? super K, ? super V, T> subtractor,
- Serde<T> aggValueSerde,
- String storeName) {
- return aggregate(initializer, adder, subtractor, keyValueStore(keySerde, aggValueSerde, storeName));
+ public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+ final Aggregator<? super K, ? super V, T> adder,
+ final Aggregator<? super K, ? super V, T> subtractor,
+ final String queryableStoreName) {
+ determineIsQueryable(queryableStoreName);
+ return aggregate(initializer, adder, subtractor, null, getOrCreateName(queryableStoreName, AGGREGATE_NAME));
}
@Override
- public <T> KTable<K, T> aggregate(Initializer<T> initializer,
- Aggregator<? super K, ? super V, T> adder,
- Aggregator<? super K, ? super V, T> subtractor,
- String storeName) {
- Objects.requireNonNull(storeName, "storeName can't be null");
- Topic.validate(storeName);
- return aggregate(initializer, adder, subtractor, null, storeName);
+ public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+ final Aggregator<? super K, ? super V, T> adder,
+ final Aggregator<? super K, ? super V, T> subtractor) {
+ return aggregate(initializer, adder, subtractor, (String) null);
}
@Override
- public <T> KTable<K, T> aggregate(Initializer<T> initializer,
- Aggregator<? super K, ? super V, T> adder,
- Aggregator<? super K, ? super V, T> subtractor,
- StateStoreSupplier<KeyValueStore> storeSupplier) {
+ public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+ final Aggregator<? super K, ? super V, T> adder,
+ final Aggregator<? super K, ? super V, T> subtractor,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(adder, "adder can't be null");
Objects.requireNonNull(subtractor, "subtractor can't be null");
@@ -91,9 +113,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return doAggregate(aggregateSupplier, AGGREGATE_NAME, storeSupplier);
}
- private <T> KTable<K, T> doAggregate(ProcessorSupplier<K, Change<V>> aggregateSupplier,
- String functionName,
- StateStoreSupplier<KeyValueStore> storeSupplier) {
+ private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
+ final String functionName,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
String sinkName = topology.newName(KStreamImpl.SINK_NAME);
String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
String funcName = topology.newName(functionName);
@@ -120,22 +142,27 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
topology.addStateStore(storeSupplier, funcName);
// return the KTable representation with the intermediate topic as the sources
- return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name());
+ return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable);
+ }
+
+ @Override
+ public KTable<K, V> reduce(final Reducer<V> adder,
+ final Reducer<V> subtractor,
+ final String queryableStoreName) {
+ determineIsQueryable(queryableStoreName);
+ return reduce(adder, subtractor, keyValueStore(keySerde, valSerde, getOrCreateName(queryableStoreName, REDUCE_NAME)));
}
@Override
- public KTable<K, V> reduce(Reducer<V> adder,
- Reducer<V> subtractor,
- String storeName) {
- Objects.requireNonNull(storeName, "storeName can't be null");
- Topic.validate(storeName);
- return reduce(adder, subtractor, keyValueStore(keySerde, valSerde, storeName));
+ public KTable<K, V> reduce(final Reducer<V> adder,
+ final Reducer<V> subtractor) {
+ return reduce(adder, subtractor, (String) null);
}
@Override
- public KTable<K, V> reduce(Reducer<V> adder,
- Reducer<V> subtractor,
- StateStoreSupplier<KeyValueStore> storeSupplier) {
+ public KTable<K, V> reduce(final Reducer<V> adder,
+ final Reducer<V> subtractor,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(adder, "adder can't be null");
Objects.requireNonNull(subtractor, "subtractor can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -144,14 +171,18 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
}
@Override
- public KTable<K, Long> count(String storeName) {
- Objects.requireNonNull(storeName, "storeName can't be null");
- Topic.validate(storeName);
- return count(keyValueStore(keySerde, Serdes.Long(), storeName));
+ public KTable<K, Long> count(final String queryableStoreName) {
+ determineIsQueryable(queryableStoreName);
+ return count(keyValueStore(keySerde, Serdes.Long(), getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
+ }
+
+ @Override
+ public KTable<K, Long> count() {
+ return count((String) null);
}
@Override
- public KTable<K, Long> count(StateStoreSupplier<KeyValueStore> storeSupplier) {
+ public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
return this.aggregate(
new Initializer<Long>() {
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bbd4ac4..b751294 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -617,7 +617,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final String name = topology.newName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
topology.addProcessor(name, new KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(), joiner, leftJoin), this.name);
- topology.connectProcessorAndStateStores(name, other.getStoreName());
+ topology.connectProcessorAndStateStores(name, ((KTableImpl<K, ?, V1>) other).internalStoreName());
topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
return new KStreamImpl<>(topology, name, allSourceNodes, false);
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 774f235..af8c906 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -20,19 +20,22 @@ import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
private final KTableImpl<K, ?, V> parent;
private final Predicate<? super K, ? super V> predicate;
private final boolean filterNot;
-
+ private final String queryableName;
private boolean sendOldValues = false;
- public KTableFilter(KTableImpl<K, ?, V> parent, Predicate<? super K, ? super V> predicate, boolean filterNot) {
+ public KTableFilter(final KTableImpl<K, ?, V> parent, final Predicate<? super K, ? super V> predicate,
+ final boolean filterNot, final String queryableName) {
this.parent = parent;
this.predicate = predicate;
this.filterNot = filterNot;
+ this.queryableName = queryableName;
}
@Override
@@ -74,6 +77,18 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
}
private class KTableFilterProcessor extends AbstractProcessor<K, Change<V>> {
+ private KeyValueStore<K, V> store;
+ private TupleForwarder<K, V> tupleForwarder;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+ if (queryableName != null) {
+ store = (KeyValueStore<K, V>) context.getStateStore(queryableName);
+ tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+ }
+ }
@Override
public void process(K key, Change<V> change) {
@@ -83,7 +98,12 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
if (sendOldValues && oldValue == null && newValue == null)
return; // unnecessary to forward here.
- context().forward(key, new Change<>(newValue, oldValue));
+ if (queryableName != null) {
+ store.put(key, newValue);
+ tupleForwarder.maybeForward(key, newValue, oldValue);
+ } else {
+ context().forward(key, new Change<>(newValue, oldValue));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 6120f91..96a0b2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -29,7 +29,9 @@ import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.state.KeyValueStore;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
@@ -67,58 +69,157 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
+ public static final String STATE_STORE_NAME = "STATE-STORE-";
+
private final ProcessorSupplier<?, ?> processorSupplier;
- private final String storeName;
+ private final String queryableStoreName;
+ private final boolean isQueryable;
private boolean sendOldValues = false;
+ private final Serde<K> keySerde;
+ private final Serde<V> valSerde;
+ public KTableImpl(KStreamBuilder topology,
+ String name,
+ ProcessorSupplier<?, ?> processorSupplier,
+ Set<String> sourceNodes,
+ final String queryableStoreName,
+ boolean isQueryable) {
+ super(topology, name, sourceNodes);
+ this.processorSupplier = processorSupplier;
+ this.queryableStoreName = queryableStoreName;
+ this.keySerde = null;
+ this.valSerde = null;
+ this.isQueryable = isQueryable;
+ }
public KTableImpl(KStreamBuilder topology,
String name,
ProcessorSupplier<?, ?> processorSupplier,
+ final Serde<K> keySerde,
+ final Serde<V> valSerde,
Set<String> sourceNodes,
- final String storeName) {
+ final String queryableStoreName,
+ boolean isQueryable) {
super(topology, name, sourceNodes);
this.processorSupplier = processorSupplier;
- this.storeName = storeName;
+ this.queryableStoreName = queryableStoreName;
+ this.keySerde = keySerde;
+ this.valSerde = valSerde;
+ this.isQueryable = isQueryable;
}
@Override
- public String getStoreName() {
- return this.storeName;
+ public String queryableStoreName() {
+ if (!isQueryable) {
+ return null;
+ }
+ return this.queryableStoreName;
}
- @Override
- public KTable<K, V> filter(Predicate<? super K, ? super V> predicate) {
+ String internalStoreName() {
+ return this.queryableStoreName;
+ }
+
+ private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
+ final StateStoreSupplier<KeyValueStore> storeSupplier,
+ boolean isFilterNot) {
Objects.requireNonNull(predicate, "predicate can't be null");
String name = topology.newName(FILTER_NAME);
- KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, false);
+ String internalStoreName = null;
+ if (storeSupplier != null) {
+ internalStoreName = storeSupplier.name();
+ }
+ KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, isFilterNot, internalStoreName);
topology.addProcessor(name, processorSupplier, this.name);
+ if (storeSupplier != null) {
+ topology.addStateStore(storeSupplier, name);
+ }
+ return new KTableImpl<>(topology, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, internalStoreName != null);
+ }
- return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.storeName);
+ @Override
+ public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate) {
+ return filter(predicate, (String) null);
}
@Override
- public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate) {
- Objects.requireNonNull(predicate, "predicate can't be null");
- String name = topology.newName(FILTER_NAME);
- KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, true);
+ public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName) {
+ StateStoreSupplier<KeyValueStore> storeSupplier = null;
+ if (queryableStoreName != null) {
+ storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName);
+ }
+ return doFilter(predicate, storeSupplier, false);
+ }
- topology.addProcessor(name, processorSupplier, this.name);
+ @Override
+ public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ return doFilter(predicate, storeSupplier, false);
+ }
- return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.storeName);
+ @Override
+ public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate) {
+ return filterNot(predicate, (String) null);
+ }
+
+ @Override
+ public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName) {
+ StateStoreSupplier<KeyValueStore> storeSupplier = null;
+ if (queryableStoreName != null) {
+ storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName);
+ }
+ return doFilter(predicate, storeSupplier, true);
}
@Override
- public <V1> KTable<K, V1> mapValues(ValueMapper<? super V, ? extends V1> mapper) {
+ public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ return doFilter(predicate, storeSupplier, true);
+ }
+
+ private <V1> KTable<K, V1> doMapValues(final ValueMapper<? super V, ? extends V1> mapper,
+ final Serde<V1> valueSerde,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(mapper);
String name = topology.newName(MAPVALUES_NAME);
- KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(this, mapper);
-
+ String internalStoreName = null;
+ if (storeSupplier != null) {
+ internalStoreName = storeSupplier.name();
+ }
+ KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(this, mapper, internalStoreName);
topology.addProcessor(name, processorSupplier, this.name);
+ if (storeSupplier != null) {
+ topology.addStateStore(storeSupplier, name);
+ return new KTableImpl<>(topology, name, processorSupplier, this.keySerde, valueSerde, sourceNodes, internalStoreName, true);
+ } else {
+ return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.queryableStoreName, false);
+ }
+ }
+
+ @Override
+ public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper) {
+ return mapValues(mapper, null, (String) null);
+ }
- return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.storeName);
+ @Override
+ public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
+ final Serde<V1> valueSerde,
+ final String queryableStoreName) {
+ StateStoreSupplier<KeyValueStore> storeSupplier = null;
+ if (queryableStoreName != null) {
+ storeSupplier = keyValueStore(this.keySerde, valueSerde, queryableStoreName);
+ }
+ return doMapValues(mapper, valueSerde, storeSupplier);
+ }
+
+ @Override
+ public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
+ final Serde<V1> valueSerde,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ return doMapValues(mapper, valueSerde, storeSupplier);
}
@Override
@@ -193,30 +294,98 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
@Override
- public KTable<K, V> through(Serde<K> keySerde,
- Serde<V> valSerde,
- StreamPartitioner<? super K, ? super V> partitioner,
- String topic,
- final String storeName) {
- Objects.requireNonNull(storeName, "storeName can't be null");
+ public KTable<K, V> through(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final StreamPartitioner<? super K, ? super V> partitioner,
+ final String topic,
+ final String queryableStoreName) {
+ final String internalStoreName = queryableStoreName != null ? queryableStoreName : topology.newStoreName(KTableImpl.TOSTREAM_NAME);
+
+ to(keySerde, valSerde, partitioner, topic);
+
+ return topology.table(keySerde, valSerde, topic, internalStoreName);
+ }
+
+ @Override
+ public KTable<K, V> through(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final StreamPartitioner<? super K, ? super V> partitioner,
+ final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
to(keySerde, valSerde, partitioner, topic);
- return topology.table(keySerde, valSerde, topic, storeName);
+ return topology.table(keySerde, valSerde, topic, storeSupplier);
+ }
+
+ @Override
+ public KTable<K, V> through(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final StreamPartitioner<? super K, ? super V> partitioner,
+ final String topic) {
+ return through(keySerde, valSerde, partitioner, topic, (String) null);
+ }
+ @Override
+ public KTable<K, V> through(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String topic,
+ final String queryableStoreName) {
+ return through(keySerde, valSerde, null, topic, queryableStoreName);
+ }
+
+ @Override
+ public KTable<K, V> through(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ return through(keySerde, valSerde, null, topic, storeSupplier);
+ }
+
+ @Override
+ public KTable<K, V> through(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String topic) {
+ return through(keySerde, valSerde, null, topic, (String) null);
+ }
+
+ @Override
+ public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
+ final String topic,
+ final String queryableStoreName) {
+ return through(null, null, partitioner, topic, queryableStoreName);
+ }
+
+ @Override
+ public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
+ final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ return through(null, null, partitioner, topic, storeSupplier);
+ }
+
+ @Override
+ public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
+ final String topic) {
+ return through(null, null, partitioner, topic, (String) null);
}
@Override
- public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic, final String storeName) {
- return through(keySerde, valSerde, null, topic, storeName);
+ public KTable<K, V> through(final String topic,
+ final String queryableStoreName) {
+ return through(null, null, null, topic, queryableStoreName);
}
@Override
- public KTable<K, V> through(StreamPartitioner<? super K, ? super V> partitioner, String topic, final String storeName) {
- return through(null, null, partitioner, topic, storeName);
+ public KTable<K, V> through(final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ return through(null, null, null, topic, storeSupplier);
}
@Override
- public KTable<K, V> through(String topic, final String storeName) {
- return through(null, null, null, topic, storeName);
+ public KTable<K, V> through(final String topic) {
+ return through(null, null, null, topic, (String) null);
}
@Override
@@ -259,27 +428,94 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
@Override
- public <V1, R> KTable<K, R> join(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
- return doJoin(other, joiner, false, false);
+ public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
+ final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+ return doJoin(other, joiner, false, false, null, (String) null);
}
+ @Override
+ public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
+ final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+ final Serde<R> joinSerde,
+ final String queryableStoreName) {
+ return doJoin(other, joiner, false, false, joinSerde, queryableStoreName);
+ }
+
+ @Override
+ public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
+ final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ return doJoin(other, joiner, false, false, storeSupplier);
+ }
+
+ @Override
+ public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
+ final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+ return doJoin(other, joiner, true, true, null, (String) null);
+ }
+ @Override
+ public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
+ final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+ final Serde<R> joinSerde,
+ final String queryableStoreName) {
+ return doJoin(other, joiner, true, true, joinSerde, queryableStoreName);
+ }
@Override
- public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
- return doJoin(other, joiner, true, true);
+ public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
+ final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ return doJoin(other, joiner, true, true, storeSupplier);
}
@Override
- public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
- return doJoin(other, joiner, true, false);
+ public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
+ final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+ return doJoin(other, joiner, true, false, null, (String) null);
+ }
+
+ @Override
+ public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
+ final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+ final Serde<R> joinSerde,
+ final String queryableStoreName) {
+ return doJoin(other, joiner, true, false, joinSerde, queryableStoreName);
+ }
+
+ @Override
+ public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
+ final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ return doJoin(other, joiner, true, false, storeSupplier);
}
@SuppressWarnings("unchecked")
- private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, final boolean leftOuter, final boolean rightOuter) {
+ private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other,
+ ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+ final boolean leftOuter,
+ final boolean rightOuter,
+ final Serde<R> joinSerde,
+ final String queryableStoreName) {
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
+ final StateStoreSupplier storeSupplier = queryableStoreName == null ? null : keyValueStore(this.keySerde, joinSerde, queryableStoreName);
+
+ return doJoin(other, joiner, leftOuter, rightOuter, storeSupplier);
+ }
+
+ private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other,
+ ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+ final boolean leftOuter,
+ final boolean rightOuter,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ Objects.requireNonNull(other, "other can't be null");
+ Objects.requireNonNull(joiner, "joiner can't be null");
+ final String internalQueryableName = storeSupplier == null ? null : storeSupplier.name();
final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
if (leftOuter) {
@@ -308,8 +544,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
- new KTableImpl<K, V, R>(topology, joinThisName, joinThis, sourceNodes, storeName),
- new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName())
+ new KTableImpl<K, V, R>(topology, joinThisName, joinThis, sourceNodes, this.internalStoreName(), false),
+ new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes,
+ ((KTableImpl<K, ?, ?>) other).internalStoreName(), false),
+ internalQueryableName
);
topology.addProcessor(joinThisName, joinThis, this.name);
@@ -318,7 +556,11 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
- return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
+ if (internalQueryableName != null) {
+ topology.addStateStore(storeSupplier, joinMergeName);
+ }
+
+ return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, internalQueryableName, internalQueryableName != null);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index ee7a064..82d9c26 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -18,15 +18,22 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
private final KTableImpl<K, ?, V> parent1;
private final KTableImpl<K, ?, V> parent2;
+ private final String queryableName;
+ private boolean sendOldValues = false;
- public KTableKTableJoinMerger(KTableImpl<K, ?, V> parent1, KTableImpl<K, ?, V> parent2) {
+ public KTableKTableJoinMerger(final KTableImpl<K, ?, V> parent1,
+ final KTableImpl<K, ?, V> parent2,
+ final String queryableName) {
this.parent1 = parent1;
this.parent2 = parent2;
+ this.queryableName = queryableName;
}
@Override
@@ -43,13 +50,35 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
public void enableSendingOldValues() {
parent1.enableSendingOldValues();
parent2.enableSendingOldValues();
+ sendOldValues = true;
}
- private static final class KTableKTableJoinMergeProcessor<K, V>
+ private class KTableKTableJoinMergeProcessor<K, V>
extends AbstractProcessor<K, Change<V>> {
+ private KeyValueStore<K, V> store;
+ private TupleForwarder<K, V> tupleForwarder;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+ if (queryableName != null) {
+ store = (KeyValueStore<K, V>) context.getStateStore(queryableName);
+ tupleForwarder = new TupleForwarder<>(store, context,
+ new ForwardingCacheFlushListener<K, V>(context, sendOldValues),
+ sendOldValues);
+ }
+ }
+
@Override
public void process(K key, Change<V> value) {
- context().forward(key, value);
+
+ if (queryableName != null) {
+ store.put(key, value.newValue);
+ tupleForwarder.maybeForward(key, value.newValue, value.oldValue);
+ } else {
+ context().forward(key, value);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 90610de..41dd7cd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -20,18 +20,21 @@ import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
private final KTableImpl<K, ?, V> parent;
private final ValueMapper<? super V, ? extends V1> mapper;
-
+ private final String queryableName;
private boolean sendOldValues = false;
- public KTableMapValues(KTableImpl<K, ?, V> parent, ValueMapper<? super V, ? extends V1> mapper) {
+ public KTableMapValues(final KTableImpl<K, ?, V> parent, final ValueMapper<? super V, ? extends V1> mapper,
+ final String queryableName) {
this.parent = parent;
this.mapper = mapper;
+ this.queryableName = queryableName;
}
@Override
@@ -73,12 +76,30 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
private class KTableMapValuesProcessor extends AbstractProcessor<K, Change<V>> {
+ private KeyValueStore<K, V1> store;
+ private TupleForwarder<K, V1> tupleForwarder;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+ if (queryableName != null) {
+ store = (KeyValueStore<K, V1>) context.getStateStore(queryableName);
+ tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V1>(context, sendOldValues), sendOldValues);
+ }
+ }
+
@Override
public void process(K key, Change<V> change) {
V1 newValue = computeValue(change.newValue);
V1 oldValue = sendOldValues ? computeValue(change.oldValue) : null;
- context().forward(key, new Change<>(newValue, oldValue));
+ if (queryableName != null) {
+ store.put(key, newValue);
+ tupleForwarder.maybeForward(key, newValue, oldValue);
+ } else {
+ context().forward(key, new Change<>(newValue, oldValue));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index a7bc5bd..81f0ff9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.internals.QuickUnion;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -460,7 +461,7 @@ public class TopologyBuilder {
* receive all records forwarded from the {@link SourceNode}. This
* {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
*
- * @param store the instance of {@link StateStore}
+ * @param storeSupplier user defined state store supplier
* @param sourceName name of the {@link SourceNode} that will be automatically added
* @param keyDeserializer the {@link Deserializer} to deserialize keys with
* @param valueDeserializer the {@link Deserializer} to deserialize values with
@@ -469,14 +470,14 @@ public class TopologyBuilder {
* @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized TopologyBuilder addGlobalStore(final StateStore store,
+ public synchronized TopologyBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
final String sourceName,
final Deserializer keyDeserializer,
final Deserializer valueDeserializer,
final String topic,
final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
- Objects.requireNonNull(store, "store must not be null");
+ Objects.requireNonNull(storeSupplier, "store supplier must not be null");
Objects.requireNonNull(sourceName, "sourceName must not be null");
Objects.requireNonNull(topic, "topic must not be null");
Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
@@ -487,8 +488,11 @@ public class TopologyBuilder {
if (nodeFactories.containsKey(processorName)) {
throw new TopologyBuilderException("Processor " + processorName + " is already added.");
}
- if (stateFactories.containsKey(store.name()) || globalStateStores.containsKey(store.name())) {
- throw new TopologyBuilderException("StateStore " + store.name() + " is already added.");
+ if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) {
+ throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " is already added.");
+ }
+ if (storeSupplier.loggingEnabled()) {
+ throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
}
if (sourceName.equals(processorName)) {
throw new TopologyBuilderException("sourceName and processorName must be different.");
@@ -504,13 +508,13 @@ public class TopologyBuilder {
final String[] parents = {sourceName};
final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, parents, stateUpdateSupplier);
- nodeFactory.addStateStore(store.name());
+ nodeFactory.addStateStore(storeSupplier.name());
nodeFactories.put(processorName, nodeFactory);
nodeGrouper.add(processorName);
nodeGrouper.unite(processorName, parents);
- globalStateStores.put(store.name(), store);
- connectSourceStoreAndTopic(store.name(), topic);
+ globalStateStores.put(storeSupplier.name(), storeSupplier.get());
+ connectSourceStoreAndTopic(storeSupplier.name(), topic);
return this;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index e81c7d3..0722210 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -355,13 +355,7 @@ public class KStreamAggregationIntegrationTest {
)));
}
- @Test
- public void shouldCount() throws Exception {
- produceMessages(mockTime.milliseconds());
-
- groupedStream.count("count-by-key")
- .to(Serdes.String(), Serdes.Long(), outputTopic);
-
+ private void shouldCountHelper() throws Exception {
startStreams();
produceMessages(mockTime.milliseconds());
@@ -392,6 +386,26 @@ public class KStreamAggregationIntegrationTest {
}
@Test
+ public void shouldCount() throws Exception {
+ produceMessages(mockTime.milliseconds());
+
+ groupedStream.count("count-by-key")
+ .to(Serdes.String(), Serdes.Long(), outputTopic);
+
+ shouldCountHelper();
+ }
+
+ @Test
+ public void shouldCountWithInternalStore() throws Exception {
+ produceMessages(mockTime.milliseconds());
+
+ groupedStream.count()
+ .to(Serdes.String(), Serdes.Long(), outputTopic);
+
+ shouldCountHelper();
+ }
+
+ @Test
public void shouldGroupByKey() throws Exception {
final long timestamp = mockTime.milliseconds();
produceMessages(timestamp);
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 26deb92..5fa7e49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -30,6 +30,9 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -41,11 +44,15 @@ import org.junit.experimental.categories.Category;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
@Category({IntegrationTest.class})
public class KTableKTableJoinIntegrationTest {
@@ -134,12 +141,22 @@ public class KTableKTableJoinIntegrationTest {
@Test
public void shouldInnerInnerJoin() throws Exception {
- verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")));
+ verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), false);
+ }
+
+ @Test
+ public void shouldInnerInnerJoinQueryable() throws Exception {
+ verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), true);
}
@Test
public void shouldInnerLeftJoin() throws Exception {
- verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")));
+ verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), false);
+ }
+
+ @Test
+ public void shouldInnerLeftJoinQueryable() throws Exception {
+ verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), true);
}
@Test
@@ -148,7 +165,16 @@ public class KTableKTableJoinIntegrationTest {
new KeyValue<>("a", "null-A3"),
new KeyValue<>("b", "null-B3"),
new KeyValue<>("c", "null-C3"),
- new KeyValue<>("b", "B1-B2-B3")));
+ new KeyValue<>("b", "B1-B2-B3")), false);
+ }
+
+ @Test
+ public void shouldInnerOuterJoinQueryable() throws Exception {
+ verifyKTableKTableJoin(JoinType.INNER, JoinType.OUTER, Arrays.asList(
+ new KeyValue<>("a", "null-A3"),
+ new KeyValue<>("b", "null-B3"),
+ new KeyValue<>("c", "null-C3"),
+ new KeyValue<>("b", "B1-B2-B3")), true);
}
@Test
@@ -156,7 +182,15 @@ public class KTableKTableJoinIntegrationTest {
verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
- new KeyValue<>("b", "B1-B2-B3")));
+ new KeyValue<>("b", "B1-B2-B3")), false);
+ }
+
+ @Test
+ public void shouldLeftInnerJoinQueryable() throws Exception {
+ verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList(
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3")), true);
}
@Test
@@ -164,7 +198,15 @@ public class KTableKTableJoinIntegrationTest {
verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
- new KeyValue<>("b", "B1-B2-B3")));
+ new KeyValue<>("b", "B1-B2-B3")), false);
+ }
+
+ @Test
+ public void shouldLeftLeftJoinQueryable() throws Exception {
+ verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList(
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3")), true);
}
@Test
@@ -175,7 +217,18 @@ public class KTableKTableJoinIntegrationTest {
new KeyValue<>("c", "null-C3"),
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
- new KeyValue<>("b", "B1-B2-B3")));
+ new KeyValue<>("b", "B1-B2-B3")), false);
+ }
+
+ @Test
+ public void shouldLeftOuterJoinQueryable() throws Exception {
+ verifyKTableKTableJoin(JoinType.LEFT, JoinType.OUTER, Arrays.asList(
+ new KeyValue<>("a", "null-A3"),
+ new KeyValue<>("b", "null-B3"),
+ new KeyValue<>("c", "null-C3"),
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3")), true);
}
@Test
@@ -184,7 +237,16 @@ public class KTableKTableJoinIntegrationTest {
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3"),
- new KeyValue<>("c", "null-C2-C3")));
+ new KeyValue<>("c", "null-C2-C3")), false);
+ }
+
+ @Test
+ public void shouldOuterInnerJoinQueryable() throws Exception {
+ verifyKTableKTableJoin(JoinType.OUTER, JoinType.INNER, Arrays.asList(
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", "null-C2-C3")), true);
}
@Test
@@ -193,7 +255,16 @@ public class KTableKTableJoinIntegrationTest {
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3"),
- new KeyValue<>("c", "null-C2-C3")));
+ new KeyValue<>("c", "null-C2-C3")), false);
+ }
+
+ @Test
+ public void shouldOuterLeftJoinQueryable() throws Exception {
+ verifyKTableKTableJoin(JoinType.OUTER, JoinType.LEFT, Arrays.asList(
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", "null-C2-C3")), true);
}
@Test
@@ -205,16 +276,30 @@ public class KTableKTableJoinIntegrationTest {
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3"),
- new KeyValue<>("c", "null-C2-C3")));
+ new KeyValue<>("c", "null-C2-C3")), false);
+ }
+
+ @Test
+ public void shouldOuterOuterJoinQueryable() throws Exception {
+ verifyKTableKTableJoin(JoinType.OUTER, JoinType.OUTER, Arrays.asList(
+ new KeyValue<>("a", "null-A3"),
+ new KeyValue<>("b", "null-B3"),
+ new KeyValue<>("c", "null-C3"),
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", "null-C2-C3")), true);
}
private void verifyKTableKTableJoin(final JoinType joinType1,
final JoinType joinType2,
- final List<KeyValue<String, String>> expectedResult) throws Exception {
- streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join");
+ final List<KeyValue<String, String>> expectedResult,
+ boolean verifyQueryableState) throws Exception {
+ final String queryableName = verifyQueryableState ? joinType1 + "-" + joinType2 + "-ktable-ktable-join-query" : null;
+ streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join" + queryableName);
- streams = prepareTopology(joinType1, joinType2);
+ streams = prepareTopology(joinType1, joinType2, queryableName);
streams.start();
final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
@@ -223,20 +308,54 @@ public class KTableKTableJoinIntegrationTest {
expectedResult.size());
assertThat(result, equalTo(expectedResult));
+
+ if (verifyQueryableState) {
+ verifyKTableKTableJoinQueryableState(joinType1, joinType2, expectedResult);
+ }
}
- private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType joinType2) {
+
+ private void verifyKTableKTableJoinQueryableState(final JoinType joinType1,
+ final JoinType joinType2,
+ final List<KeyValue<String, String>> expectedResult) {
+ final String queryableName = joinType1 + "-" + joinType2 + "-ktable-ktable-join-query";
+ final ReadOnlyKeyValueStore<String, String> myJoinStore = streams.store(queryableName,
+ QueryableStoreTypes.<String, String>keyValueStore());
+
+ // store only keeps last set of values, not entire stream of value changes
+ final Map<String, String> expectedInStore = new HashMap<>();
+ for (KeyValue<String, String> expected : expectedResult) {
+ expectedInStore.put(expected.key, expected.value);
+ }
+
+ for (Map.Entry<String, String> expected : expectedInStore.entrySet()) {
+ assertEquals(expected.getValue(), myJoinStore.get(expected.getKey()));
+ }
+ final KeyValueIterator<String, String> all = myJoinStore.all();
+ while (all.hasNext()) {
+ KeyValue<String, String> storeEntry = all.next();
+ assertTrue(expectedResult.contains(storeEntry));
+ }
+ all.close();
+
+ }
+
+ private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType joinType2, final String queryableName) {
final KStreamBuilder builder = new KStreamBuilder();
final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1);
final KTable<String, String> table2 = builder.table(TABLE_2, TABLE_2);
final KTable<String, String> table3 = builder.table(TABLE_3, TABLE_3);
- join(join(table1, table2, joinType1), table3, joinType2).to(OUTPUT);
+ join(join(table1, table2, joinType1, null /* no need to query intermediate result */), table3,
+ joinType2, queryableName).to(OUTPUT);
return new KafkaStreams(builder, new StreamsConfig(streamsConfig));
}
- private KTable<String, String> join(KTable<String, String> first, KTable<String, String> second, JoinType joinType) {
+ private KTable<String, String> join(final KTable<String, String> first,
+ final KTable<String, String> second,
+ final JoinType joinType,
+ final String queryableName) {
final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
@Override
public String apply(final String value1, final String value2) {
@@ -246,11 +365,11 @@ public class KTableKTableJoinIntegrationTest {
switch (joinType) {
case INNER:
- return first.join(second, joiner);
+ return first.join(second, joiner, Serdes.String(), queryableName);
case LEFT:
- return first.leftJoin(second, joiner);
+ return first.leftJoin(second, joiner, Serdes.String(), queryableName);
case OUTER:
- return first.outerJoin(second, joiner);
+ return first.outerJoin(second, joiner, Serdes.String(), queryableName);
}
throw new RuntimeException("Unknown join type.");
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 314079f..b435ceb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -36,6 +37,8 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -71,6 +74,7 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -425,6 +429,180 @@ public class QueryableStateIntegrationTest {
verifyCanQueryState(10 * 1024 * 1024);
}
+ @Test
+ public void shouldBeAbleToQueryFilterState() throws Exception {
+ streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+ final KStreamBuilder builder = new KStreamBuilder();
+ final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
+ final Set<KeyValue<String, Long>> batch1 = new HashSet<>();
+ batch1.addAll(Arrays.asList(
+ new KeyValue<>(keys[0], 1L),
+ new KeyValue<>(keys[1], 1L),
+ new KeyValue<>(keys[2], 3L),
+ new KeyValue<>(keys[3], 5L),
+ new KeyValue<>(keys[4], 2L)));
+ final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>();
+ expectedBatch1.addAll(Arrays.asList(
+ new KeyValue<>(keys[4], 2L)));
+
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ streamOne,
+ batch1,
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ LongSerializer.class,
+ new Properties()),
+ mockTime);
+ final Predicate<String, Long> filterPredicate = new Predicate<String, Long>() {
+ @Override
+ public boolean test(String key, Long value) {
+ return key.contains("kafka");
+ }
+ };
+ final KTable<String, Long> t1 = builder.table(streamOne);
+ final KTable<String, Long> t2 = t1.filter(filterPredicate, "queryFilter");
+ t1.filterNot(filterPredicate, "queryFilterNot");
+ t2.to(outputTopic);
+
+ kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+ kafkaStreams.start();
+
+ waitUntilAtLeastNumRecordProcessed(outputTopic, 2);
+
+ final ReadOnlyKeyValueStore<String, Long>
+ myFilterStore = kafkaStreams.store("queryFilter", QueryableStoreTypes.<String, Long>keyValueStore());
+ final ReadOnlyKeyValueStore<String, Long>
+ myFilterNotStore = kafkaStreams.store("queryFilterNot", QueryableStoreTypes.<String, Long>keyValueStore());
+
+ for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
+ assertEquals(myFilterStore.get(expectedEntry.key), expectedEntry.value);
+ }
+ for (final KeyValue<String, Long> batchEntry : batch1) {
+ if (!expectedBatch1.contains(batchEntry)) {
+ assertNull(myFilterStore.get(batchEntry.key));
+ }
+ }
+
+ for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
+ assertNull(myFilterNotStore.get(expectedEntry.key));
+ }
+ for (final KeyValue<String, Long> batchEntry : batch1) {
+ if (!expectedBatch1.contains(batchEntry)) {
+ assertEquals(myFilterNotStore.get(batchEntry.key), batchEntry.value);
+ }
+ }
+ }
+
+ @Test
+ public void shouldBeAbleToQueryMapValuesState() throws Exception {
+ streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ final KStreamBuilder builder = new KStreamBuilder();
+ final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
+ final Set<KeyValue<String, String>> batch1 = new HashSet<>();
+ batch1.addAll(Arrays.asList(
+ new KeyValue<>(keys[0], "1"),
+ new KeyValue<>(keys[1], "1"),
+ new KeyValue<>(keys[2], "3"),
+ new KeyValue<>(keys[3], "5"),
+ new KeyValue<>(keys[4], "2")));
+
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ streamOne,
+ batch1,
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ mockTime);
+
+ final KTable<String, String> t1 = builder.table(streamOne);
+ final KTable<String, Long> t2 = t1.mapValues(new ValueMapper<String, Long>() {
+ @Override
+ public Long apply(String value) {
+ return Long.valueOf(value);
+ }
+ }, Serdes.Long(), "queryMapValues");
+ t2.to(Serdes.String(), Serdes.Long(), outputTopic);
+
+ kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+ kafkaStreams.start();
+
+ waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
+
+ final ReadOnlyKeyValueStore<String, Long>
+ myMapStore = kafkaStreams.store("queryMapValues",
+ QueryableStoreTypes.<String, Long>keyValueStore());
+ for (final KeyValue<String, String> batchEntry : batch1) {
+ assertEquals(myMapStore.get(batchEntry.key), Long.valueOf(batchEntry.value));
+ }
+ }
+
+ @Test
+ public void shouldBeAbleToQueryMapValuesAfterFilterState() throws Exception {
+ streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ final KStreamBuilder builder = new KStreamBuilder();
+ final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
+ final Set<KeyValue<String, String>> batch1 = new HashSet<>();
+ batch1.addAll(Arrays.asList(
+ new KeyValue<>(keys[0], "1"),
+ new KeyValue<>(keys[1], "1"),
+ new KeyValue<>(keys[2], "3"),
+ new KeyValue<>(keys[3], "5"),
+ new KeyValue<>(keys[4], "2")));
+ final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>();
+ expectedBatch1.addAll(Arrays.asList(
+ new KeyValue<>(keys[4], 2L)));
+
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ streamOne,
+ batch1,
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ mockTime);
+
+ final Predicate<String, String> filterPredicate = new Predicate<String, String>() {
+ @Override
+ public boolean test(String key, String value) {
+ return key.contains("kafka");
+ }
+ };
+ final KTable<String, String> t1 = builder.table(streamOne);
+ final KTable<String, String> t2 = t1.filter(filterPredicate, "queryFilter");
+ final KTable<String, Long> t3 = t2.mapValues(new ValueMapper<String, Long>() {
+ @Override
+ public Long apply(String value) {
+ return Long.valueOf(value);
+ }
+ }, Serdes.Long(), "queryMapValues");
+ t3.to(Serdes.String(), Serdes.Long(), outputTopic);
+
+ kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+ kafkaStreams.start();
+
+ waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
+
+ final ReadOnlyKeyValueStore<String, Long>
+ myMapStore = kafkaStreams.store("queryMapValues",
+ QueryableStoreTypes.<String, Long>keyValueStore());
+ for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
+ assertEquals(myMapStore.get(expectedEntry.key), expectedEntry.value);
+ }
+ for (final KeyValue<String, String> batchEntry : batch1) {
+ final KeyValue<String, Long> batchEntryMapValue = new KeyValue<>(batchEntry.key, Long.valueOf(batchEntry.value));
+ if (!expectedBatch1.contains(batchEntryMapValue)) {
+ assertNull(myMapStore.get(batchEntry.key));
+ }
+ }
+ }
+
private void verifyCanQueryState(int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
final KStreamBuilder builder = new KStreamBuilder();