You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/03/01 17:27:20 UTC
[kafka] branch trunk updated: KAFKA-6560: Replace range query with
newly added single point query in Windowed Aggregation (#4578)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new eb449fe KAFKA-6560: Replace range query with newly added single point query in Windowed Aggregation (#4578)
eb449fe is described below
commit eb449fe7c55a0816328a851fc1102dfeac6d8616
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Mar 1 09:27:11 2018 -0800
KAFKA-6560: Replace range query with newly added single point query in Windowed Aggregation (#4578)
* Add a new fetch(K key, long window-start-timestamp) API into ReadOnlyWindowStore.
* Use the new API to replace the range fetch API in KStreamWindowedAggregate and KStreamWindowedReduce.
* Added corresponding unit tests.
* Also removed some redundant byte serdes in byte stores.
---
.../kstream/internals/KStreamWindowAggregate.java | 61 +--
.../kstream/internals/KStreamWindowReduce.java | 65 +---
.../kafka/streams/state/ReadOnlyWindowStore.java | 24 +-
.../state/internals/CachingKeyValueStore.java | 8 +-
.../state/internals/CachingWindowStore.java | 20 +-
.../internals/ChangeLoggingWindowBytesStore.java | 13 +-
.../internals/CompositeReadOnlyWindowStore.java | 23 +-
.../state/internals/MeteredWindowStore.java | 16 +-
.../state/internals/RocksDBWindowStore.java | 16 +-
.../streams/state/internals/WindowKeySchema.java | 10 +-
.../kafka/streams/state/NoOpWindowStore.java | 5 +
.../state/internals/CachingWindowStoreTest.java | 5 +
.../CompositeReadOnlyWindowStoreTest.java | 18 +-
.../state/internals/ReadOnlyWindowStoreStub.java | 11 +-
.../state/internals/RocksDBWindowStoreTest.java | 415 ++++++++++-----------
15 files changed, 377 insertions(+), 333 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index ec26866..27f8408 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.Window;
@@ -39,7 +38,10 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
private boolean sendOldValues = false;
- public KStreamWindowAggregate(Windows<W> windows, String storeName, Initializer<T> initializer, Aggregator<? super K, ? super V, T> aggregator) {
+ KStreamWindowAggregate(final Windows<W> windows,
+ final String storeName,
+ final Initializer<T> initializer,
+ final Aggregator<? super K, ? super V, T> aggregator) {
this.windows = windows;
this.storeName = storeName;
this.initializer = initializer;
@@ -63,7 +65,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
@SuppressWarnings("unchecked")
@Override
- public void init(ProcessorContext context) {
+ public void init(final ProcessorContext context) {
super.init(context);
windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
@@ -71,54 +73,27 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
}
@Override
- public void process(K key, V value) {
+ public void process(final K key, final V value) {
// if the key is null, we do not need proceed aggregating the record
// the record with the table
if (key == null)
return;
// first get the matching windows
- long timestamp = context().timestamp();
- Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
+ final long timestamp = context().timestamp();
+ final Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
- long timeFrom = Long.MAX_VALUE;
- long timeTo = Long.MIN_VALUE;
+ // try update the window, and create the new window for the rest of unmatched window that do not exist yet
+ for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
+ T oldAgg = windowStore.fetch(key, entry.getKey());
- // use range query on window store for efficient reads
- for (long windowStartMs : matchedWindows.keySet()) {
- timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
- timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
- }
-
- try (WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo)) {
-
- // for each matching window, try to update the corresponding key
- while (iter.hasNext()) {
- KeyValue<Long, T> entry = iter.next();
- W window = matchedWindows.get(entry.key);
-
- if (window != null) {
-
- T oldAgg = entry.value;
-
- if (oldAgg == null)
- oldAgg = initializer.apply();
-
- // try to add the new value (there will never be old value)
- T newAgg = aggregator.apply(key, value, oldAgg);
-
- // update the store with the new value
- windowStore.put(key, newAgg, window.start());
- tupleForwarder.maybeForward(new Windowed<>(key, window), newAgg, oldAgg);
- matchedWindows.remove(entry.key);
- }
+ if (oldAgg == null) {
+ oldAgg = initializer.apply();
}
- }
- // create the new window for the rest of unmatched window that do not exist yet
- for (Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
- T oldAgg = initializer.apply();
- T newAgg = aggregator.apply(key, value, oldAgg);
+ final T newAgg = aggregator.apply(key, value, oldAgg);
+
+ // update the store with the new value
windowStore.put(key, newAgg, entry.getKey());
tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg);
}
@@ -147,13 +122,13 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
@SuppressWarnings("unchecked")
@Override
- public void init(ProcessorContext context) {
+ public void init(final ProcessorContext context) {
windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
}
@SuppressWarnings("unchecked")
@Override
- public T get(Windowed<K> windowedKey) {
+ public T get(final Windowed<K> windowedKey) {
K key = windowedKey.key();
W window = (W) windowedKey.window();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 7d02f11..c3d95d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
@@ -37,7 +36,9 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
private boolean sendOldValues = false;
- public KStreamWindowReduce(Windows<W> windows, String storeName, Reducer<V> reducer) {
+ KStreamWindowReduce(final Windows<W> windows,
+ final String storeName,
+ final Reducer<V> reducer) {
this.windows = windows;
this.storeName = storeName;
this.reducer = reducer;
@@ -60,63 +61,37 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
@SuppressWarnings("unchecked")
@Override
- public void init(ProcessorContext context) {
+ public void init(final ProcessorContext context) {
super.init(context);
windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), sendOldValues);
}
@Override
- public void process(K key, V value) {
+ public void process(final K key, final V value) {
// if the key is null, we do not need proceed aggregating
// the record with the table
if (key == null)
return;
// first get the matching windows
- long timestamp = context().timestamp();
+ final long timestamp = context().timestamp();
+ final Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
- Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
-
- long timeFrom = Long.MAX_VALUE;
- long timeTo = Long.MIN_VALUE;
-
- // use range query on window store for efficient reads
- for (long windowStartMs : matchedWindows.keySet()) {
- timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
- timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
- }
+ // try update the window, and create the new window for the rest of unmatched window that do not exist yet
+ for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
+ final V oldAgg = windowStore.fetch(key, entry.getKey());
- try (WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo)) {
- // for each matching window, try to update the corresponding key and send to the downstream
- while (iter.hasNext()) {
- KeyValue<Long, V> entry = iter.next();
- W window = matchedWindows.get(entry.key);
-
- if (window != null) {
-
- V oldAgg = entry.value;
- V newAgg = oldAgg;
-
- // try to add the new value (there will never be old value)
- if (newAgg == null) {
- newAgg = value;
- } else {
- newAgg = reducer.apply(newAgg, value);
- }
-
- // update the store with the new value
- windowStore.put(key, newAgg, window.start());
- tupleForwarder.maybeForward(new Windowed<>(key, window), newAgg, oldAgg);
- matchedWindows.remove(entry.key);
- }
+ V newAgg;
+ if (oldAgg == null) {
+ newAgg = value;
+ } else {
+ newAgg = reducer.apply(oldAgg, value);
}
- }
- // create the new window for the rest of unmatched window that do not exist yet
- for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
- windowStore.put(key, value, entry.getKey());
- tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), value, null);
+ // update the store with the new value
+ windowStore.put(key, newAgg, entry.getKey());
+ tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg);
}
}
}
@@ -143,13 +118,13 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
@SuppressWarnings("unchecked")
@Override
- public void init(ProcessorContext context) {
+ public void init(final ProcessorContext context) {
windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
}
@SuppressWarnings("unchecked")
@Override
- public V get(Windowed<K> windowedKey) {
+ public V get(final Windowed<K> windowedKey) {
K key = windowedKey.key();
W window = (W) windowedKey.window();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
index f92ab6e..dea759f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
@@ -29,6 +29,17 @@ import org.apache.kafka.streams.kstream.Windowed;
public interface ReadOnlyWindowStore<K, V> {
/**
+ * Get the value of key from a window.
+ *
+ * @param key the key to fetch
+ * @param time start timestamp (inclusive) of the window
+ * @return The value or {@code null} if no value is found in the window
+ * @throws InvalidStateStoreException if the store is not initialized
+ * @throws NullPointerException If {@code null} is used for any key.
+ */
+ V fetch(K key, long time);
+
+ /**
* Get all the key-value pairs with the given key and the time range from all
* the existing windows.
*
@@ -56,9 +67,12 @@ public interface ReadOnlyWindowStore<K, V> {
* For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest
* available window to the newest/latest window.
*
+ * @param key the key to fetch
+ * @param timeFrom time range start (inclusive)
+ * @param timeTo time range end (inclusive)
* @return an iterator over key-value pairs {@code <timestamp, value>}
* @throws InvalidStateStoreException if the store is not initialized
- * @throws NullPointerException If null is used for key.
+ * @throws NullPointerException If {@code null} is used for key.
*/
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
@@ -74,7 +88,7 @@ public interface ReadOnlyWindowStore<K, V> {
* @param timeTo time range end (inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @throws InvalidStateStoreException if the store is not initialized
- * @throws NullPointerException If null is used for any key.
+ * @throws NullPointerException If {@code null} is used for any key.
*/
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
@@ -89,11 +103,11 @@ public interface ReadOnlyWindowStore<K, V> {
/**
* Gets all the key-value pairs that belong to the windows within in the given time range.
*
- * @param timeFrom the beginning of the time slot from which to search
- * @param timeTo the end of the time slot from which to search
+ * @param timeFrom the beginning of the time slot from which to search (inclusive)
+ * @param timeTo the end of the time slot from which to search (inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @throws InvalidStateStoreException if the store is not initialized
- * @throws NullPointerException if null is used for any key
+ * @throws NullPointerException if {@code null} is used for any key
*/
KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 82a6ac7..45f606f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -175,13 +175,9 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
cache.put(cacheName, key, new LRUCacheEntry(rawValue));
}
return rawValue;
+ } else {
+ return entry.value;
}
-
- if (entry.value == null) {
- return null;
- }
-
- return entry.value;
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index ad0bd99..e3d0f62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -45,11 +45,12 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
private String name;
private ThreadCache cache;
- private InternalProcessorContext context;
+ private boolean sendOldValues;
private StateSerdes<K, V> serdes;
+ private InternalProcessorContext context;
private StateSerdes<Bytes, byte[]> bytesSerdes;
private CacheFlushListener<Windowed<K>, V> flushListener;
- private boolean sendOldValues;
+
private final SegmentedCacheFunction cacheFunction;
CachingWindowStore(final WindowStore<Bytes, byte[]> underlying,
@@ -150,13 +151,26 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
// if store is open outside as well.
validateStoreOpen();
- final Bytes keyBytes = WindowStoreUtils.toBinaryKey(key, timestamp, 0, bytesSerdes);
+ final Bytes keyBytes = WindowStoreUtils.toBinaryKey(key.get(), timestamp, 0);
final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(),
timestamp, context.partition(), context.topic());
cache.put(name, cacheFunction.cacheKey(keyBytes), entry);
}
@Override
+ public byte[] fetch(final Bytes key, final long timestamp) {
+ validateStoreOpen();
+ final Bytes bytesKey = WindowStoreUtils.toBinaryKey(key.get(), timestamp, 0);
+ final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
+ final LRUCacheEntry entry = cache.get(name, cacheKey);
+ if (entry == null) {
+ return underlying.fetch(key, timestamp);
+ } else {
+ return entry.value;
+ }
+ }
+
+ @Override
public synchronized WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
// since this function may not access the underlying inner store, we need to validate
// if store is open outside as well.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 4fe4b99..e69a320 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -36,7 +36,6 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
private final boolean retainDuplicates;
private StoreChangeLogger<Bytes, byte[]> changeLogger;
private ProcessorContext context;
- private StateSerdes<Bytes, byte[]> innerStateSerde;
private int seqnum = 0;
ChangeLoggingWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore,
@@ -47,6 +46,11 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
}
@Override
+ public byte[] fetch(final Bytes key, final long timestamp) {
+ return bytesStore.fetch(key, timestamp);
+ }
+
+ @Override
public WindowStoreIterator<byte[]> fetch(final Bytes key, final long from, final long to) {
return bytesStore.fetch(key, from, to);
}
@@ -74,18 +78,19 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
@Override
public void put(final Bytes key, final byte[] value, final long timestamp) {
bytesStore.put(key, value, timestamp);
- changeLogger.logChange(WindowStoreUtils.toBinaryKey(key, timestamp, maybeUpdateSeqnumForDups(), innerStateSerde), value);
+ changeLogger.logChange(WindowStoreUtils.toBinaryKey(key.get(), timestamp, maybeUpdateSeqnumForDups()), value);
}
@Override
public void init(final ProcessorContext context, final StateStore root) {
this.context = context;
bytesStore.init(context, root);
- innerStateSerde = WindowStoreUtils.getInnerStateSerde(ProcessorStateManager.storeChangelogTopic(context.applicationId(), bytesStore.name()));
+
+ final StateSerdes<Bytes, byte[]> bytesSerde = WindowStoreUtils.getInnerStateSerde(ProcessorStateManager.storeChangelogTopic(context.applicationId(), bytesStore.name()));
changeLogger = new StoreChangeLogger<>(
name(),
context,
- innerStateSerde);
+ bytesSerde);
}
private int maybeUpdateSeqnumForDups() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index 6afc6fd..1b5d5e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -45,10 +45,29 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
}
@Override
+ public V fetch(final K key, final long time) {
+ Objects.requireNonNull(key, "key can't be null");
+ final List<ReadOnlyWindowStore<K, V>> stores = provider.stores(storeName, windowStoreType);
+ for (final ReadOnlyWindowStore<K, V> windowStore : stores) {
+ try {
+ final V result = windowStore.fetch(key, time);
+ if (result != null) {
+ return result;
+ }
+ } catch (final InvalidStateStoreException e) {
+ throw new InvalidStateStoreException(
+ "State store is not available anymore and may have been migrated to another instance; " +
+ "please re-discover its location from the state metadata.");
+ }
+ }
+ return null;
+ }
+
+ @Override
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
Objects.requireNonNull(key, "key can't be null");
final List<ReadOnlyWindowStore<K, V>> stores = provider.stores(storeName, windowStoreType);
- for (ReadOnlyWindowStore<K, V> windowStore : stores) {
+ for (final ReadOnlyWindowStore<K, V> windowStore : stores) {
try {
final WindowStoreIterator<V> result = windowStore.fetch(key, timeFrom, timeTo);
if (!result.hasNext()) {
@@ -56,7 +75,7 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
} else {
return result;
}
- } catch (InvalidStateStoreException e) {
+ } catch (final InvalidStateStoreException e) {
throw new InvalidStateStoreException(
"State store is not available anymore and may have been migrated to another instance; " +
"please re-discover its location from the state metadata.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index e890005..15961e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -91,7 +91,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
@Override
public void put(final K key, final V value, final long timestamp) {
- long startNs = time.nanoseconds();
+ final long startNs = time.nanoseconds();
try {
inner.put(keyBytes(key), serdes.rawValue(value), timestamp);
} finally {
@@ -104,6 +104,20 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
}
@Override
+ public V fetch(final K key, final long timestamp) {
+ final long startNs = time.nanoseconds();
+ V ret;
+ try {
+ final byte[] result = inner.fetch(keyBytes(key), timestamp);
+ ret = serdes.valueFrom(result);
+ } finally {
+ metrics.recordLatency(this.fetchTime, startNs, time.nanoseconds());
+ }
+
+ return ret;
+ }
+
+ @Override
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
return new MeteredWindowStoreIterator<>(inner.fetch(keyBytes(key), timeFrom, timeTo),
fetchTime,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index f1a9c63..58c345a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -96,25 +96,31 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
}
@Override
- public void put(K key, V value) {
+ public void put(final K key, final V value) {
put(key, value, context.timestamp());
}
@Override
- public void put(K key, V value, long timestamp) {
+ public void put(final K key, final V value, final long timestamp) {
maybeUpdateSeqnumForDups();
bytesStore.put(WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes), serdes.rawValue(value));
}
@Override
- public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
+ public V fetch(final K key, final long timestamp) {
+ final byte[] bytesValue = bytesStore.get(WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes));
+ return serdes.valueFrom(bytesValue);
+ }
+
+ @Override
+ public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).valuesIterator();
}
@Override
- public KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) {
+ public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo);
return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
}
@@ -126,7 +132,7 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
}
@Override
- public KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) {
+ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetchAll(timeFrom, timeTo);
return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 739792f..e432baa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -16,10 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
import java.nio.ByteBuffer;
import java.util.List;
@@ -29,11 +27,9 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
private static final int SUFFIX_SIZE = WindowStoreUtils.TIMESTAMP_SIZE + WindowStoreUtils.SEQNUM_SIZE;
private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE];
- private StateSerdes<Bytes, byte[]> serdes;
-
@Override
public void init(final String topic) {
- serdes = new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray());
+ // nothing to do
}
@Override
@@ -53,12 +49,12 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
@Override
public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
- return WindowStoreUtils.toBinaryKey(key, Math.max(0, from), 0, serdes);
+ return WindowStoreUtils.toBinaryKey(key.get(), Math.max(0, from), 0);
}
@Override
public Bytes upperRangeFixedSize(final Bytes key, final long to) {
- return WindowStoreUtils.toBinaryKey(key, to, Integer.MAX_VALUE, serdes);
+ return WindowStoreUtils.toBinaryKey(key.get(), to, Integer.MAX_VALUE);
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
index 1ded31f..05016bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
@@ -83,6 +83,11 @@ public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore {
}
@Override
+ public Object fetch(final Object key, final long time) {
+ return null;
+ }
+
+ @Override
public WindowStoreIterator fetch(final Object key, final long timeFrom, final long timeTo) {
return EMPTY_WINDOW_STORE_ITERATOR;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 239a007..5f934b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -96,6 +96,11 @@ public class CachingWindowStoreTest {
cachingStore.put(bytesKey("a"), bytesValue("a"));
cachingStore.put(bytesKey("b"), bytesValue("b"));
+ assertThat(cachingStore.fetch(bytesKey("a"), 10), equalTo(bytesValue("a")));
+ assertThat(cachingStore.fetch(bytesKey("b"), 10), equalTo(bytesValue("b")));
+ assertThat(cachingStore.fetch(bytesKey("c"), 10), equalTo(null));
+ assertThat(cachingStore.fetch(bytesKey("a"), 0), equalTo(null));
+
final WindowStoreIterator<byte[]> a = cachingStore.fetch(bytesKey("a"), 10, 10);
final WindowStoreIterator<byte[]> b = cachingStore.fetch(bytesKey("b"), 10, 10);
verifyKeyValue(a.next(), DEFAULT_TIMESTAMP, "a");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index 58fddaa..a241510 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -169,8 +169,7 @@ public class CompositeReadOnlyWindowStoreTest {
@Test
public void shouldFetchKeyRangeAcrossStores() {
- final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new
- ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+ final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
stubProviderTwo.addStore(storeName, secondUnderlying);
underlyingWindowStore.put("a", "a", 0L);
secondUnderlying.put("b", "b", 10L);
@@ -179,7 +178,20 @@ public class CompositeReadOnlyWindowStoreTest {
KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"))));
}
-
+
+ @Test
+ public void shouldFetchKeyValueAcrossStores() {
+ final ReadOnlyWindowStoreStub<String, String> secondUnderlyingWindowStore = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+ stubProviderTwo.addStore(storeName, secondUnderlyingWindowStore);
+ underlyingWindowStore.put("a", "a", 0L);
+ secondUnderlyingWindowStore.put("b", "b", 10L);
+ assertThat(windowStore.fetch("a", 0L), equalTo("a"));
+ assertThat(windowStore.fetch("b", 10L), equalTo("b"));
+ assertThat(windowStore.fetch("c", 10L), equalTo(null));
+ assertThat(windowStore.fetch("a", 10L), equalTo(null));
+ }
+
+
@Test
public void shouldGetAllAcrossStores() {
final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index 256df33..6d911a3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -47,7 +47,16 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
public ReadOnlyWindowStoreStub(long windowSize) {
this.windowSize = windowSize;
}
-
+
+ @Override
+ public V fetch(final K key, final long time) {
+ final Map<K, V> kvMap = data.get(time);
+ if (kvMap != null) {
+ return kvMap.get(key);
+ } else {
+ return null;
+ }
+ }
@Override
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 39c8f03..f757298 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.MockProcessorContext;
@@ -46,7 +47,6 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -66,7 +66,7 @@ public class RocksDBWindowStoreTest {
private static final long DEFAULT_CACHE_SIZE_BYTES = 1024 * 1024L;
private final int numSegments = 3;
- private final static long WINDOW_SIZE = 3;
+ private final long windowSize = 3L;
private final String windowName = "window";
private final long segmentSize = Segments.MIN_SEGMENT_INTERVAL;
private final long retentionPeriod = segmentSize * (numSegments - 1);
@@ -95,27 +95,37 @@ public class RocksDBWindowStoreTest {
private final File baseDir = TestUtils.tempDirectory("test");
private final MockProcessorContext context = new MockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache);
- private WindowStore windowStore;
+ private WindowStore<Integer, String> windowStore;
+
+ private WindowStore<Integer, String> createWindowStore(final ProcessorContext context, final boolean retainDuplicates) {
+ final WindowStore<Integer, String> store = Stores.windowStoreBuilder(
+ Stores.persistentWindowStore(windowName,
+ retentionPeriod,
+ numSegments,
+ windowSize,
+ retainDuplicates),
+ Serdes.Integer(),
+ Serdes.String()).build();
- @SuppressWarnings("unchecked")
- private <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, final boolean enableCaching, final boolean retainDuplicates) {
- final RocksDBWindowStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, retainDuplicates, Serdes.Integer(), Serdes.String(),
- WINDOW_SIZE, true, Collections.<String, String>emptyMap(), enableCaching);
- final WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
store.init(context, store);
return store;
}
+ private WindowStore<Integer, String> createWindowStore(final ProcessorContext context) {
+ return createWindowStore(context, false);
+ }
+
@After
public void closeStore() {
context.close();
- windowStore.close();
+ if (windowStore != null) {
+ windowStore.close();
+ }
}
- @SuppressWarnings("unchecked")
@Test
public void shouldOnlyIterateOpenSegments() {
- windowStore = createWindowStore(context, false, true);
+ windowStore = createWindowStore(context);
long currentTime = 0;
context.setRecordContext(createRecordContext(currentTime));
windowStore.put(1, "one");
@@ -145,38 +155,50 @@ public class RocksDBWindowStoreTest {
return new ProcessorRecordContext(time, 0, 0, "topic");
}
- @SuppressWarnings("unchecked")
@Test
- public void testPutAndFetch() throws IOException {
- windowStore = createWindowStore(context, false, true);
+ public void testRangeAndSinglePointFetch() {
+ windowStore = createWindowStore(context);
long startTime = segmentSize - 4L;
putFirstBatch(windowStore, startTime, context);
- assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - WINDOW_SIZE, startTime + 1L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - WINDOW_SIZE, startTime + 2L + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - WINDOW_SIZE, startTime + 3L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - WINDOW_SIZE, startTime + 4L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - WINDOW_SIZE, startTime + 5L + WINDOW_SIZE)));
+ assertEquals("zero", windowStore.fetch(0, startTime));
+ assertEquals("one", windowStore.fetch(1, startTime + 1L));
+ assertEquals("two", windowStore.fetch(2, startTime + 2L));
+ assertEquals("four", windowStore.fetch(4, startTime + 4L));
+ assertEquals("five", windowStore.fetch(5, startTime + 5L));
+
+ assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize)));
+ assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize)));
putSecondBatch(windowStore, startTime, context);
- assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L - WINDOW_SIZE, startTime - 2L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L - WINDOW_SIZE, startTime - 1L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L - WINDOW_SIZE, startTime + 1L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L - WINDOW_SIZE, startTime + 2L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L - WINDOW_SIZE, startTime + 3L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L - WINDOW_SIZE, startTime + 4L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L - WINDOW_SIZE, startTime + 5L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L - WINDOW_SIZE, startTime + 6L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L - WINDOW_SIZE, startTime + 7L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - WINDOW_SIZE, startTime + 8L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - WINDOW_SIZE, startTime + 9L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - WINDOW_SIZE, startTime + 10L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - WINDOW_SIZE, startTime + 11L + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - WINDOW_SIZE, startTime + 12L + WINDOW_SIZE)));
+ assertEquals("two+1", windowStore.fetch(2, startTime + 3L));
+ assertEquals("two+2", windowStore.fetch(2, startTime + 4L));
+ assertEquals("two+3", windowStore.fetch(2, startTime + 5L));
+ assertEquals("two+4", windowStore.fetch(2, startTime + 6L));
+ assertEquals("two+5", windowStore.fetch(2, startTime + 7L));
+ assertEquals("two+6", windowStore.fetch(2, startTime + 8L));
+
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize)));
+ assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize)));
+ assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize)));
+ assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize)));
+ assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize)));
+ assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize)));
+ assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize)));
// Flush the store and verify all current entries were properly flushed ...
windowStore.flush();
@@ -192,10 +214,9 @@ public class RocksDBWindowStoreTest {
assertNull(entriesByKey.get(6));
}
- @SuppressWarnings("unchecked")
@Test
- public void shouldGetAll() throws IOException {
- windowStore = createWindowStore(context, false, true);
+ public void shouldGetAll() {
+ windowStore = createWindowStore(context);
long startTime = segmentSize - 4L;
putFirstBatch(windowStore, startTime, context);
@@ -212,10 +233,9 @@ public class RocksDBWindowStoreTest {
);
}
- @SuppressWarnings("unchecked")
@Test
- public void shouldFetchAllInTimeRange() throws IOException {
- windowStore = createWindowStore(context, false, true);
+ public void shouldFetchAllInTimeRange() {
+ windowStore = createWindowStore(context);
long startTime = segmentSize - 4L;
putFirstBatch(windowStore, startTime, context);
@@ -242,10 +262,9 @@ public class RocksDBWindowStoreTest {
);
}
- @SuppressWarnings("unchecked")
@Test
- public void testFetchRange() throws IOException {
- windowStore = createWindowStore(context, false, true);
+ public void testFetchRange() {
+ windowStore = createWindowStore(context);
long startTime = segmentSize - 4L;
putFirstBatch(windowStore, startTime, context);
@@ -258,71 +277,70 @@ public class RocksDBWindowStoreTest {
assertEquals(
Utils.mkList(zero, one),
- StreamsTestUtils.toList(windowStore.fetch(0, 1, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE))
+ StreamsTestUtils.toList(windowStore.fetch(0, 1, startTime + 0L - windowSize, startTime + 0L + windowSize))
);
assertEquals(
Utils.mkList(one),
- StreamsTestUtils.toList(windowStore.fetch(1, 1, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE))
+ StreamsTestUtils.toList(windowStore.fetch(1, 1, startTime + 0L - windowSize, startTime + 0L + windowSize))
);
assertEquals(
Utils.mkList(one, two),
- StreamsTestUtils.toList(windowStore.fetch(1, 3, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE))
+ StreamsTestUtils.toList(windowStore.fetch(1, 3, startTime + 0L - windowSize, startTime + 0L + windowSize))
);
assertEquals(
Utils.mkList(zero, one, two),
- StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE))
+ StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 0L - windowSize, startTime + 0L + windowSize))
);
assertEquals(
Utils.mkList(zero, one, two,
four, five),
- StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE + 5L))
+ StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 0L - windowSize, startTime + 0L + windowSize + 5L))
);
assertEquals(
Utils.mkList(two, four, five),
- StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 2L, startTime + 0L + WINDOW_SIZE + 5L))
+ StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 2L, startTime + 0L + windowSize + 5L))
);
assertEquals(
Utils.mkList(),
- StreamsTestUtils.toList(windowStore.fetch(4, 5, startTime + 2L, startTime + WINDOW_SIZE))
+ StreamsTestUtils.toList(windowStore.fetch(4, 5, startTime + 2L, startTime + windowSize))
);
assertEquals(
Utils.mkList(),
- StreamsTestUtils.toList(windowStore.fetch(0, 3, startTime + 3L, startTime + WINDOW_SIZE + 5))
+ StreamsTestUtils.toList(windowStore.fetch(0, 3, startTime + 3L, startTime + windowSize + 5))
);
}
- @SuppressWarnings("unchecked")
@Test
- public void testPutAndFetchBefore() throws IOException {
- windowStore = createWindowStore(context, false, true);
+ public void testPutAndFetchBefore() {
+ windowStore = createWindowStore(context);
long startTime = segmentSize - 4L;
putFirstBatch(windowStore, startTime, context);
- assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - WINDOW_SIZE, startTime + 0L)));
- assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - WINDOW_SIZE, startTime + 1L)));
- assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - WINDOW_SIZE, startTime + 2L)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - WINDOW_SIZE, startTime + 3L)));
- assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - WINDOW_SIZE, startTime + 4L)));
- assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - WINDOW_SIZE, startTime + 5L)));
+ assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - windowSize, startTime + 0L)));
+ assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - windowSize, startTime + 1L)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - windowSize, startTime + 3L)));
+ assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - windowSize, startTime + 4L)));
+ assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - windowSize, startTime + 5L)));
putSecondBatch(windowStore, startTime, context);
- assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 1L - WINDOW_SIZE, startTime - 1L)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 0L - WINDOW_SIZE, startTime + 0L)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 1L - WINDOW_SIZE, startTime + 1L)));
- assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - WINDOW_SIZE, startTime + 2L)));
- assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime + 3L - WINDOW_SIZE, startTime + 3L)));
- assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 4L - WINDOW_SIZE, startTime + 4L)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 5L - WINDOW_SIZE, startTime + 5L)));
- assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 6L - WINDOW_SIZE, startTime + 6L)));
- assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 7L - WINDOW_SIZE, startTime + 7L)));
- assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - WINDOW_SIZE, startTime + 8L)));
- assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - WINDOW_SIZE, startTime + 9L)));
- assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - WINDOW_SIZE, startTime + 10L)));
- assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - WINDOW_SIZE, startTime + 11L)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - WINDOW_SIZE, startTime + 12L)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 13L - WINDOW_SIZE, startTime + 13L)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 1L - windowSize, startTime - 1L)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 0L - windowSize, startTime + 0L)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 1L - windowSize, startTime + 1L)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
+ assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime + 3L - windowSize, startTime + 3L)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 4L - windowSize, startTime + 4L)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 5L - windowSize, startTime + 5L)));
+ assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 6L - windowSize, startTime + 6L)));
+ assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 7L - windowSize, startTime + 7L)));
+ assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - windowSize, startTime + 8L)));
+ assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - windowSize, startTime + 9L)));
+ assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - windowSize, startTime + 10L)));
+ assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - windowSize, startTime + 11L)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - windowSize, startTime + 12L)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 13L - windowSize, startTime + 13L)));
// Flush the store and verify all current entries were properly flushed ...
windowStore.flush();
@@ -338,38 +356,37 @@ public class RocksDBWindowStoreTest {
assertNull(entriesByKey.get(6));
}
- @SuppressWarnings("unchecked")
@Test
- public void testPutAndFetchAfter() throws IOException {
- windowStore = createWindowStore(context, false, true);
+ public void testPutAndFetchAfter() {
+ windowStore = createWindowStore(context);
long startTime = segmentSize - 4L;
putFirstBatch(windowStore, startTime, context);
- assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L, startTime + 0L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L, startTime + 1L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L, startTime + 3L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L, startTime + 4L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L, startTime + 5L + WINDOW_SIZE)));
+ assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L, startTime + 0L + windowSize)));
+ assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L, startTime + 1L + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L, startTime + 3L + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L, startTime + 4L + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L, startTime + 5L + windowSize)));
putSecondBatch(windowStore, startTime, context);
- assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L, startTime - 2L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L, startTime - 1L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime, startTime + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L, startTime + 1L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L, startTime + 3L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L, startTime + 4L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L, startTime + 5L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L, startTime + 6L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L, startTime + 7L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 8L, startTime + 8L + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 9L, startTime + 9L + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 10L, startTime + 10L + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 11L, startTime + 11L + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L, startTime + 12L + WINDOW_SIZE)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L, startTime - 2L + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L, startTime - 1L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime, startTime + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L, startTime + 1L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
+ assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L, startTime + 3L + windowSize)));
+ assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L, startTime + 4L + windowSize)));
+ assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L, startTime + 5L + windowSize)));
+ assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L, startTime + 6L + windowSize)));
+ assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L, startTime + 7L + windowSize)));
+ assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 8L, startTime + 8L + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 9L, startTime + 9L + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 10L, startTime + 10L + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 11L, startTime + 11L + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L, startTime + 12L + windowSize)));
// Flush the store and verify all current entries were properly flushed ...
windowStore.flush();
@@ -385,26 +402,25 @@ public class RocksDBWindowStoreTest {
assertNull(entriesByKey.get(6));
}
- @SuppressWarnings("unchecked")
@Test
- public void testPutSameKeyTimestamp() throws IOException {
- windowStore = createWindowStore(context, false, true);
+ public void testPutSameKeyTimestamp() {
+ windowStore = createWindowStore(context, true);
long startTime = segmentSize - 4L;
context.setRecordContext(createRecordContext(startTime));
windowStore.put(0, "zero");
- assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
+ assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
windowStore.put(0, "zero");
windowStore.put(0, "zero+");
windowStore.put(0, "zero++");
- assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
- assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 1L - WINDOW_SIZE, startTime + 1L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 2L - WINDOW_SIZE, startTime + 2L + WINDOW_SIZE)));
- assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 3L - WINDOW_SIZE, startTime + 3L + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime + 4L - WINDOW_SIZE, startTime + 4L + WINDOW_SIZE)));
+ assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+ assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+ assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize)));
// Flush the store and verify all current entries were properly flushed ...
windowStore.flush();
@@ -414,11 +430,9 @@ public class RocksDBWindowStoreTest {
assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
}
-
- @SuppressWarnings("unchecked")
@Test
- public void testRolling() throws IOException {
- windowStore = createWindowStore(context, false, true);
+ public void testRolling() {
+ windowStore = createWindowStore(context);
// to validate segments
final Segments segments = new Segments(windowName, retentionPeriod, numSegments);
@@ -450,12 +464,12 @@ public class RocksDBWindowStoreTest {
segments.segmentName(3),
segments.segmentName(4)), segmentDirs(baseDir));
- assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
- assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
+ assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
context.setRecordContext(createRecordContext(startTime + incr * 6));
windowStore.put(6, "six");
@@ -464,13 +478,13 @@ public class RocksDBWindowStoreTest {
segments.segmentName(5)), segmentDirs(baseDir));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+ assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
context.setRecordContext(createRecordContext(startTime + incr * 7));
@@ -479,14 +493,14 @@ public class RocksDBWindowStoreTest {
segments.segmentName(4),
segments.segmentName(5)), segmentDirs(baseDir));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
- assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - WINDOW_SIZE, startTime + incr * 7 + WINDOW_SIZE)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+ assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+ assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
context.setRecordContext(createRecordContext(startTime + incr * 8));
windowStore.put(8, "eight");
@@ -495,15 +509,15 @@ public class RocksDBWindowStoreTest {
segments.segmentName(6)), segmentDirs(baseDir));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - WINDOW_SIZE, startTime + incr * 7 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - WINDOW_SIZE, startTime + incr * 8 + WINDOW_SIZE)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+ assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+ assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+ assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
// check segment directories
windowStore.flush();
@@ -514,14 +528,12 @@ public class RocksDBWindowStoreTest {
}
-
- @SuppressWarnings("unchecked")
@Test
public void testRestore() throws IOException {
long startTime = segmentSize * 2;
long incr = segmentSize / 2;
- windowStore = createWindowStore(context, false, true);
+ windowStore = createWindowStore(context);
context.setRecordContext(createRecordContext(startTime));
windowStore.put(0, "zero");
context.setRecordContext(createRecordContext(startTime + incr));
@@ -547,28 +559,28 @@ public class RocksDBWindowStoreTest {
// remove local store image
Utils.delete(baseDir);
- windowStore = createWindowStore(context, false, true);
- assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(7, startTime + incr * 7 - WINDOW_SIZE, startTime + incr * 7 + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(8, startTime + incr * 8 - WINDOW_SIZE, startTime + incr * 8 + WINDOW_SIZE)));
+ windowStore = createWindowStore(context);
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
context.restore(windowName, changeLog);
- assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
- assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - WINDOW_SIZE, startTime + incr * 7 + WINDOW_SIZE)));
- assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - WINDOW_SIZE, startTime + incr * 8 + WINDOW_SIZE)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+ assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+ assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+ assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
// check segment directories
windowStore.flush();
@@ -578,10 +590,9 @@ public class RocksDBWindowStoreTest {
);
}
- @SuppressWarnings("unchecked")
@Test
- public void testSegmentMaintenance() throws IOException {
- windowStore = createWindowStore(context, false, true);
+ public void testSegmentMaintenance() {
+ windowStore = createWindowStore(context, true);
context.setTime(0L);
context.setRecordContext(createRecordContext(0));
windowStore.put(0, "v");
@@ -655,12 +666,11 @@ public class RocksDBWindowStoreTest {
}
- @SuppressWarnings("unchecked")
@Test
- public void testInitialLoading() throws IOException {
+ public void testInitialLoading() {
File storeDir = new File(baseDir, windowName);
- windowStore = createWindowStore(context, false, true);
+ windowStore = createWindowStore(context);
new File(storeDir, segments.segmentName(0L)).mkdir();
new File(storeDir, segments.segmentName(1L)).mkdir();
@@ -671,7 +681,7 @@ public class RocksDBWindowStoreTest {
new File(storeDir, segments.segmentName(6L)).mkdir();
windowStore.close();
- windowStore = createWindowStore(context, false, true);
+ windowStore = createWindowStore(context);
assertEquals(
Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
@@ -690,10 +700,9 @@ public class RocksDBWindowStoreTest {
);
}
- @SuppressWarnings("unchecked")
@Test
public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() {
- windowStore = createWindowStore(context, false, true);
+ windowStore = createWindowStore(context);
context.setRecordContext(createRecordContext(0));
windowStore.put(1, "one", 1L);
windowStore.put(1, "two", 2L);
@@ -717,24 +726,19 @@ public class RocksDBWindowStoreTest {
}
}
- @SuppressWarnings("unchecked")
@Test
public void shouldFetchAndIterateOverExactKeys() {
final long windowSize = 0x7a00000000000000L;
final long retentionPeriod = 0x7a00000000000000L;
- final RocksDBWindowStoreSupplier<String, String> supplier =
- new RocksDBWindowStoreSupplier<>(
- "window",
- retentionPeriod, 2,
- true,
- Serdes.String(),
- Serdes.String(),
- windowSize,
- true,
- Collections.<String, String>emptyMap(),
- false);
-
- windowStore = supplier.get();
+ final WindowStore<String, String> windowStore = Stores.windowStoreBuilder(
+ Stores.persistentWindowStore(windowName,
+ retentionPeriod,
+ 2,
+ windowSize,
+ true),
+ Serdes.String(),
+ Serdes.String()).build();
+
windowStore.init(context, windowStore);
windowStore.put("a", "0001", 0);
@@ -763,50 +767,45 @@ public class RocksDBWindowStoreTest {
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerExceptionOnPutNullKey() {
- windowStore = createWindowStore(context, false, true);
+ windowStore = createWindowStore(context);
windowStore.put(null, "anyValue");
}
@Test
public void shouldNotThrowNullPointerExceptionOnPutNullValue() {
- windowStore = createWindowStore(context, false, true);
+ windowStore = createWindowStore(context);
windowStore.put(1, null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerExceptionOnGetNullKey() {
- windowStore = createWindowStore(context, false, true);
+ windowStore = createWindowStore(context);
windowStore.fetch(null, 1L, 2L);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
- windowStore = createWindowStore(context, false, true);
+ windowStore = createWindowStore(context);
windowStore.fetch(null, 2, 1L, 2L);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
- windowStore = createWindowStore(context, false, true);
+ windowStore = createWindowStore(context);
windowStore.fetch(1, null, 1L, 2L);
}
- @SuppressWarnings("unchecked")
@Test
public void shouldFetchAndIterateOverExactBinaryKeys() {
- final RocksDBWindowStoreSupplier<Bytes, String> supplier =
- new RocksDBWindowStoreSupplier<>(
- "window",
- 60000, 2,
- true,
- Serdes.Bytes(),
- Serdes.String(),
+ final WindowStore<Bytes, String> windowStore = Stores.windowStoreBuilder(
+ Stores.persistentWindowStore(windowName,
+ 60000,
+ 2,
60000,
- true,
- Collections.<String, String>emptyMap(),
- false);
+ true),
+ Serdes.Bytes(),
+ Serdes.String()).build();
- windowStore = supplier.get();
windowStore.init(context, windowStore);
final Bytes key1 = Bytes.wrap(new byte[]{0});
@@ -891,11 +890,11 @@ public class RocksDBWindowStoreTest {
return entriesByKey;
}
- private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp) {
- return windowedPair(key, value, timestamp, WINDOW_SIZE);
+ private <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp) {
+ return windowedPair(key, value, timestamp, windowSize);
}
- private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp, long windowSize) {
+ private <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp, long windowSize) {
return KeyValue.pair(new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)), value);
}
}
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.