You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/05/12 09:51:20 UTC
[kafka] branch trunk updated: KAFKA-6521: Use timestamped stores
for KTables (#6667)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8649717 KAFKA-6521: Use timestamped stores for KTables (#6667)
8649717 is described below
commit 8649717d6dde081c75fc441a56f63ee1556dc758
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Sun May 12 11:50:55 2019 +0200
KAFKA-6521: Use timestamped stores for KTables (#6667)
Reviewers: John Roesler <jo...@confluent.io>, Boyang Chen <bo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../kstream/internals/KGroupedStreamImpl.java | 2 +-
.../kstream/internals/KGroupedTableImpl.java | 2 +-
.../kstream/internals/KStreamAggregate.java | 38 +++--
.../streams/kstream/internals/KStreamReduce.java | 37 ++--
.../kstream/internals/KStreamWindowAggregate.java | 40 ++---
.../streams/kstream/internals/KTableAggregate.java | 22 ++-
.../streams/kstream/internals/KTableFilter.java | 22 ++-
.../streams/kstream/internals/KTableImpl.java | 17 +-
.../kstream/internals/KTableKTableJoinMerger.java | 17 +-
.../streams/kstream/internals/KTableMapValues.java | 22 ++-
.../KTableMaterializedValueGetterSupplier.java | 14 +-
.../streams/kstream/internals/KTableReduce.java | 22 ++-
.../streams/kstream/internals/KTableSource.java | 22 ++-
.../internals/KTableSourceValueGetterSupplier.java | 16 +-
.../kstream/internals/KTableTransformValues.java | 29 ++--
.../internals/SessionWindowedKStreamImpl.java | 3 +-
.../kstream/internals/TimeWindowedKStreamImpl.java | 16 +-
.../internals/TimestampedCacheFlushListener.java | 53 ++++++
...a => TimestampedKeyValueStoreMaterializer.java} | 11 +-
...rwarder.java => TimestampedTupleForwarder.java} | 25 +--
.../streams/kstream/internals/TupleForwarder.java | 15 +-
.../internals/graph/KTableKTableJoinNode.java | 25 +--
.../internals/graph/TableProcessorNode.java | 8 +-
.../kstream/internals/graph/TableSourceNode.java | 9 +-
.../org/apache/kafka/streams/processor/To.java | 23 +++
.../internals/GlobalProcessorContextImpl.java | 10 +-
.../processor/internals/ProcessorContextImpl.java | 6 +-
.../kafka/streams/state/ValueAndTimestamp.java | 14 +-
.../state/internals/CachingKeyValueStore.java | 6 +-
.../internals/MeteredTimestampedKeyValueStore.java | 2 +-
.../internals/TimestampedKeyValueStoreBuilder.java | 100 ++++++++++-
.../internals/TimestampedWindowStoreBuilder.java | 107 +++++++++++-
.../kstream/internals/KTableReduceTest.java | 14 +-
.../internals/KTableTransformValuesTest.java | 6 +-
.../TimestampedCacheFlushListenerTest.java | 75 ++++++++
...est.java => TimestampedTupleForwarderTest.java} | 31 ++--
.../kstream/internals/TupleForwarderTest.java | 19 ++-
.../internals/GlobalStreamThreadTest.java | 4 +-
.../processor/internals/StandbyTaskTest.java | 30 ++--
... TimestampedKeyValueStoreMaterializerTest.java} | 50 +++---
.../org/apache/kafka/streams/state/StoresTest.java | 32 ++++
.../TimestampedKeyValueStoreBuilderTest.java | 8 +-
.../TimestampedWindowStoreBuilderTest.java | 7 +-
.../GenericInMemoryTimestampedKeyValueStore.java | 190 +++++++++++++++++++++
44 files changed, 946 insertions(+), 275 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index eab1e8f..6f6521c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -185,7 +185,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
return aggregateBuilder.build(
functionName,
- new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
+ new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize(),
aggregateSupplier,
materializedInternal.queryableStoreName(),
materializedInternal.keySerde(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 56be0f6..75c998a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -88,7 +88,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>(
funcName,
new ProcessorParameters<>(aggregateSupplier, funcName),
- new KeyValueStoreMaterializer<>(materialized).materialize()
+ new TimestampedKeyValueStoreMaterializer<>(materialized).materialize()
);
// now the repartition node must be the parent of the StateProcessorNode
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 4ead76b..7d367d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -22,17 +22,19 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, K, V, T> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamAggregate.class);
private final String storeName;
private final Initializer<T> initializer;
private final Aggregator<? super K, ? super V, T> aggregator;
-
private boolean sendOldValues = false;
KStreamAggregate(final String storeName, final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator) {
@@ -51,22 +53,25 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
sendOldValues = true;
}
- private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
- private KeyValueStore<K, T> store;
+ private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
+ private TimestampedKeyValueStore<K, T> store;
private StreamsMetricsImpl metrics;
- private TupleForwarder<K, T> tupleForwarder;
+ private TimestampedTupleForwarder<K, T> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
- store = (KeyValueStore<K, T>) context.getStateStore(storeName);
- tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues);
+ store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ store,
+ context,
+ new TimestampedCacheFlushListener<>(context),
+ sendOldValues);
}
-
@Override
public void process(final K key, final V value) {
// If the key or value is null we don't need to proceed
@@ -79,7 +84,8 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
return;
}
- T oldAgg = store.get(key);
+ final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key);
+ T oldAgg = getValueOrNull(oldAggAndTimestamp);
if (oldAgg == null) {
oldAgg = initializer.apply();
@@ -91,14 +97,13 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
newAgg = aggregator.apply(key, value, newAgg);
// update the store with the new value
- store.put(key, newAgg);
+ store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()));
tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
}
}
@Override
public KTableValueGetterSupplier<K, T> view() {
-
return new KTableValueGetterSupplier<K, T>() {
public KTableValueGetter<K, T> get() {
@@ -112,23 +117,22 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
};
}
- private class KStreamAggregateValueGetter implements KTableValueGetter<K, T> {
- private KeyValueStore<K, T> store;
+ private class KStreamAggregateValueGetter implements KTableValueGetter<K, T> {
+ private TimestampedKeyValueStore<K, T> store;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
- store = (KeyValueStore<K, T>) context.getStateStore(storeName);
+ store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
}
@Override
public T get(final K key) {
- return store.get(key);
+ return getValueOrNull(store.get(key));
}
@Override
- public void close() {
- }
+ public void close() {}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index cd67283..a1b9f73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -21,10 +21,13 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, V> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamReduce.class);
@@ -48,10 +51,10 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
sendOldValues = true;
}
- private class KStreamReduceProcessor extends AbstractProcessor<K, V> {
- private KeyValueStore<K, V> store;
- private TupleForwarder<K, V> tupleForwarder;
+ private class KStreamReduceProcessor extends AbstractProcessor<K, V> {
+ private TimestampedKeyValueStore<K, V> store;
+ private TimestampedTupleForwarder<K, V> tupleForwarder;
private StreamsMetricsImpl metrics;
@SuppressWarnings("unchecked")
@@ -59,11 +62,14 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
- store = (KeyValueStore<K, V>) context.getStateStore(storeName);
- tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues);
+ store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ store,
+ context,
+ new TimestampedCacheFlushListener<>(context),
+ sendOldValues);
}
-
@Override
public void process(final K key, final V value) {
// If the key or value is null we don't need to proceed
@@ -76,7 +82,8 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
return;
}
- final V oldAgg = store.get(key);
+ final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(key);
+ final V oldAgg = getValueOrNull(oldAggAndTimestamp);
V newAgg = oldAgg;
// try to add the new value
@@ -87,14 +94,13 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
}
// update the store with the new value
- store.put(key, newAgg);
+ store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()));
tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
}
}
@Override
public KTableValueGetterSupplier<K, V> view() {
-
return new KTableValueGetterSupplier<K, V>() {
public KTableValueGetter<K, V> get() {
@@ -108,24 +114,23 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
};
}
- private class KStreamReduceValueGetter implements KTableValueGetter<K, V> {
- private KeyValueStore<K, V> store;
+ private class KStreamReduceValueGetter implements KTableValueGetter<K, V> {
+ private TimestampedKeyValueStore<K, V> store;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
- store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+ store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
}
@Override
public V get(final K key) {
- return store.get(key);
+ return getValueOrNull(store.get(key));
}
@Override
- public void close() {
- }
+ public void close() {}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index d09dadd..95ae54f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -29,12 +29,15 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -69,10 +72,10 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
sendOldValues = true;
}
- private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> {
- private WindowStore<K, Agg> windowStore;
- private TupleForwarder<Windowed<K>, Agg> tupleForwarder;
+ private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> {
+ private TimestampedWindowStore<K, Agg> windowStore;
+ private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
private StreamsMetricsImpl metrics;
private InternalProcessorContext internalProcessorContext;
private Sensor lateRecordDropSensor;
@@ -83,13 +86,14 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
public void init(final ProcessorContext context) {
super.init(context);
internalProcessorContext = (InternalProcessorContext) context;
-
metrics = (StreamsMetricsImpl) context.metrics();
-
lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
-
- windowStore = (WindowStore<K, Agg>) context.getStateStore(storeName);
- tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<>(context), sendOldValues);
+ windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ windowStore,
+ context,
+ new TimestampedCacheFlushListener<>(context),
+ sendOldValues);
}
@Override
@@ -115,7 +119,8 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
final Long windowStart = entry.getKey();
final long windowEnd = entry.getValue().end();
if (windowEnd > closeTime) {
- Agg oldAgg = windowStore.fetch(key, windowStart);
+ final ValueAndTimestamp<Agg> oldAggAndTimestamp = windowStore.fetch(key, windowStart);
+ Agg oldAgg = getValueOrNull(oldAggAndTimestamp);
if (oldAgg == null) {
oldAgg = initializer.apply();
@@ -124,7 +129,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
final Agg newAgg = aggregator.apply(key, value, oldAgg);
// update the store with the new value
- windowStore.put(key, newAgg, windowStart);
+ windowStore.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()), windowStart);
tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, sendOldValues ? oldAgg : null);
} else {
log.debug(
@@ -154,7 +159,6 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
@Override
public KTableValueGetterSupplier<Windowed<K>, Agg> view() {
-
return new KTableValueGetterSupplier<Windowed<K>, Agg>() {
public KTableValueGetter<Windowed<K>, Agg> get() {
@@ -168,14 +172,14 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
};
}
- private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, Agg> {
- private WindowStore<K, Agg> windowStore;
+ private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, Agg> {
+ private TimestampedWindowStore<K, Agg> windowStore;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
- windowStore = (WindowStore<K, Agg>) context.getStateStore(storeName);
+ windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
}
@SuppressWarnings("unchecked")
@@ -183,12 +187,10 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
public Agg get(final Windowed<K> windowedKey) {
final K key = windowedKey.key();
final W window = (W) windowedKey.window();
-
- return windowStore.fetch(key, window.start());
+ return getValueOrNull(windowStore.fetch(key, window.start()));
}
@Override
- public void close() {
- }
+ public void close() {}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 1c44486..88bf867 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -22,7 +22,10 @@ import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> {
@@ -54,15 +57,19 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
}
private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> {
- private KeyValueStore<K, T> store;
- private TupleForwarder<K, T> tupleForwarder;
+ private TimestampedKeyValueStore<K, T> store;
+ private TimestampedTupleForwarder<K, T> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
- store = (KeyValueStore<K, T>) context.getStateStore(storeName);
- tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues);
+ store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ store,
+ context,
+ new TimestampedCacheFlushListener<>(context),
+ sendOldValues);
}
/**
@@ -75,7 +82,8 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
throw new StreamsException("Record key for KTable aggregate operator with state " + storeName + " should not be null.");
}
- final T oldAgg = store.get(key);
+ final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key);
+ final T oldAgg = getValueOrNull(oldAggAndTimestamp);
final T intermediateAgg;
// first try to remove the old value
@@ -101,7 +109,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
}
// update the store with the new value
- store.put(key, newAgg);
+ store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()));
tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 2410074..afa1822 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -20,10 +20,10 @@ import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
-
private final KTableImpl<K, ?, V> parent;
private final Predicate<? super K, ? super V> predicate;
private final boolean filterNot;
@@ -61,18 +61,22 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
return newValue;
}
+
private class KTableFilterProcessor extends AbstractProcessor<K, Change<V>> {
- private KeyValueStore<K, V> store;
- private TupleForwarder<K, V> tupleForwarder;
+ private TimestampedKeyValueStore<K, V> store;
+ private TimestampedTupleForwarder<K, V> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
if (queryableName != null) {
- store = (KeyValueStore<K, V>) context.getStateStore(queryableName);
- tupleForwarder = new TupleForwarder<>(store, context,
- new ForwardingCacheFlushListener<>(context), sendOldValues);
+ store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName);
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ store,
+ context,
+ new TimestampedCacheFlushListener<>(context),
+ sendOldValues);
}
}
@@ -86,13 +90,12 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
}
if (queryableName != null) {
- store.put(key, newValue);
+ store.put(key, ValueAndTimestamp.make(newValue, context().timestamp()));
tupleForwarder.maybeForward(key, newValue, oldValue);
} else {
context().forward(key, new Change<>(newValue, oldValue));
}
}
-
}
@Override
@@ -117,6 +120,7 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
}
}
+
private class KTableFilterValueGetter implements KTableValueGetter<K, V> {
private final KTableValueGetter<K, V> parentGetter;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index c86f6ce..3a4994f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -44,6 +44,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,7 +118,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final Serde<K> keySerde;
final Serde<V> valueSerde;
final String queryableStoreName;
- final StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+ final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder;
if (materializedInternal != null) {
// we actually do not need to generate store names at all since if it is not specified, we will not
@@ -132,7 +133,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
valueSerde = materializedInternal.valueSerde() != null ? materializedInternal.valueSerde() : this.valSerde;
queryableStoreName = materializedInternal.queryableStoreName();
// only materialize if materialized is specified and it has queryable name
- storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
+ storeBuilder = queryableStoreName != null ? (new TimestampedKeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
} else {
keySerde = this.keySerde;
valueSerde = this.valSerde;
@@ -204,7 +205,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final Serde<K> keySerde;
final Serde<VR> valueSerde;
final String queryableStoreName;
- final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
+ final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
if (materializedInternal != null) {
// we actually do not need to generate store names at all since if it is not specified, we will not
@@ -216,7 +217,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
valueSerde = materializedInternal.valueSerde();
queryableStoreName = materializedInternal.queryableStoreName();
// only materialize if materialized is specified and it has queryable name
- storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
+ storeBuilder = queryableStoreName != null ? (new TimestampedKeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
} else {
keySerde = this.keySerde;
valueSerde = null;
@@ -312,7 +313,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final Serde<K> keySerde;
final Serde<VR> valueSerde;
final String queryableStoreName;
- final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
+ final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
if (materializedInternal != null) {
// don't inherit parent value serde, since this operation may change the value type, more specifically:
@@ -322,7 +323,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
valueSerde = materializedInternal.valueSerde();
queryableStoreName = materializedInternal.queryableStoreName();
// only materialize if materialized is specified and it has queryable name
- storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
+ storeBuilder = queryableStoreName != null ? (new TimestampedKeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
} else {
keySerde = this.keySerde;
valueSerde = null;
@@ -538,13 +539,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final Serde<K> keySerde;
final Serde<VR> valueSerde;
final String queryableStoreName;
- final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
+ final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
if (materializedInternal != null) {
keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
valueSerde = materializedInternal.valueSerde();
queryableStoreName = materializedInternal.storeName();
- storeBuilder = new KeyValueStoreMaterializer<>(materializedInternal).materialize();
+ storeBuilder = new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize();
} else {
keySerde = this.keySerde;
valueSerde = null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index de38042..86088c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -19,7 +19,8 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.util.Collections;
import java.util.HashSet;
@@ -94,17 +95,19 @@ public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
}
private class KTableKTableJoinMergeProcessor extends AbstractProcessor<K, Change<V>> {
- private KeyValueStore<K, V> store;
- private TupleForwarder<K, V> tupleForwarder;
+ private TimestampedKeyValueStore<K, V> store;
+ private TimestampedTupleForwarder<K, V> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
if (queryableName != null) {
- store = (KeyValueStore<K, V>) context.getStateStore(queryableName);
- tupleForwarder = new TupleForwarder<>(store, context,
- new ForwardingCacheFlushListener<K, V>(context),
+ store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName);
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ store,
+ context,
+ new TimestampedCacheFlushListener<>(context),
sendOldValues);
}
}
@@ -112,7 +115,7 @@ public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
@Override
public void process(final K key, final Change<V> value) {
if (queryableName != null) {
- store.put(key, value.newValue);
+ store.put(key, ValueAndTimestamp.make(value.newValue, context().timestamp()));
tupleForwarder.maybeForward(key, value.newValue, sendOldValues ? value.oldValue : null);
} else {
if (sendOldValues) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index aae1437..496127c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -20,11 +20,11 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
-
private final KTableImpl<K, ?, V> parent;
private final ValueMapperWithKey<? super K, ? super V, ? extends V1> mapper;
private final String queryableName;
@@ -81,17 +81,22 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
return newValue;
}
+
private class KTableMapValuesProcessor extends AbstractProcessor<K, Change<V>> {
- private KeyValueStore<K, V1> store;
- private TupleForwarder<K, V1> tupleForwarder;
+ private TimestampedKeyValueStore<K, V1> store;
+ private TimestampedTupleForwarder<K, V1> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
if (queryableName != null) {
- store = (KeyValueStore<K, V1>) context.getStateStore(queryableName);
- tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V1>(context), sendOldValues);
+ store = (TimestampedKeyValueStore<K, V1>) context.getStateStore(queryableName);
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ store,
+ context,
+ new TimestampedCacheFlushListener<K, V1>(context),
+ sendOldValues);
}
}
@@ -101,7 +106,7 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
final V1 oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;
if (queryableName != null) {
- store.put(key, newValue);
+ store.put(key, ValueAndTimestamp.make(newValue, context().timestamp()));
tupleForwarder.maybeForward(key, newValue, oldValue);
} else {
context().forward(key, new Change<>(newValue, oldValue));
@@ -109,8 +114,8 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
}
}
- private class KTableMapValuesValueGetter implements KTableValueGetter<K, V1> {
+ private class KTableMapValuesValueGetter implements KTableValueGetter<K, V1> {
private final KTableValueGetter<K, V> parentGetter;
KTableMapValuesValueGetter(final KTableValueGetter<K, V> parentGetter) {
@@ -127,7 +132,6 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
return computeValue(key, parentGetter.get(key));
}
-
@Override
public void close() {
parentGetter.close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
index 0c17d59..a84251c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
@@ -18,10 +18,11 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-public class KTableMaterializedValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+public class KTableMaterializedValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
private final String storeName;
KTableMaterializedValueGetterSupplier(final String storeName) {
@@ -38,21 +39,20 @@ public class KTableMaterializedValueGetterSupplier<K, V> implements KTableValueG
}
private class KTableMaterializedValueGetter implements KTableValueGetter<K, V> {
- private KeyValueStore<K, V> store;
+ private TimestampedKeyValueStore<K, V> store;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
- store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+ store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
}
@Override
public V get(final K key) {
- return store.get(key);
+ return getValueOrNull(store.get(key));
}
@Override
- public void close() {
- }
+ public void close() {}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 70db6443..3055f51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -21,7 +21,10 @@ import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
@@ -49,15 +52,19 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
private class KTableReduceProcessor extends AbstractProcessor<K, Change<V>> {
- private KeyValueStore<K, V> store;
- private TupleForwarder<K, V> tupleForwarder;
+ private TimestampedKeyValueStore<K, V> store;
+ private TimestampedTupleForwarder<K, V> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
- store = (KeyValueStore<K, V>) context.getStateStore(storeName);
- tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues);
+ store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ store,
+ context,
+ new TimestampedCacheFlushListener<>(context),
+ sendOldValues);
}
/**
@@ -70,7 +77,8 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
throw new StreamsException("Record key for KTable reduce operator with state " + storeName + " should not be null.");
}
- final V oldAgg = store.get(key);
+ final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(key);
+ final V oldAgg = getValueOrNull(oldAggAndTimestamp);
final V intermediateAgg;
// first try to remove the old value
@@ -93,7 +101,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
}
// update the store with the new value
- store.put(key, newAgg);
+ store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()));
tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 6fc57bc..8b6ec6e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -21,12 +21,15 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);
@@ -66,8 +69,8 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
private class KTableSourceProcessor extends AbstractProcessor<K, V> {
- private KeyValueStore<K, V> store;
- private TupleForwarder<K, V> tupleForwarder;
+ private TimestampedKeyValueStore<K, V> store;
+ private TimestampedTupleForwarder<K, V> tupleForwarder;
private StreamsMetricsImpl metrics;
@SuppressWarnings("unchecked")
@@ -76,8 +79,12 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
if (queryableName != null) {
- store = (KeyValueStore<K, V>) context.getStateStore(queryableName);
- tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues);
+ store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName);
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ store,
+ context,
+ new TimestampedCacheFlushListener<>(context),
+ sendOldValues);
}
}
@@ -94,8 +101,9 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
}
if (queryableName != null) {
- final V oldValue = sendOldValues ? store.get(key) : null;
- store.put(key, value);
+ final ValueAndTimestamp<V> oldValueAndTimestamp = sendOldValues ? store.get(key) : null;
+ final V oldValue = getValueOrNull(oldValueAndTimestamp);
+ store.put(key, ValueAndTimestamp.make(value, context().timestamp()));
tupleForwarder.maybeForward(key, value, oldValue);
} else {
context().forward(key, new Change<>(value, null));
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
index 6882dac..5ec33f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -17,10 +17,11 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
private final String storeName;
KTableSourceValueGetterSupplier(final String storeName) {
@@ -37,21 +38,18 @@ public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterS
}
private class KTableSourceValueGetter implements KTableValueGetter<K, V> {
-
- ReadOnlyKeyValueStore<K, V> store = null;
+ TimestampedKeyValueStore<K, V> store = null;
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
- store = (ReadOnlyKeyValueStore<K, V>) context.getStateStore(storeName);
+ store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
}
public V get(final K key) {
- return store.get(key);
+ return getValueOrNull(store.get(key));
}
@Override
- public void close() {
- }
+ public void close() {}
}
-
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index 88cea4f..5d81711 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -22,12 +22,14 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.util.Objects;
-class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
private final KTableImpl<K, ?, V> parent;
private final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends V1> transformerSupplier;
private final String queryableName;
@@ -74,10 +76,11 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
sendOldValues = true;
}
+
private class KTableTransformValuesProcessor extends AbstractProcessor<K, Change<V>> {
private final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;
- private KeyValueStore<K, V1> store;
- private TupleForwarder<K, V1> tupleForwarder;
+ private TimestampedKeyValueStore<K, V1> store;
+ private TimestampedTupleForwarder<K, V1> tupleForwarder;
private KTableTransformValuesProcessor(final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
this.valueTransformer = Objects.requireNonNull(valueTransformer, "valueTransformer");
@@ -87,13 +90,14 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
@Override
public void init(final ProcessorContext context) {
super.init(context);
-
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
-
if (queryableName != null) {
- final ForwardingCacheFlushListener<K, V1> flushListener = new ForwardingCacheFlushListener<>(context);
- store = (KeyValueStore<K, V1>) context.getStateStore(queryableName);
- tupleForwarder = new TupleForwarder<>(store, context, flushListener, sendOldValues);
+ store = (TimestampedKeyValueStore<K, V1>) context.getStateStore(queryableName);
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ store,
+ context,
+ new TimestampedCacheFlushListener<>(context),
+ sendOldValues);
}
}
@@ -105,8 +109,8 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
final V1 oldValue = sendOldValues ? valueTransformer.transform(key, change.oldValue) : null;
context().forward(key, new Change<>(newValue, oldValue));
} else {
- final V1 oldValue = sendOldValues ? store.get(key) : null;
- store.put(key, newValue);
+ final V1 oldValue = sendOldValues ? getValueOrNull(store.get(key)) : null;
+ store.put(key, ValueAndTimestamp.make(newValue, context().timestamp()));
tupleForwarder.maybeForward(key, newValue, oldValue);
}
}
@@ -117,8 +121,8 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
}
}
- private class KTableTransformValuesGetter implements KTableValueGetter<K, V1> {
+ private class KTableTransformValuesGetter implements KTableValueGetter<K, V1> {
private final KTableValueGetter<K, V> parentGetter;
private final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;
@@ -131,7 +135,6 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
@Override
public void init(final ProcessorContext context) {
parentGetter.init(context);
-
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index 8c731be..2106b1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
+import java.time.Duration;
import java.util.Objects;
import java.util.Set;
@@ -193,7 +194,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
}
supplier = Stores.persistentSessionStore(
materialized.storeName(),
- retentionPeriod
+ Duration.ofMillis(retentionPeriod)
);
}
final StoreBuilder<SessionStore<K, VR>> builder = Stores.sessionStoreBuilder(
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index f549ce9..a257603 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -31,8 +31,10 @@ import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
import java.time.Duration;
import java.util.Objects;
@@ -154,7 +156,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
}
@SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
- private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+ private <VR> StoreBuilder<TimestampedWindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
if (materialized.retention() != null) {
@@ -169,7 +171,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
+ " retention=[" + retentionPeriod + "]");
}
- supplier = Stores.persistentWindowStore(
+ supplier = Stores.persistentTimestampedWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.size()),
@@ -190,16 +192,16 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
+ " retention=[" + windows.maintainMs() + "]");
}
- supplier = Stores.persistentWindowStore(
+ supplier = new RocksDbWindowBytesStoreSupplier(
materialized.storeName(),
windows.maintainMs(),
- windows.segments,
+ Math.max(windows.maintainMs() / (windows.segments - 1), 60_000L),
windows.size(),
- false
- );
+ false,
+ true);
}
}
- final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(
+ final StoreBuilder<TimestampedWindowStore<K, VR>> builder = Stores.timestampedWindowStoreBuilder(
supplier,
materialized.keySerde(),
materialized.valueSerde()
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
new file mode 100644
index 0000000..5540376
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
+
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+class TimestampedCacheFlushListener<K, V> implements CacheFlushListener<K, ValueAndTimestamp<V>> {
+ private final InternalProcessorContext context;
+ private final ProcessorNode myNode;
+
+ TimestampedCacheFlushListener(final ProcessorContext context) {
+ this.context = (InternalProcessorContext) context;
+ myNode = this.context.currentNode();
+ }
+
+ @Override
+ public void apply(final K key,
+ final ValueAndTimestamp<V> newValue,
+ final ValueAndTimestamp<V> oldValue,
+ final long timestamp) {
+ final ProcessorNode prev = context.currentNode();
+ context.setCurrentNode(myNode);
+ try {
+ context.forward(
+ key,
+ new Change<>(getValueOrNull(newValue), getValueOrNull(oldValue)),
+ To.all().withTimestamp(newValue != null ? newValue.timestamp() : timestamp));
+ } finally {
+ context.setCurrentNode(prev);
+ }
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedKeyValueStoreMaterializer.java
similarity index 79%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedKeyValueStoreMaterializer.java
index 67872be..fb40b46 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedKeyValueStoreMaterializer.java
@@ -21,24 +21,25 @@ import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-public class KeyValueStoreMaterializer<K, V> {
+public class TimestampedKeyValueStoreMaterializer<K, V> {
private final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized;
- public KeyValueStoreMaterializer(final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+ public TimestampedKeyValueStoreMaterializer(final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
this.materialized = materialized;
}
/**
* @return StoreBuilder
*/
- public StoreBuilder<KeyValueStore<K, V>> materialize() {
+ public StoreBuilder<TimestampedKeyValueStore<K, V>> materialize() {
KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
final String name = materialized.storeName();
- supplier = Stores.persistentKeyValueStore(name);
+ supplier = Stores.persistentTimestampedKeyValueStore(name);
}
- final StoreBuilder<KeyValueStore<K, V>> builder = Stores.keyValueStoreBuilder(
+ final StoreBuilder<TimestampedKeyValueStore<K, V>> builder = Stores.timestampedKeyValueStoreBuilder(
supplier,
materialized.keySerde(),
materialized.valueSerde());
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
similarity index 74%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
index 323e198..ab2b506 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
@@ -25,28 +25,29 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore;
* Forwarding by this class only occurs when caching is not enabled. If caching is enabled,
* forwarding occurs in the flush listener when the cached store flushes.
*
- * @param <K>
- * @param <V>
+ * @param <K> the type of the key
+ * @param <V> the type of the value
*/
-class TupleForwarder<K, V> {
- private final boolean cachingEnabled;
+class TimestampedTupleForwarder<K, V> {
private final ProcessorContext context;
+ private final boolean sendOldValues;
+ private final boolean cachingEnabled;
@SuppressWarnings("unchecked")
- TupleForwarder(final StateStore store,
- final ProcessorContext context,
- final ForwardingCacheFlushListener<K, V> flushListener,
- final boolean sendOldValues) {
- cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
+ TimestampedTupleForwarder(final StateStore store,
+ final ProcessorContext context,
+ final TimestampedCacheFlushListener<K, V> flushListener,
+ final boolean sendOldValues) {
this.context = context;
+ this.sendOldValues = sendOldValues;
+ cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
}
public void maybeForward(final K key,
final V newValue,
final V oldValue) {
- if (cachingEnabled) {
- return;
+ if (!cachingEnabled) {
+ context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null));
}
- context.forward(key, new Change<>(newValue, oldValue));
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
index 323e198..94b0ebd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -25,28 +25,29 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore;
* Forwarding by this class only occurs when caching is not enabled. If caching is enabled,
* forwarding occurs in the flush listener when the cached store flushes.
*
- * @param <K>
- * @param <V>
+ * @param <K> the type of the key
+ * @param <V> the type of the value
*/
class TupleForwarder<K, V> {
- private final boolean cachingEnabled;
private final ProcessorContext context;
+ private final boolean sendOldValues;
+ private final boolean cachingEnabled;
@SuppressWarnings("unchecked")
TupleForwarder(final StateStore store,
final ProcessorContext context,
final ForwardingCacheFlushListener<K, V> flushListener,
final boolean sendOldValues) {
- cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
this.context = context;
+ this.sendOldValues = sendOldValues;
+ cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
}
public void maybeForward(final K key,
final V newValue,
final V oldValue) {
- if (cachingEnabled) {
- return;
+ if (!cachingEnabled) {
+ context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null));
}
- context.forward(key, new Change<>(newValue, oldValue));
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index 03bdda0..542726b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -22,8 +22,8 @@ import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger;
import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import java.util.Arrays;
@@ -36,7 +36,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
private final Serde<VR> valueSerde;
private final String[] joinThisStoreNames;
private final String[] joinOtherStoreNames;
- private final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
+ private final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
KTableKTableJoinNode(final String nodeName,
final ProcessorParameters<K, Change<V1>> joinThisProcessorParameters,
@@ -48,7 +48,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
final Serde<VR> valueSerde,
final String[] joinThisStoreNames,
final String[] joinOtherStoreNames,
- final StoreBuilder<KeyValueStore<K, VR>> storeBuilder) {
+ final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder) {
super(nodeName,
null,
@@ -98,23 +98,24 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
final String otherProcessorName = otherProcessorParameters().processorName();
final String mergeProcessorName = mergeProcessorParameters().processorName();
- topologyBuilder.addProcessor(thisProcessorName,
+ topologyBuilder.addProcessor(
+ thisProcessorName,
thisProcessorParameters().processorSupplier(),
thisJoinSideNodeName());
- topologyBuilder.addProcessor(otherProcessorName,
+ topologyBuilder.addProcessor(
+ otherProcessorName,
otherProcessorParameters().processorSupplier(),
otherJoinSideNodeName());
- topologyBuilder.addProcessor(mergeProcessorName,
+ topologyBuilder.addProcessor(
+ mergeProcessorName,
mergeProcessorParameters().processorSupplier(),
thisProcessorName,
otherProcessorName);
- topologyBuilder.connectProcessorAndStateStores(thisProcessorName,
- joinOtherStoreNames);
- topologyBuilder.connectProcessorAndStateStores(otherProcessorName,
- joinThisStoreNames);
+ topologyBuilder.connectProcessorAndStateStores(thisProcessorName, joinOtherStoreNames);
+ topologyBuilder.connectProcessorAndStateStores(otherProcessorName, joinThisStoreNames);
if (storeBuilder != null) {
topologyBuilder.addStateStore(storeBuilder, mergeProcessorName);
@@ -144,7 +145,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
private String[] joinThisStoreNames;
private String[] joinOtherStoreNames;
private String queryableStoreName;
- private StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
+ private StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
private KTableKTableJoinNodeBuilder() {
}
@@ -199,7 +200,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
return this;
}
- public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withStoreBuilder(final StoreBuilder<KeyValueStore<K, VR>> storeBuilder) {
+ public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withStoreBuilder(final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder) {
this.storeBuilder = storeBuilder;
return this;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index df009df..24df4d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -18,26 +18,26 @@
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import java.util.Arrays;
public class TableProcessorNode<K, V> extends StreamsGraphNode {
private final ProcessorParameters<K, V> processorParameters;
- private final StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+ private final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder;
private final String[] storeNames;
public TableProcessorNode(final String nodeName,
final ProcessorParameters<K, V> processorParameters,
- final StoreBuilder<KeyValueStore<K, V>> storeBuilder) {
+ final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder) {
this(nodeName, processorParameters, storeBuilder, null);
}
public TableProcessorNode(final String nodeName,
final ProcessorParameters<K, V> processorParameters,
- final StoreBuilder<KeyValueStore<K, V>> storeBuilder,
+ final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder,
final String[] storeNames) {
super(nodeName);
this.processorParameters = processorParameters;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index fa979b2..b1df6ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -20,11 +20,12 @@ package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.KTableSource;
-import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
+import org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import java.util.Collections;
@@ -82,10 +83,10 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
final String topicName = getTopicNames().iterator().next();
- // TODO: we assume source KTables can only be key-value stores for now.
+ // TODO: we assume source KTables can only be timestamped-key-value stores for now.
// should be expanded for other types of stores as well.
- final StoreBuilder<KeyValueStore<K, V>> storeBuilder =
- new KeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();
+ final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder =
+ new TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();
if (isGlobalKTable) {
topologyBuilder.addGlobalStore(storeBuilder,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/To.java b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
index f9cc4c8..dc124aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/To.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.processor;
+import java.util.Objects;
+
/**
* This class is used to provide the optional parameters when sending output records to downstream processor
* using {@link ProcessorContext#forward(Object, Object, To)}.
@@ -65,4 +67,25 @@ public class To {
this.timestamp = timestamp;
return this;
}
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final To to = (To) o;
+ return timestamp == to.timestamp &&
+ Objects.equals(childName, to.childName);
+ }
+
+ /**
+ * Equality is implemented in support of tests, *not* for use in Hash collections, since this class is mutable.
+ */
+ @Override
+ public int hashCode() {
+ throw new UnsupportedOperationException("To is unsafe for use in Hash collections");
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 0693ef7..3c8a3ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -25,10 +25,14 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.KeyValueStoreReadWriteDecorator;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.SessionStoreReadWriteDecorator;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.TimestampedKeyValueStoreReadWriteDecorator;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.TimestampedWindowStoreReadWriteDecorator;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.WindowStoreReadWriteDecorator;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -50,8 +54,12 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
public StateStore getStateStore(final String name) {
final StateStore store = stateManager.getGlobalStore(name);
- if (store instanceof KeyValueStore) {
+ if (store instanceof TimestampedKeyValueStore) {
+ return new TimestampedKeyValueStoreReadWriteDecorator((TimestampedKeyValueStore) store);
+ } else if (store instanceof KeyValueStore) {
return new KeyValueStoreReadWriteDecorator((KeyValueStore) store);
+ } else if (store instanceof TimestampedWindowStore) {
+ return new TimestampedWindowStoreReadWriteDecorator((TimestampedWindowStore) store);
} else if (store instanceof WindowStore) {
return new WindowStoreReadWriteDecorator((WindowStore) store);
} else if (store instanceof SessionStore) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 232b28d..1df6610 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -498,11 +498,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
}
- private static class TimestampedKeyValueStoreReadWriteDecorator<K, V>
+ static class TimestampedKeyValueStoreReadWriteDecorator<K, V>
extends KeyValueStoreReadWriteDecorator<K, ValueAndTimestamp<V>>
implements TimestampedKeyValueStore<K, V> {
- private TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore<K, V> inner) {
+ TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore<K, V> inner) {
super(inner);
}
}
@@ -564,7 +564,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
}
- private static class TimestampedWindowStoreReadWriteDecorator<K, V>
+ static class TimestampedWindowStoreReadWriteDecorator<K, V>
extends WindowStoreReadWriteDecorator<K, ValueAndTimestamp<V>>
implements TimestampedWindowStore<K, V> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
index 8bb652c..f5fc7a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
@@ -25,7 +25,7 @@ import java.util.Objects;
*
* @param <V>
*/
-public class ValueAndTimestamp<V> {
+public final class ValueAndTimestamp<V> {
private final V value;
private final long timestamp;
@@ -50,6 +50,18 @@ public class ValueAndTimestamp<V> {
return value == null ? null : new ValueAndTimestamp<>(value, timestamp);
}
+ /**
+ * Return the wrapped {@code value} of the given {@code valueAndTimestamp} parameter
+ * if the parameter is not {@code null}.
+ *
+ * @param valueAndTimestamp a {@link ValueAndTimestamp} instance; can be {@code null}
+ * @param <V> the type of the value
+ * @return the wrapped {@code value} of {@code valueAndTimestamp} if not {@code null}; otherwise {@code null}
+ */
+ public static <V> V getValueOrNull(final ValueAndTimestamp<V> valueAndTimestamp) {
+ return valueAndTimestamp == null ? null : valueAndTimestamp.value();
+ }
+
public V value() {
return value;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 95e20b4..8aa0ceb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -24,16 +24,16 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-class CachingKeyValueStore
+public class CachingKeyValueStore
extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, byte[], byte[]>
implements KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index 2fa7c96..468b554 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -51,6 +51,6 @@ public class MeteredTimestampedKeyValueStore<K, V>
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde);
+ valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
}
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
index f43e4e6..863b44b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
@@ -19,12 +19,17 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import java.util.List;
import java.util.Objects;
public class TimestampedKeyValueStoreBuilder<K, V>
@@ -48,8 +53,12 @@ public class TimestampedKeyValueStoreBuilder<K, V>
@Override
public TimestampedKeyValueStore<K, V> build() {
KeyValueStore<Bytes, byte[]> store = storeSupplier.get();
- if (!(store instanceof TimestampedBytesStore) && store.persistent()) {
- store = new KeyValueToTimestampedKeyValueByteStoreAdapter(store);
+ if (!(store instanceof TimestampedBytesStore)) {
+ if (store.persistent()) {
+ store = new KeyValueToTimestampedKeyValueByteStoreAdapter(store);
+ } else {
+ store = new InMemoryTimestampedKeyValueStoreMarker(store);
+ }
}
return new MeteredTimestampedKeyValueStore<>(
maybeWrapCaching(maybeWrapLogging(store)),
@@ -72,4 +81,91 @@ public class TimestampedKeyValueStoreBuilder<K, V>
}
return new ChangeLoggingTimestampedKeyValueBytesStore(inner);
}
+
+ private final static class InMemoryTimestampedKeyValueStoreMarker
+ implements KeyValueStore<Bytes, byte[]>, TimestampedBytesStore {
+
+ final KeyValueStore<Bytes, byte[]> wrapped;
+
+ private InMemoryTimestampedKeyValueStoreMarker(final KeyValueStore<Bytes, byte[]> wrapped) {
+ if (wrapped.persistent()) {
+ throw new IllegalArgumentException("Provided store must not be a persistent store, but it is.");
+ }
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public void init(final ProcessorContext context,
+ final StateStore root) {
+ wrapped.init(context, root);
+ }
+
+ @Override
+ public void put(final Bytes key,
+ final byte[] value) {
+ wrapped.put(key, value);
+ }
+
+ @Override
+ public byte[] putIfAbsent(final Bytes key,
+ final byte[] value) {
+ return wrapped.putIfAbsent(key, value);
+ }
+
+ @Override
+ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+ wrapped.putAll(entries);
+ }
+
+ @Override
+ public byte[] delete(final Bytes key) {
+ return wrapped.delete(key);
+ }
+
+ @Override
+ public byte[] get(final Bytes key) {
+ return wrapped.get(key);
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+ final Bytes to) {
+ return wrapped.range(from, to);
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> all() {
+ return wrapped.all();
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return wrapped.approximateNumEntries();
+ }
+
+ @Override
+ public void flush() {
+ wrapped.flush();
+ }
+
+ @Override
+ public void close() {
+ wrapped.close();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return wrapped.isOpen();
+ }
+
+ @Override
+ public String name() {
+ return wrapped.name();
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+ }
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index 2c7c950..808d31e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -19,11 +19,16 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
import java.util.Objects;
@@ -36,7 +41,7 @@ public class TimestampedWindowStoreBuilder<K, V>
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Time time) {
- super(storeSupplier.name(), keySerde, new ValueAndTimestampSerde<>(valueSerde), time);
+ super(storeSupplier.name(), keySerde, valueSerde == null ? null : new ValueAndTimestampSerde<>(valueSerde), time);
Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null");
this.storeSupplier = storeSupplier;
}
@@ -44,8 +49,12 @@ public class TimestampedWindowStoreBuilder<K, V>
@Override
public TimestampedWindowStore<K, V> build() {
WindowStore<Bytes, byte[]> store = storeSupplier.get();
- if (!(store instanceof TimestampedBytesStore) && store.persistent()) {
- store = new WindowToTimestampedWindowByteStoreAdapter(store);
+ if (!(store instanceof TimestampedBytesStore)) {
+ if (store.persistent()) {
+ store = new WindowToTimestampedWindowByteStoreAdapter(store);
+ } else {
+ store = new InMemoryTimestampedWindowStoreMarker(store);
+ }
}
return new MeteredTimestampedWindowStore<>(
maybeWrapCaching(maybeWrapLogging(store)),
@@ -76,4 +85,96 @@ public class TimestampedWindowStoreBuilder<K, V>
public long retentionPeriod() {
return storeSupplier.retentionPeriod();
}
+
+
+ private final static class InMemoryTimestampedWindowStoreMarker
+ implements WindowStore<Bytes, byte[]>, TimestampedBytesStore {
+
+ private final WindowStore<Bytes, byte[]> wrapped;
+
+ private InMemoryTimestampedWindowStoreMarker(final WindowStore<Bytes, byte[]> wrapped) {
+ if (wrapped.persistent()) {
+ throw new IllegalArgumentException("Provided store must not be a persistent store, but it is.");
+ }
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public void init(final ProcessorContext context,
+ final StateStore root) {
+ wrapped.init(context, root);
+ }
+
+ @Override
+ public void put(final Bytes key,
+ final byte[] value) {
+ wrapped.put(key, value);
+ }
+
+ @Override
+ public void put(final Bytes key,
+ final byte[] value,
+ final long windowStartTimestamp) {
+ wrapped.put(key, value, windowStartTimestamp);
+ }
+
+ @Override
+ public byte[] fetch(final Bytes key,
+ final long time) {
+ return wrapped.fetch(key, time);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public WindowStoreIterator<byte[]> fetch(final Bytes key,
+ final long timeFrom,
+ final long timeTo) {
+ return wrapped.fetch(key, timeFrom, timeTo);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
+ final Bytes to,
+ final long timeFrom,
+ final long timeTo) {
+ return wrapped.fetch(from, to, timeFrom, timeTo);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom,
+ final long timeTo) {
+ return wrapped.fetchAll(timeFrom, timeTo);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
+ return wrapped.all();
+ }
+
+ @Override
+ public void flush() {
+ wrapped.flush();
+ }
+
+ @Override
+ public void close() {
+ wrapped.close();
+ }
+ @Override
+ public boolean isOpen() {
+ return wrapped.isOpen();
+ }
+
+ @Override
+ public String name() {
+ return wrapped.name();
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
index afb2cc1..600f3db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -19,8 +19,9 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.GenericInMemoryKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.test.GenericInMemoryTimestampedKeyValueStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.junit.Test;
@@ -45,18 +46,19 @@ public class KTableReduceTest {
this::differenceNotNullArgs
).get();
- final KeyValueStore<String, Set<String>> myStore = new GenericInMemoryKeyValueStore<>("myStore");
+ final TimestampedKeyValueStore<String, Set<String>> myStore =
+ new GenericInMemoryTimestampedKeyValueStore<>("myStore");
context.register(myStore, null);
reduceProcessor.init(context);
context.setCurrentNode(new ProcessorNode<>("reduce", reduceProcessor, singleton("myStore")));
reduceProcessor.process("A", new Change<>(singleton("a"), null));
- assertEquals(singleton("a"), myStore.get("A"));
+ assertEquals(ValueAndTimestamp.make(singleton("a"), -1L), myStore.get("A"));
reduceProcessor.process("A", new Change<>(singleton("b"), singleton("a")));
- assertEquals(singleton("b"), myStore.get("A"));
+ assertEquals(ValueAndTimestamp.make(singleton("b"), -1L), myStore.get("A"));
reduceProcessor.process("A", new Change<>(null, singleton("b")));
- assertEquals(emptySet(), myStore.get("A"));
+ assertEquals(ValueAndTimestamp.make(emptySet(), -1L), myStore.get("A"));
}
private Set<String> differenceNotNullArgs(final Set<String> left, final Set<String> right) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index ed6b649..a9bf7f0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -37,6 +37,8 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
@@ -90,7 +92,7 @@ public class KTableTransformValuesTest {
@Mock(MockType.NICE)
private KTableValueGetter<String, String> parentGetter;
@Mock(MockType.NICE)
- private KeyValueStore<String, String> stateStore;
+ private TimestampedKeyValueStore<String, String> stateStore;
@Mock(MockType.NICE)
private ValueTransformerWithKeySupplier<String, String, String> mockSupplier;
@Mock(MockType.NICE)
@@ -220,7 +222,7 @@ public class KTableTransformValuesTest {
new KTableTransformValues<>(parent, new ExclamationValueTransformerSupplier(), QUERYABLE_NAME);
expect(context.getStateStore(QUERYABLE_NAME)).andReturn(stateStore);
- expect(stateStore.get("Key")).andReturn("something");
+ expect(stateStore.get("Key")).andReturn(ValueAndTimestamp.make("something", 0L));
replay(context, stateStore);
final KTableValueGetter<String, String> getter = transformValues.view().get();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
new file mode 100644
index 0000000..38ef5c6
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class TimestampedCacheFlushListenerTest {
+
+ @Test
+ public void shouldForwardValueTimestampIfNewValueExists() {
+ final InternalProcessorContext context = mock(InternalProcessorContext.class);
+ expect(context.currentNode()).andReturn(null).anyTimes();
+ context.setCurrentNode(null);
+ context.setCurrentNode(null);
+ context.forward(
+ "key",
+ new Change<>("newValue", "oldValue"),
+ To.all().withTimestamp(42L));
+ expectLastCall();
+ replay(context);
+
+ new TimestampedCacheFlushListener<>(context).apply(
+ "key",
+ ValueAndTimestamp.make("newValue", 42L),
+ ValueAndTimestamp.make("oldValue", 21L),
+ 73L);
+
+ verify(context);
+ }
+
+ @Test
+ public void shouldForwardParameterTimestampIfNewValueIsNull() {
+ final InternalProcessorContext context = mock(InternalProcessorContext.class);
+ expect(context.currentNode()).andReturn(null).anyTimes();
+ context.setCurrentNode(null);
+ context.setCurrentNode(null);
+ context.forward(
+ "key",
+ new Change<>(null, "oldValue"),
+ To.all().withTimestamp(73L));
+ expectLastCall();
+ replay(context);
+
+ new TimestampedCacheFlushListener<>(context).apply(
+ "key",
+ null,
+ ValueAndTimestamp.make("oldValue", 21L),
+ 73L);
+
+ verify(context);
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
similarity index 66%
copy from streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java
copy to streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index 68c6eaa..068cb6b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.junit.Test;
@@ -27,7 +28,7 @@ import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
-public class TupleForwarderTest {
+public class TimestampedTupleForwarderTest {
@Test
public void shouldSetFlushListenerOnWrappedStateStore() {
@@ -36,29 +37,38 @@ public class TupleForwarderTest {
}
private void setFlushListener(final boolean sendOldValues) {
- final WrappedStateStore<StateStore, Object, Object> store = mock(WrappedStateStore.class);
- final ForwardingCacheFlushListener<Object, Object> flushListener = mock(ForwardingCacheFlushListener.class);
+ final WrappedStateStore<StateStore, Object, ValueAndTimestamp<Object>> store = mock(WrappedStateStore.class);
+ final TimestampedCacheFlushListener<Object, Object> flushListener = mock(TimestampedCacheFlushListener.class);
expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false);
replay(store);
- new TupleForwarder<>(store, null, flushListener, sendOldValues);
+ new TimestampedTupleForwarder<>(store, null, flushListener, sendOldValues);
verify(store);
}
@Test
public void shouldForwardRecordsIfWrappedStateStoreDoesNotCache() {
+ shouldForwardRecordsIfWrappedStateStoreDoesNotCache(false);
+ shouldForwardRecordsIfWrappedStateStoreDoesNotCache(true);
+ }
+
+ private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) {
final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
final ProcessorContext context = mock(ProcessorContext.class);
- expect(store.setFlushListener(null, false)).andReturn(false);
- context.forward("key", new Change<>("value", "oldValue"));
+ expect(store.setFlushListener(null, sendOldValues)).andReturn(false);
+ if (sendOldValues) {
+ context.forward("key", new Change<>("newValue", "oldValue"));
+ } else {
+ context.forward("key", new Change<>("newValue", null));
+ }
expectLastCall();
replay(store, context);
- new TupleForwarder<>(store, context, null, false)
- .maybeForward("key", "value", "oldValue");
+ new TimestampedTupleForwarder<>(store, context, null, sendOldValues)
+ .maybeForward("key", "newValue", "oldValue");
verify(store, context);
}
@@ -71,10 +81,9 @@ public class TupleForwarderTest {
expect(store.setFlushListener(null, false)).andReturn(true);
replay(store, context);
- new TupleForwarder<>(store, context, null, false)
- .maybeForward("key", "value", "oldValue");
+ new TimestampedTupleForwarder<>(store, context, null, false)
+ .maybeForward("key", "newValue", "oldValue");
verify(store, context);
}
-
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java
index 68c6eaa..f62e826 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java
@@ -49,16 +49,25 @@ public class TupleForwarderTest {
@Test
public void shouldForwardRecordsIfWrappedStateStoreDoesNotCache() {
+ shouldForwardRecordsIfWrappedStateStoreDoesNotCache(false);
+ shouldForwardRecordsIfWrappedStateStoreDoesNotCache(true);
+ }
+
+ private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) {
final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
final ProcessorContext context = mock(ProcessorContext.class);
- expect(store.setFlushListener(null, false)).andReturn(false);
- context.forward("key", new Change<>("value", "oldValue"));
+ expect(store.setFlushListener(null, sendOldValues)).andReturn(false);
+ if (sendOldValues) {
+ context.forward("key", new Change<>("newValue", "oldValue"));
+ } else {
+ context.forward("key", new Change<>("newValue", null));
+ }
expectLastCall();
replay(store, context);
- new TupleForwarder<>(store, context, null, false)
- .maybeForward("key", "value", "oldValue");
+ new TupleForwarder<>(store, context, null, sendOldValues)
+ .maybeForward("key", "newValue", "oldValue");
verify(store, context);
}
@@ -72,7 +81,7 @@ public class TupleForwarderTest {
replay(store, context);
new TupleForwarder<>(store, context, null, false)
- .maybeForward("key", "value", "oldValue");
+ .maybeForward("key", "newValue", "oldValue");
verify(store, context);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index c0e0de3..a5db924 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
import org.apache.kafka.streams.kstream.internals.KTableSource;
-import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
+import org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -87,7 +87,7 @@ public class GlobalStreamThreadTest {
);
builder.addGlobalStore(
- new KeyValueStoreMaterializer<>(materialized).materialize().withLoggingDisabled(),
+ new TimestampedKeyValueStoreMaterializer<>(materialized).materialize().withLoggingDisabled(),
"sourceName",
null,
null,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 8c8811d..0e50120 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -51,13 +51,15 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
import org.apache.kafka.test.MockKeyValueStore;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockRestoreConsumer;
import org.apache.kafka.test.MockStateRestoreListener;
-import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -352,9 +354,9 @@ public class StandbyTaskTest {
assertEquals(
asList(
- new KeyValue<>(new Windowed<>(1, new TimeWindow(0, 60_000)), 100L),
- new KeyValue<>(new Windowed<>(2, new TimeWindow(60_000, 120_000)), 100L),
- new KeyValue<>(new Windowed<>(3, new TimeWindow(120_000, 180_000)), 100L)
+ new KeyValue<>(new Windowed<>(1, new TimeWindow(0, 60_000)), ValueAndTimestamp.make(100L, 60_000L)),
+ new KeyValue<>(new Windowed<>(2, new TimeWindow(60_000, 120_000)), ValueAndTimestamp.make(100L, 120_000L)),
+ new KeyValue<>(new Windowed<>(3, new TimeWindow(120_000, 180_000)), ValueAndTimestamp.make(100L, 180_000L))
),
getWindowedStoreContents(storeName, task)
);
@@ -368,9 +370,9 @@ public class StandbyTaskTest {
// the first record's window should have expired.
assertEquals(
asList(
- new KeyValue<>(new Windowed<>(2, new TimeWindow(60_000, 120_000)), 100L),
- new KeyValue<>(new Windowed<>(3, new TimeWindow(120_000, 180_000)), 100L),
- new KeyValue<>(new Windowed<>(4, new TimeWindow(180_000, 240_000)), 100L)
+ new KeyValue<>(new Windowed<>(2, new TimeWindow(60_000, 120_000)), ValueAndTimestamp.make(100L, 120_000L)),
+ new KeyValue<>(new Windowed<>(3, new TimeWindow(120_000, 180_000)), ValueAndTimestamp.make(100L, 180_000L)),
+ new KeyValue<>(new Windowed<>(4, new TimeWindow(180_000, 240_000)), ValueAndTimestamp.make(100L, 240_000L))
),
getWindowedStoreContents(storeName, task)
);
@@ -388,7 +390,7 @@ public class StandbyTaskTest {
changelogName,
1,
offset,
- start,
+ end,
TimestampType.CREATE_TIME,
0L,
0,
@@ -449,17 +451,17 @@ public class StandbyTaskTest {
}
@SuppressWarnings("unchecked")
- private List<KeyValue<Windowed<Integer>, Long>> getWindowedStoreContents(final String storeName,
- final StandbyTask task) {
+ private List<KeyValue<Windowed<Integer>, ValueAndTimestamp<Long>>> getWindowedStoreContents(final String storeName,
+ final StandbyTask task) {
final StandbyContextImpl context = (StandbyContextImpl) task.context();
- final List<KeyValue<Windowed<Integer>, Long>> result = new ArrayList<>();
+ final List<KeyValue<Windowed<Integer>, ValueAndTimestamp<Long>>> result = new ArrayList<>();
- try (final KeyValueIterator<Windowed<byte[]>, Long> iterator =
- ((WindowStore) context.getStateMgr().getStore(storeName)).all()) {
+ try (final KeyValueIterator<Windowed<byte[]>, ValueAndTimestamp<Long>> iterator =
+ ((TimestampedWindowStore) context.getStateMgr().getStore(storeName)).all()) {
while (iterator.hasNext()) {
- final KeyValue<Windowed<byte[]>, Long> next = iterator.next();
+ final KeyValue<Windowed<byte[]>, ValueAndTimestamp<Long>> next = iterator.next();
final Integer deserializedKey = new IntegerDeserializer().deserialize(null, next.key.key());
result.add(new KeyValue<>(new Windowed<>(deserializedKey, next.key.window()), next.value));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java
similarity index 66%
rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java
index 30080c3..f963786 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java
@@ -20,16 +20,18 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
-import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
+import org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.CachedStateStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.internals.CachingKeyValueStore;
import org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore;
+import org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
-import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
+import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
@@ -44,7 +46,7 @@ import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.IsNot.not;
@RunWith(EasyMockRunner.class)
-public class KeyValueStoreMaterializerTest {
+public class TimestampedKeyValueStoreMaterializerTest {
private final String storePrefix = "prefix";
@Mock(type = MockType.NICE)
@@ -55,14 +57,14 @@ public class KeyValueStoreMaterializerTest {
final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized =
new MaterializedInternal<>(Materialized.as("store"), nameProvider, storePrefix);
- final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
- final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
- final KeyValueStore<String, String> store = builder.build();
+ final TimestampedKeyValueStoreMaterializer<String, String> materializer = new TimestampedKeyValueStoreMaterializer<>(materialized);
+ final StoreBuilder<TimestampedKeyValueStore<String, String>> builder = materializer.materialize();
+ final TimestampedKeyValueStore<String, String> store = builder.build();
final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped();
final StateStore logging = caching.wrapped();
- assertThat(store, instanceOf(MeteredKeyValueStore.class));
- assertThat(caching, instanceOf(CachedStateStore.class));
- assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+ assertThat(store, instanceOf(MeteredTimestampedKeyValueStore.class));
+ assertThat(caching, instanceOf(CachingKeyValueStore.class));
+ assertThat(logging, instanceOf(ChangeLoggingTimestampedKeyValueBytesStore.class));
}
@Test
@@ -70,9 +72,9 @@ public class KeyValueStoreMaterializerTest {
final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store").withCachingDisabled(), nameProvider, storePrefix
);
- final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
- final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
- final KeyValueStore<String, String> store = builder.build();
+ final TimestampedKeyValueStoreMaterializer<String, String> materializer = new TimestampedKeyValueStoreMaterializer<>(materialized);
+ final StoreBuilder<TimestampedKeyValueStore<String, String>> builder = materializer.materialize();
+ final TimestampedKeyValueStore<String, String> store = builder.build();
final WrappedStateStore logging = (WrappedStateStore) ((WrappedStateStore) store).wrapped();
assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
}
@@ -82,11 +84,11 @@ public class KeyValueStoreMaterializerTest {
final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store").withLoggingDisabled(), nameProvider, storePrefix
);
- final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
- final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
- final KeyValueStore<String, String> store = builder.build();
+ final TimestampedKeyValueStoreMaterializer<String, String> materializer = new TimestampedKeyValueStoreMaterializer<>(materialized);
+ final StoreBuilder<TimestampedKeyValueStore<String, String>> builder = materializer.materialize();
+ final TimestampedKeyValueStore<String, String> store = builder.build();
final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped();
- assertThat(caching, instanceOf(CachedStateStore.class));
+ assertThat(caching, instanceOf(CachingKeyValueStore.class));
assertThat(caching.wrapped(), not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
}
@@ -95,11 +97,11 @@ public class KeyValueStoreMaterializerTest {
final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider, storePrefix
);
- final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
- final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
- final KeyValueStore<String, String> store = builder.build();
+ final TimestampedKeyValueStoreMaterializer<String, String> materializer = new TimestampedKeyValueStoreMaterializer<>(materialized);
+ final StoreBuilder<TimestampedKeyValueStore<String, String>> builder = materializer.materialize();
+ final TimestampedKeyValueStore<String, String> store = builder.build();
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertThat(wrapped, not(instanceOf(CachedStateStore.class)));
+ assertThat(wrapped, not(instanceOf(CachingKeyValueStore.class)));
assertThat(wrapped, not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
}
@@ -113,9 +115,9 @@ public class KeyValueStoreMaterializerTest {
final MaterializedInternal<String, Integer, KeyValueStore<Bytes, byte[]>> materialized =
new MaterializedInternal<>(Materialized.as(supplier), nameProvider, storePrefix);
- final KeyValueStoreMaterializer<String, Integer> materializer = new KeyValueStoreMaterializer<>(materialized);
- final StoreBuilder<KeyValueStore<String, Integer>> builder = materializer.materialize();
- final KeyValueStore<String, Integer> built = builder.build();
+ final TimestampedKeyValueStoreMaterializer<String, Integer> materializer = new TimestampedKeyValueStoreMaterializer<>(materialized);
+ final StoreBuilder<TimestampedKeyValueStore<String, Integer>> builder = materializer.materialize();
+ final TimestampedKeyValueStore<String, Integer> built = builder.build();
assertThat(store.name(), CoreMatchers.equalTo(built.name()));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index e520df4..8c83271 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -218,6 +218,17 @@ public class StoresTest {
}
@Test
+ public void shouldBuildTimestampedKeyValueStoreThatWrapsInMemoryKeyValueStore() {
+ final TimestampedKeyValueStore<String, String> store = Stores.timestampedKeyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("name"),
+ Serdes.String(),
+ Serdes.String()
+ ).withLoggingDisabled().withCachingDisabled().build();
+ assertThat(store, not(nullValue()));
+ assertThat(((WrappedStateStore) store).wrapped(), instanceOf(TimestampedBytesStore.class));
+ }
+
+ @Test
public void shouldBuildWindowStore() {
final WindowStore<String, String> store = Stores.windowStoreBuilder(
Stores.persistentWindowStore("store", ofMillis(3L), ofMillis(3L), true),
@@ -238,6 +249,27 @@ public class StoresTest {
}
@Test
+ public void shouldBuildTimestampedWindowStoreThatWrapsWindowStore() {
+ final TimestampedWindowStore<String, String> store = Stores.timestampedWindowStoreBuilder(
+ Stores.persistentWindowStore("store", ofMillis(3L), ofMillis(3L), true),
+ Serdes.String(),
+ Serdes.String()
+ ).build();
+ assertThat(store, not(nullValue()));
+ }
+
+ @Test
+ public void shouldBuildTimestampedWindowStoreThatWrapsInMemroyWindowStore() {
+ final TimestampedWindowStore<String, String> store = Stores.timestampedWindowStoreBuilder(
+ Stores.inMemoryWindowStore("store", ofMillis(3L), ofMillis(3L), true),
+ Serdes.String(),
+ Serdes.String()
+ ).withLoggingDisabled().withCachingDisabled().build();
+ assertThat(store, not(nullValue()));
+ assertThat(((WrappedStateStore) store).wrapped(), instanceOf(TimestampedBytesStore.class));
+ }
+
+ @Test
public void shouldBuildSessionStore() {
final SessionStore<String, String> store = Stores.sessionStoreBuilder(
Stores.persistentSessionStore("name", ofMillis(10)),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java
index 7b0eb6d..1ba3d37 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java
@@ -18,11 +18,9 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
@@ -46,14 +44,16 @@ public class TimestampedKeyValueStoreBuilderTest {
@Mock(type = MockType.NICE)
private KeyValueBytesStoreSupplier supplier;
@Mock(type = MockType.NICE)
- private KeyValueStore<Bytes, byte[]> inner;
+ private RocksDBTimestampedStore inner;
private TimestampedKeyValueStoreBuilder<String, String> builder;
@Before
public void setUp() {
expect(supplier.get()).andReturn(inner);
expect(supplier.name()).andReturn("name");
- replay(supplier);
+ expect(inner.persistent()).andReturn(true).anyTimes();
+ replay(supplier, inner);
+
builder = new TimestampedKeyValueStoreBuilder<>(
supplier,
Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
index 7be31ea..e6d3da5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
@@ -18,12 +18,10 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
-import org.apache.kafka.streams.state.WindowStore;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
@@ -46,14 +44,15 @@ public class TimestampedWindowStoreBuilderTest {
@Mock(type = MockType.NICE)
private WindowBytesStoreSupplier supplier;
@Mock(type = MockType.NICE)
- private WindowStore<Bytes, byte[]> inner;
+ private RocksDBTimestampedWindowStore inner;
private TimestampedWindowStoreBuilder<String, String> builder;
@Before
public void setUp() {
expect(supplier.get()).andReturn(inner);
expect(supplier.name()).andReturn("name");
- replay(supplier);
+ expect(inner.persistent()).andReturn(true).anyTimes();
+ replay(supplier, inner);
builder = new TimestampedWindowStoreBuilder<>(
supplier,
diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
new file mode 100644
index 0000000..67a67c9
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
+import org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * This class is a generic version of the in-memory key-value store that is useful for testing when you
+ * need a basic KeyValueStore for arbitrary types and don't have/want to write a serde
+ */
+public class GenericInMemoryTimestampedKeyValueStore<K extends Comparable, V>
+ extends WrappedStateStore<StateStore, K, ValueAndTimestamp<V>>
+ implements TimestampedKeyValueStore<K, V> {
+
+ private final String name;
+ private final NavigableMap<K, ValueAndTimestamp<V>> map;
+ private volatile boolean open = false;
+
+ public GenericInMemoryTimestampedKeyValueStore(final String name) {
+ // it's not really a `WrappedStateStore` so we pass `null`
+ // however, we need to implement `WrappedStateStore` to make the store usable
+ super(null);
+ this.name = name;
+
+ this.map = new TreeMap<>();
+ }
+
+ @Override
+ public String name() {
+ return this.name;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ /* This is a "dummy" store used for testing;
+ it does not support restoring from changelog since we allow it to be serde-ignorant */
+ public void init(final ProcessorContext context, final StateStore root) {
+ if (root != null) {
+ context.register(root, null);
+ }
+
+ this.open = true;
+ }
+
+ @Override
+ public boolean setFlushListener(final CacheFlushListener<K, ValueAndTimestamp<V>> listener,
+ final boolean sendOldValues) {
+ return false;
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return this.open;
+ }
+
+ @Override
+ public synchronized ValueAndTimestamp<V> get(final K key) {
+ return this.map.get(key);
+ }
+
+ @Override
+ public synchronized void put(final K key,
+ final ValueAndTimestamp<V> value) {
+ if (value == null) {
+ this.map.remove(key);
+ } else {
+ this.map.put(key, value);
+ }
+ }
+
+ @Override
+ public synchronized ValueAndTimestamp<V> putIfAbsent(final K key,
+ final ValueAndTimestamp<V> value) {
+ final ValueAndTimestamp<V> originalValue = get(key);
+ if (originalValue == null) {
+ put(key, value);
+ }
+ return originalValue;
+ }
+
+ @Override
+ public synchronized void putAll(final List<KeyValue<K, ValueAndTimestamp<V>>> entries) {
+ for (final KeyValue<K, ValueAndTimestamp<V>> entry : entries) {
+ put(entry.key, entry.value);
+ }
+ }
+
+ @Override
+ public synchronized ValueAndTimestamp<V> delete(final K key) {
+ return this.map.remove(key);
+ }
+
+ @Override
+ public synchronized KeyValueIterator<K, ValueAndTimestamp<V>> range(final K from,
+ final K to) {
+ return new DelegatingPeekingKeyValueIterator<>(
+ name,
+ new GenericInMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator()));
+ }
+
+ @Override
+ public synchronized KeyValueIterator<K, ValueAndTimestamp<V>> all() {
+ final TreeMap<K, ValueAndTimestamp<V>> copy = new TreeMap<>(this.map);
+ return new DelegatingPeekingKeyValueIterator<>(name, new GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator()));
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return this.map.size();
+ }
+
+ @Override
+ public void flush() {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public void close() {
+ this.map.clear();
+ this.open = false;
+ }
+
+ private static class GenericInMemoryKeyValueIterator<K, V> implements KeyValueIterator<K, ValueAndTimestamp<V>> {
+ private final Iterator<Entry<K, ValueAndTimestamp<V>>> iter;
+
+ private GenericInMemoryKeyValueIterator(final Iterator<Map.Entry<K, ValueAndTimestamp<V>>> iter) {
+ this.iter = iter;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public KeyValue<K, ValueAndTimestamp<V>> next() {
+ final Map.Entry<K, ValueAndTimestamp<V>> entry = iter.next();
+ return new KeyValue<>(entry.getKey(), entry.getValue());
+ }
+
+ @Override
+ public void remove() {
+ iter.remove();
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+
+ @Override
+ public K peekNextKey() {
+ throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName());
+ }
+ }
+}
\ No newline at end of file