You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/29 18:55:52 UTC
kafka git commit: KAFKA-4499: Add all() and fetchAll() API for
querying window store
Repository: kafka
Updated Branches:
refs/heads/trunk 58877a0de -> 0cc32abc1
KAFKA-4499: Add all() and fetchAll() API for querying window store
A rebased version of the code.
Author: RichardYuSTUG <yo...@gmail.com>
Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>
Closes #4258 from ConcurrencyPractitioner/trunk
Add EmptyWindowStoreIterator to NoOpWindowStore
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0cc32abc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0cc32abc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0cc32abc
Branch: refs/heads/trunk
Commit: 0cc32abc17d26c5faf6fdc3b63a01016e05a65b9
Parents: 58877a0
Author: Richard Yu <yo...@gmail.com>
Authored: Wed Nov 29 10:45:14 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 29 10:55:43 2017 -0800
----------------------------------------------------------------------
.../streams/state/ReadOnlyWindowStore.java | 19 ++++
.../state/internals/CachingWindowStore.java | 35 ++++++++
.../ChangeLoggingWindowBytesStore.java | 11 ++-
.../internals/CompositeReadOnlyWindowStore.java | 28 ++++++
.../state/internals/MeteredWindowStore.java | 14 +++
.../streams/state/internals/NamedCache.java | 5 ++
.../internals/RocksDBSegmentedBytesStore.java | 21 ++++-
.../streams/state/internals/RocksDBStore.java | 22 +++++
.../state/internals/RocksDBWindowStore.java | 12 +++
.../kafka/streams/state/internals/Segment.java | 7 +-
.../state/internals/SegmentIterator.java | 14 +--
.../state/internals/SegmentedBytesStore.java | 20 +++++
.../state/internals/SegmentedCacheFunction.java | 2 +-
.../kafka/streams/state/internals/Segments.java | 16 ++++
.../state/internals/SessionKeySchema.java | 5 +-
.../streams/state/internals/ThreadCache.java | 3 +-
.../state/internals/WindowKeySchema.java | 5 +-
.../kafka/streams/state/NoOpWindowStore.java | 44 +++++++++-
.../state/internals/CachingWindowStoreTest.java | 50 ++++++++++-
.../CompositeReadOnlyWindowStoreTest.java | 26 ++++++
.../internals/ReadOnlyWindowStoreStub.java | 92 ++++++++++++++++++++
.../RocksDBSegmentedBytesStoreTest.java | 50 +++++++++++
.../state/internals/RocksDBWindowStoreTest.java | 50 +++++++++++
.../state/internals/SessionKeySchemaTest.java | 7 ++
.../state/internals/WindowKeySchemaTest.java | 38 ++++++++
.../kafka/test/SegmentedBytesStoreStub.java | 12 +++
26 files changed, 589 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
index cff1f6c..f92ab6e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
@@ -77,4 +77,23 @@ public interface ReadOnlyWindowStore<K, V> {
* @throws NullPointerException If null is used for any key.
*/
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
+
+ /**
+ * Gets all the key-value pairs in the existing windows.
+ *
+ * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
+ * @throws InvalidStateStoreException if the store is not initialized
+ */
+ KeyValueIterator<Windowed<K>, V> all();
+
+ /**
+ * Gets all the key-value pairs that belong to the windows within the given time range.
+ *
+ * @param timeFrom the beginning of the time slot from which to search
+ * @param timeTo the end of the time slot from which to search
+ * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
+ * @throws InvalidStateStoreException if the store is not initialized
+ * @throws NullPointerException if null is used for any key
+ */
+ KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 31ca68b..75acd77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -216,5 +216,40 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
}
}
}
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
+ validateStoreOpen();
+ final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = underlying.all();
+ final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name);
+
+ return new MergedSortedCacheWindowStoreKeyValueIterator(
+ cacheIterator,
+ underlyingIterator,
+ bytesSerdes,
+ windowSize,
+ cacheFunction
+ );
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
+ validateStoreOpen();
+
+ final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = underlying.fetchAll(timeFrom, timeTo);
+ final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name);
+
+ final HasNextCondition hasNextCondition = keySchema.hasNextCondition(null, null, timeFrom, timeTo);
+ final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator,
+ hasNextCondition,
+ cacheFunction);
+ return new MergedSortedCacheWindowStoreKeyValueIterator(
+ filteredCacheIterator,
+ underlyingIterator,
+ bytesSerdes,
+ windowSize,
+ cacheFunction
+ );
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 0035019..4fe4b99 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -56,7 +56,16 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
return bytesStore.fetch(keyFrom, keyTo, from, to);
}
-
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
+ return bytesStore.all();
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
+ return bytesStore.fetchAll(timeFrom, timeTo);
+ }
+
@Override
public void put(final Bytes key, final byte[] value) {
put(key, value, context.timestamp());
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index 5acb6b4..6afc6fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -80,4 +80,32 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
provider.stores(storeName, windowStoreType).iterator(),
nextIteratorFunction));
}
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> all() {
+ final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = new NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>>() {
+ @Override
+ public KeyValueIterator<Windowed<K>, V> apply(final ReadOnlyWindowStore<K, V> store) {
+ return store.all();
+ }
+ };
+ return new DelegatingPeekingKeyValueIterator<>(storeName,
+ new CompositeKeyValueIterator<>(
+ provider.stores(storeName, windowStoreType).iterator(),
+ nextIteratorFunction));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
+ final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = new NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>>() {
+ @Override
+ public KeyValueIterator<Windowed<K>, V> apply(final ReadOnlyWindowStore<K, V> store) {
+ return store.fetchAll(timeFrom, timeTo);
+ }
+ };
+ return new DelegatingPeekingKeyValueIterator<>(storeName,
+ new CompositeKeyValueIterator<>(
+ provider.stores(storeName, windowStoreType).iterator(),
+ nextIteratorFunction));
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 20c7c43..e890005 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -113,6 +113,20 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
}
@Override
+ public KeyValueIterator<Windowed<K>, V> all() {
+ return new MeteredWindowedKeyValueIterator<>(inner.all(), fetchTime, metrics, serdes, time);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
+ return new MeteredWindowedKeyValueIterator<>(inner.fetchAll(timeFrom, timeTo),
+ fetchTime,
+ metrics,
+ serdes,
+ time);
+ }
+
+ @Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
return new MeteredWindowedKeyValueIterator<>(inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo),
fetchTime,
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 47d5152..f838c55 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -32,6 +32,7 @@ import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -278,6 +279,10 @@ class NamedCache {
synchronized Iterator<Bytes> allKeys() {
return keySetIterator(cache.navigableKeySet());
}
+
+ synchronized NavigableSet<Bytes> keySet() {
+ return cache.navigableKeySet();
+ }
synchronized LRUCacheEntry first() {
if (head == null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 4d4ee41..865703c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -65,7 +65,26 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
keySchema.hasNextCondition(keyFrom, keyTo, from, to),
binaryFrom, binaryTo);
}
-
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> all() {
+
+ final List<Segment> searchSpace = segments.allSegments();
+
+ return new SegmentIterator(searchSpace.iterator(),
+ keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE),
+ null, null);
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom, final long timeTo) {
+ final List<Segment> searchSpace = segments.segments(timeFrom, timeTo);
+
+ return new SegmentIterator(searchSpace.iterator(),
+ keySchema.hasNextCondition(null, null, timeFrom, timeTo),
+ null, null);
+ }
+
@Override
public void remove(final Bytes key) {
final Segment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index ea01694..4c7e61b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -387,6 +387,28 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
return rocksDbIterator;
}
+ public synchronized KeyValue<K, V> first() {
+ validateStoreOpen();
+
+ RocksIterator innerIter = db.newIterator();
+ innerIter.seekToFirst();
+ KeyValue<K, V> pair = new KeyValue<>(serdes.keyFrom(innerIter.key()), serdes.valueFrom(innerIter.value()));
+ innerIter.close();
+
+ return pair;
+ }
+
+ public synchronized KeyValue<K, V> last() {
+ validateStoreOpen();
+
+ RocksIterator innerIter = db.newIterator();
+ innerIter.seekToLast();
+ KeyValue<K, V> pair = new KeyValue<>(serdes.keyFrom(innerIter.key()), serdes.valueFrom(innerIter.value()));
+ innerIter.close();
+
+ return pair;
+ }
+
/**
* Return an approximate count of key-value mappings in this store.
*
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index b7dd532..f1a9c63 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -118,6 +118,18 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo);
return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
}
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> all() {
+ final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.all();
+ return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) {
+ final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetchAll(timeFrom, timeTo);
+ return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
+ }
void maybeUpdateSeqnumForDups() {
if (retainDuplicates) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index b9cc8b7..50c1547 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import java.io.IOException;
// Use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data structures
-class Segment extends RocksDBStore<Bytes, byte[]> {
+class Segment extends RocksDBStore<Bytes, byte[]> implements Comparable<Segment> {
public final long id;
Segment(String segmentName, String windowName, long id) {
@@ -36,6 +36,11 @@ class Segment extends RocksDBStore<Bytes, byte[]> {
}
@Override
+ public int compareTo(Segment segment) {
+ return Long.compare(id, segment.id);
+ }
+
+ @Override
public void openDB(final ProcessorContext context) {
super.openDB(context);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
index 9d0b6cd..099cba1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
@@ -32,11 +32,11 @@ class SegmentIterator implements KeyValueIterator<Bytes, byte[]> {
private final Bytes from;
private final Bytes to;
- private final Iterator<Segment> segments;
- private final HasNextCondition hasNextCondition;
+ protected final Iterator<Segment> segments;
+ protected final HasNextCondition hasNextCondition;
- private KeyValueStore<Bytes, byte[]> currentSegment;
- private KeyValueIterator<Bytes, byte[]> currentIterator;
+ protected KeyValueStore<Bytes, byte[]> currentSegment;
+ protected KeyValueIterator<Bytes, byte[]> currentIterator;
SegmentIterator(final Iterator<Segment> segments,
final HasNextCondition hasNextCondition,
@@ -71,7 +71,11 @@ class SegmentIterator implements KeyValueIterator<Bytes, byte[]> {
close();
currentSegment = segments.next();
try {
- currentIterator = currentSegment.range(from, to);
+ if (from == null || to == null) {
+ currentIterator = currentSegment.all();
+ } else {
+ currentIterator = currentSegment.range(from, to);
+ }
} catch (InvalidStateStoreException e) {
// segment may have been closed so we ignore it.
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
index 72ae6e2..19cf319 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -49,6 +50,25 @@ public interface SegmentedBytesStore extends StateStore {
* @return an iterator over key-value pairs
*/
KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to);
+
+ /**
+ * Gets all the key-value pairs in the existing windows.
+ *
+ * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
+ * @throws InvalidStateStoreException if the store is not initialized
+ */
+ KeyValueIterator<Bytes, byte[]> all();
+
+ /**
+ * Gets all the key-value pairs that belong to the windows within the given time range.
+ *
+ * @param from the beginning of the time slot from which to search
+ * @param to the end of the time slot from which to search
+ * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
+ * @throws InvalidStateStoreException if the store is not initialized
+ * @throws NullPointerException if null is used for any key
+ */
+ KeyValueIterator<Bytes, byte[]> fetchAll(final long from, final long to);
/**
* Remove the record with the provided key. The key
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
index 8571f92..3ab7930 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
@@ -52,7 +52,7 @@ class SegmentedCacheFunction implements CacheFunction {
System.arraycopy(cacheKey.get(), SEGMENT_ID_BYTES, binaryKey, 0, binaryKey.length);
return binaryKey;
}
-
+
public long segmentId(Bytes key) {
return keySchema.segmentTimestamp(key) / segmentInterval;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 5993972..2cf0913 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -28,6 +28,7 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SimpleTimeZone;
@@ -143,6 +144,21 @@ class Segments {
return segments;
}
+ List<Segment> allSegments() {
+ final List<Segment> segments = new ArrayList<>();
+ for (Segment segment : this.segments.values()) {
+ if (segment.isOpen()) {
+ try {
+ segments.add(segment);
+ } catch (InvalidStateStoreException ise) {
+ // segment may have been closed by streams thread;
+ }
+ }
+ }
+ Collections.sort(segments);
+ return segments;
+ }
+
void flush() {
for (Segment segment : segments.values()) {
segment.flush();
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index 6d6d9bf..e3dd553 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -79,8 +79,8 @@ class SessionKeySchema implements SegmentedBytesStore.KeySchema {
while (iterator.hasNext()) {
final Bytes bytes = iterator.peekNextKey();
final Windowed<Bytes> windowedKey = SessionKeySerde.fromBytes(bytes);
- if (windowedKey.key().compareTo(binaryKeyFrom) >= 0
- && windowedKey.key().compareTo(binaryKeyTo) <= 0
+ if ((binaryKeyFrom == null || windowedKey.key().compareTo(binaryKeyFrom) >= 0)
+ && (binaryKeyTo == null || windowedKey.key().compareTo(binaryKeyTo) <= 0)
&& windowedKey.window().end() >= from
&& windowedKey.window().start() <= to) {
return true;
@@ -96,5 +96,4 @@ class SessionKeySchema implements SegmentedBytesStore.KeySchema {
public List<Segment> segmentsToSearch(final Segments segments, final long from, final long to) {
return segments.segments(from, Long.MAX_VALUE);
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 01a4bef..b1fd198 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -195,8 +195,7 @@ public class ThreadCache {
}
return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache);
}
-
-
+
public long size() {
long size = 0;
for (NamedCache cache : caches.values()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 214f36b..739792f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -75,8 +75,8 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
final Bytes bytes = iterator.peekNextKey();
final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get());
final long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get());
- if (keyBytes.compareTo(binaryKeyFrom) >= 0
- && keyBytes.compareTo(binaryKeyTo) <= 0
+ if ((binaryKeyFrom == null || keyBytes.compareTo(binaryKeyFrom) >= 0)
+ && (binaryKeyTo == null || keyBytes.compareTo(binaryKeyTo) <= 0)
&& time >= from
&& time <= to) {
return true;
@@ -92,4 +92,5 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
public List<Segment> segmentsToSearch(final Segments segments, final long from, final long to) {
return segments.segments(from, to);
}
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
index 3ad6475..1ded31f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
@@ -20,8 +20,38 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import java.util.NoSuchElementException;
+
public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore {
+ private static class EmptyWindowStoreIterator implements WindowStoreIterator<KeyValue> {
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Long peekNextKey() {
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public KeyValue<Long, KeyValue> next() {
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public void remove() {
+ }
+ }
+
+ private static final WindowStoreIterator<KeyValue> EMPTY_WINDOW_STORE_ITERATOR = new EmptyWindowStoreIterator();
+
@Override
public String name() {
return "";
@@ -54,11 +84,21 @@ public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore {
@Override
public WindowStoreIterator fetch(final Object key, final long timeFrom, final long timeTo) {
- return null;
+ return EMPTY_WINDOW_STORE_ITERATOR;
}
@Override
public WindowStoreIterator<KeyValue> fetch(Object from, Object to, long timeFrom, long timeTo) {
- return null;
+ return EMPTY_WINDOW_STORE_ITERATOR;
+ }
+
+ @Override
+ public WindowStoreIterator<KeyValue> all() {
+ return EMPTY_WINDOW_STORE_ITERATOR;
+ }
+
+ @Override
+ public WindowStoreIterator<KeyValue> fetchAll(long timeFrom, long timeTo) {
+ return EMPTY_WINDOW_STORE_ITERATOR;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index f1a0038..239a007 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -129,7 +129,55 @@ public class CachingWindowStoreTest {
assertFalse(iterator.hasNext());
assertEquals(2, cache.size());
}
-
+
+ @Test
+ public void shouldGetAllFromCache() {
+ cachingStore.put(bytesKey("a"), bytesValue("a"));
+ cachingStore.put(bytesKey("b"), bytesValue("b"));
+ cachingStore.put(bytesKey("c"), bytesValue("c"));
+ cachingStore.put(bytesKey("d"), bytesValue("d"));
+ cachingStore.put(bytesKey("e"), bytesValue("e"));
+ cachingStore.put(bytesKey("f"), bytesValue("f"));
+ cachingStore.put(bytesKey("g"), bytesValue("g"));
+ cachingStore.put(bytesKey("h"), bytesValue("h"));
+
+ final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.all();
+ String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
+ for (String s : array) {
+ verifyWindowedKeyValue(iterator.next(), new Windowed<>(bytesKey(s), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), s);
+ }
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void shouldFetchAllWithinTimestampRange() {
+ String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
+ for (int i = 0; i < array.length; i++) {
+ context.setTime(i);
+ cachingStore.put(bytesKey(array[i]), bytesValue(array[i]));
+ }
+
+ final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.fetchAll(0, 7);
+ for (int i = 0; i < array.length; i++) {
+ String str = array[i];
+ verifyWindowedKeyValue(iterator.next(), new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
+ }
+ assertFalse(iterator.hasNext());
+
+ final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 = cachingStore.fetchAll(2, 4);
+ for (int i = 2; i <= 4; i++) {
+ String str = array[i];
+ verifyWindowedKeyValue(iterator1.next(), new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
+ }
+ assertFalse(iterator1.hasNext());
+
+ final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 = cachingStore.fetchAll(5, 7);
+ for (int i = 5; i <= 7; i++) {
+ String str = array[i];
+ verifyWindowedKeyValue(iterator2.next(), new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
+ }
+ assertFalse(iterator2.hasNext());
+ }
@Test
public void shouldFlushEvictedItemsIntoUnderlyingStore() throws IOException {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index f887858..58fddaa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -179,6 +179,32 @@ public class CompositeReadOnlyWindowStoreTest {
KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"))));
}
+
+ @Test
+ public void shouldGetAllAcrossStores() {
+     // Register a second store under the same name so all() has to return entries from both stores.
+     final ReadOnlyWindowStoreStub<String, String> otherStore = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+     stubProviderTwo.addStore(storeName, otherStore);
+     underlyingWindowStore.put("a", "a", 0L);
+     otherStore.put("b", "b", 10L);
+
+     final List<KeyValue<Windowed<String>, String>> actual = StreamsTestUtils.toList(windowStore.all());
+     final List<KeyValue<Windowed<String>, String>> expected = Arrays.asList(
+         KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+         KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"));
+     assertThat(actual, equalTo(expected));
+ }
+
+ @Test
+ public void shouldFetchAllAcrossStores() {
+     // Register a second store under the same name so fetchAll() has to return entries from both stores.
+     final ReadOnlyWindowStoreStub<String, String> otherStore = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+     stubProviderTwo.addStore(storeName, otherStore);
+     underlyingWindowStore.put("a", "a", 0L);
+     otherStore.put("b", "b", 10L);
+
+     // The range [0, 10] covers the timestamps of both entries written above.
+     final List<KeyValue<Windowed<String>, String>> actual = StreamsTestUtils.toList(windowStore.fetchAll(0, 10));
+     final List<KeyValue<Windowed<String>, String>> expected = Arrays.asList(
+         KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+         KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"));
+     assertThat(actual, equalTo(expected));
+ }
@Test(expected = NullPointerException.class)
public void shouldThrowNPEIfKeyIsNull() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index 6974240..256df33 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -47,6 +47,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
/** Creates a stub store; {@code windowSize} is added to a window's start time to form its end time. */
public ReadOnlyWindowStoreStub(long windowSize) {
this.windowSize = windowSize;
}
+
@Override
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
@@ -64,6 +65,97 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
}
@Override
+ public KeyValueIterator<Windowed<K>, V> all() {
+     if (!open) {
+         throw new InvalidStateStoreException("Store is not open");
+     }
+
+     // Copy every stored entry into a snapshot list, tagging it with its [start, start + windowSize) window.
+     final List<KeyValue<Windowed<K>, V>> snapshot = new ArrayList<>();
+     for (final long windowStart : data.keySet()) {
+         final NavigableMap<K, V> entries = data.get(windowStart);
+         if (entries == null) {
+             continue;
+         }
+         for (final Entry<K, V> entry : entries.entrySet()) {
+             final Windowed<K> windowedKey = new Windowed<>(entry.getKey(), new TimeWindow(windowStart, windowStart + windowSize));
+             snapshot.add(new KeyValue<>(windowedKey, entry.getValue()));
+         }
+     }
+     final Iterator<KeyValue<Windowed<K>, V>> delegate = snapshot.iterator();
+
+     // Thin adapter over the snapshot; only hasNext()/next() are supported.
+     return new KeyValueIterator<Windowed<K>, V>() {
+         @Override
+         public boolean hasNext() {
+             return delegate.hasNext();
+         }
+
+         @Override
+         public KeyValue<Windowed<K>, V> next() {
+             return delegate.next();
+         }
+
+         @Override
+         public Windowed<K> peekNextKey() {
+             throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName());
+         }
+
+         @Override
+         public void remove() {
+             throw new UnsupportedOperationException("remove() not supported in " + getClass().getName());
+         }
+
+         @Override
+         public void close() {
+             // nothing to release: the iterator walks an in-memory snapshot
+         }
+     };
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) {
+     if (!open) {
+         throw new InvalidStateStoreException("Store is not open");
+     }
+
+     // Copy every entry whose window start lies in [timeFrom, timeTo] (both inclusive) into a snapshot list.
+     final List<KeyValue<Windowed<K>, V>> snapshot = new ArrayList<>();
+     for (final long windowStart : data.keySet()) {
+         if (windowStart < timeFrom || windowStart > timeTo) {
+             continue;
+         }
+         final NavigableMap<K, V> entries = data.get(windowStart);
+         if (entries == null) {
+             continue;
+         }
+         for (final Entry<K, V> entry : entries.entrySet()) {
+             final Windowed<K> windowedKey = new Windowed<>(entry.getKey(), new TimeWindow(windowStart, windowStart + windowSize));
+             snapshot.add(new KeyValue<>(windowedKey, entry.getValue()));
+         }
+     }
+     final Iterator<KeyValue<Windowed<K>, V>> delegate = snapshot.iterator();
+
+     // Thin adapter over the snapshot; only hasNext()/next() are supported.
+     return new KeyValueIterator<Windowed<K>, V>() {
+         @Override
+         public boolean hasNext() {
+             return delegate.hasNext();
+         }
+
+         @Override
+         public KeyValue<Windowed<K>, V> next() {
+             return delegate.next();
+         }
+
+         @Override
+         public Windowed<K> peekNextKey() {
+             throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName());
+         }
+
+         @Override
+         public void remove() {
+             throw new UnsupportedOperationException("remove() not supported in " + getClass().getName());
+         }
+
+         @Override
+         public void close() {
+             // nothing to release: the iterator walks an in-memory snapshot
+         }
+     };
+ }
+
+ @Override
public KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) {
if (!open) {
throw new InvalidStateStoreException("Store is not open");
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index bf3386d..d8d291c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -156,6 +156,55 @@ public class RocksDBSegmentedBytesStoreTest {
}
@Test
+ public void shouldGetAllSegments() {
+     // This Segments instance is used only to compute the expected segment directory names.
+     final Segments segments = new Segments(storeName, retention, numSegments);
+     final String key = "a";
+     final Windowed<String> first = new Windowed<>(key, new SessionWindow(0L, 0L));
+     final Windowed<String> second = new Windowed<>(key, new SessionWindow(30000L, 60000L));
+     final Windowed<String> third = new Windowed<>(key, new SessionWindow(61000L, 120000L));
+
+     // After each put, check which segment directories exist.
+     bytesStore.put(serializeKey(first), serializeValue(50L));
+     assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
+
+     bytesStore.put(serializeKey(second), serializeValue(100L));
+     assertEquals(Utils.mkSet(segments.segmentName(0), segments.segmentName(1)), segmentDirs());
+
+     bytesStore.put(serializeKey(third), serializeValue(200L));
+     assertEquals(Utils.mkSet(segments.segmentName(0), segments.segmentName(1), segments.segmentName(2)), segmentDirs());
+
+     // all() must return the entries from every segment, in window order.
+     final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
+     final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+         KeyValue.pair(first, 50L),
+         KeyValue.pair(second, 100L),
+         KeyValue.pair(third, 200L));
+     assertEquals(expected, results);
+ }
+
+ @Test
+ public void shouldFetchAllSegments() {
+     // This Segments instance is used only to compute the expected segment directory names.
+     final Segments segments = new Segments(storeName, retention, numSegments);
+     final String key = "a";
+     final Windowed<String> first = new Windowed<>(key, new SessionWindow(0L, 0L));
+     final Windowed<String> second = new Windowed<>(key, new SessionWindow(30000L, 60000L));
+     final Windowed<String> third = new Windowed<>(key, new SessionWindow(61000L, 120000L));
+
+     // After each put, check which segment directories exist.
+     bytesStore.put(serializeKey(first), serializeValue(50L));
+     assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
+
+     bytesStore.put(serializeKey(second), serializeValue(100L));
+     assertEquals(Utils.mkSet(segments.segmentName(0), segments.segmentName(1)), segmentDirs());
+
+     bytesStore.put(serializeKey(third), serializeValue(200L));
+     assertEquals(Utils.mkSet(segments.segmentName(0), segments.segmentName(1), segments.segmentName(2)), segmentDirs());
+
+     // Only the first two sessions are expected for the range [0, 60000]; the third is left out.
+     final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetchAll(0L, 60000L));
+     final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+         KeyValue.pair(first, 50L),
+         KeyValue.pair(second, 100L));
+     assertEquals(expected, results);
+ }
+
+ @Test
public void shouldLoadSegementsWithOldStyleDateFormattedName() {
final Segments segments = new Segments(storeName, retention, numSegments);
final String key = "a";
@@ -209,6 +258,7 @@ public class RocksDBSegmentedBytesStoreTest {
KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L))));
}
+
private Set<String> segmentDirs() {
File windowDir = new File(stateDir, storeName);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 7736d9d..3b7e2c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -190,6 +190,56 @@ public class RocksDBWindowStoreTest {
assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
assertNull(entriesByKey.get(6));
}
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldGetAll() throws IOException {
+     windowStore = createWindowStore(context, false, true);
+     final long startTime = segmentSize - 4L;
+
+     putFirstBatch(windowStore, startTime, context);
+
+     // Expected entries, keyed by the offset from startTime at which each is expected (there is no entry for 3).
+     final KeyValue<Windowed<Integer>, String> atZero = windowedPair(0, "zero", startTime);
+     final KeyValue<Windowed<Integer>, String> atOne = windowedPair(1, "one", startTime + 1);
+     final KeyValue<Windowed<Integer>, String> atTwo = windowedPair(2, "two", startTime + 2);
+     final KeyValue<Windowed<Integer>, String> atFour = windowedPair(4, "four", startTime + 4);
+     final KeyValue<Windowed<Integer>, String> atFive = windowedPair(5, "five", startTime + 5);
+
+     // all() must return every entry in order.
+     assertEquals(Utils.mkList(atZero, atOne, atTwo, atFour, atFive), StreamsTestUtils.toList(windowStore.all()));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldFetchAllInTimeRange() throws IOException {
+     windowStore = createWindowStore(context, false, true);
+     final long startTime = segmentSize - 4L;
+
+     putFirstBatch(windowStore, startTime, context);
+
+     // Expected entries, keyed by the offset from startTime at which each is expected (there is no entry for 3).
+     final KeyValue<Windowed<Integer>, String> atZero = windowedPair(0, "zero", startTime);
+     final KeyValue<Windowed<Integer>, String> atOne = windowedPair(1, "one", startTime + 1);
+     final KeyValue<Windowed<Integer>, String> atTwo = windowedPair(2, "two", startTime + 2);
+     final KeyValue<Windowed<Integer>, String> atFour = windowedPair(4, "four", startTime + 4);
+     final KeyValue<Windowed<Integer>, String> atFive = windowedPair(5, "five", startTime + 5);
+
+     // Offsets [1, 4]: both bounds are inclusive.
+     assertEquals(Utils.mkList(atOne, atTwo, atFour), StreamsTestUtils.toList(windowStore.fetchAll(startTime + 1, startTime + 4)));
+
+     // Offsets [0, 3]: nothing is expected at offset 3, so the result stops at two.
+     assertEquals(Utils.mkList(atZero, atOne, atTwo), StreamsTestUtils.toList(windowStore.fetchAll(startTime, startTime + 3)));
+
+     // Offsets [1, 5]: everything except the entry at offset 0.
+     assertEquals(Utils.mkList(atOne, atTwo, atFour, atFive), StreamsTestUtils.toList(windowStore.fetchAll(startTime + 1, startTime + 5)));
+ }
@SuppressWarnings("unchecked")
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
index c3f52c9..3b731f9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
@@ -66,6 +66,13 @@ public class SessionKeySchemaTest {
}
@Test
+ public void shouldFetchAllKeysUsingNullKeys() {
+     // With null key bounds and the time range [0, Long.MAX_VALUE], all six fixture values are expected.
+     final List<Integer> expected = Arrays.asList(1, 2, 3, 4, 5, 6);
+     final List<Integer> actual = getValues(sessionKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE));
+     assertThat(actual, equalTo(expected));
+ }
+
+ @Test
public void testUpperBoundWithLargeTimestamps() {
Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), Long.MAX_VALUE);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
index 9720a9f..d75cca0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
@@ -18,14 +18,44 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.test.KeyValueIteratorStub;
+import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
public class WindowKeySchemaTest {
private final WindowKeySchema windowKeySchema = new WindowKeySchema();
+ private DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator;
+
+ @Before
+ public void before() {
+     windowKeySchema.init("topic");
+
+     // NOTE(review): the fixture keys are encoded with SessionKeySerde/SessionWindow although this
+     // test targets WindowKeySchema - confirm that this is intentional.
+     final SessionWindow firstWindow = new SessionWindow(0, 0);
+     final SessionWindow secondWindow = new SessionWindow(10, 20);
+
+     // Six entries: byte keys of length 1 to 3 in each of the two windows, with values 1..6.
+     final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(
+         KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), firstWindow)), 1),
+         KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), firstWindow)), 2),
+         KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), firstWindow)), 3),
+         KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), secondWindow)), 4),
+         KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), secondWindow)), 5),
+         KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), secondWindow)), 6));
+     iterator = new DelegatingPeekingKeyValueIterator<>("foo", new KeyValueIteratorStub<>(keys.iterator()));
+ }
+
+ @Test
+ public void testHasNextConditionUsingNullKeys() {
+     // With null key bounds and the time range [0, Long.MAX_VALUE], all six fixture values are expected.
+     final List<Integer> expected = Arrays.asList(1, 2, 3, 4, 5, 6);
+     final List<Integer> actual = getValues(windowKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE));
+     assertThat(actual, equalTo(expected));
+ }
@Test
public void testUpperBoundWithLargeTimestamps() {
@@ -128,4 +158,12 @@ public class WindowKeySchemaTest {
assertThat(lower, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 0, 0)));
}
+
+ // Drains the fixture iterator for as long as the condition accepts the next entry, collecting the values.
+ private List<Integer> getValues(final HasNextCondition hasNextCondition) {
+     final List<Integer> accepted = new ArrayList<>();
+     while (true) {
+         if (!hasNextCondition.hasNext(iterator)) {
+             return accepted;
+         }
+         accepted.add(iterator.next().value);
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java b/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java
index 7f8457c..4709ef3 100644
--- a/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java
@@ -58,6 +58,18 @@ public class SegmentedBytesStoreStub implements SegmentedBytesStore {
fetchCalled = true;
return new KeyValueIteratorStub<>(Collections.<KeyValue<Bytes, byte[]>>emptyIterator());
}
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> all() {
+     // Record the call in fetchCalled, then return an iterator with no entries.
+     fetchCalled = true;
+     final KeyValueIterator<Bytes, byte[]> emptyResult =
+         new KeyValueIteratorStub<>(Collections.<KeyValue<Bytes, byte[]>>emptyIterator());
+     return emptyResult;
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> fetchAll(long timeFrom, long timeTo) {
+     // Record the call in fetchCalled, then return an iterator with no entries; the time bounds are ignored.
+     fetchCalled = true;
+     final KeyValueIterator<Bytes, byte[]> emptyResult =
+         new KeyValueIteratorStub<>(Collections.<KeyValue<Bytes, byte[]>>emptyIterator());
+     return emptyResult;
+ }
@Override
public void remove(final Bytes key) {