You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2021/09/22 16:16:53 UTC
[kafka] branch trunk updated: KAFKA-13211: add support for infinite range query for WindowStore (#11227)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b61ec00 KAFKA-13211: add support for infinite range query for WindowStore (#11227)
b61ec00 is described below
commit b61ec0003f907b61243102811fdb2e92f8d7d2c5
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Thu Sep 23 00:14:58 2021 +0800
KAFKA-13211: add support for infinite range query for WindowStore (#11227)
Add support for infinite range query for WindowStore. Story JIRA: https://issues.apache.org/jira/browse/KAFKA-13210
Reviewers: Patrick Stuedi <ps...@gmail.com>, Guozhang Wang <wa...@gmail.com>
---
.../kafka/streams/state/ReadOnlyWindowStore.java | 6 +-
.../apache/kafka/streams/state/WindowStore.java | 3 +-
.../AbstractRocksDBSegmentedBytesStore.java | 6 +-
.../state/internals/CachingWindowStore.java | 20 +--
.../internals/CompositeReadOnlyWindowStore.java | 4 -
.../state/internals/InMemoryWindowStore.java | 40 +++--
.../state/internals/MeteredWindowStore.java | 16 +-
.../streams/state/internals/SegmentIterator.java | 14 +-
.../AbstractRocksDBSegmentedBytesStoreTest.java | 136 ++++++++++++++++-
.../internals/AbstractWindowBytesStoreTest.java | 58 +++++--
.../CachingPersistentWindowStoreTest.java | 167 ++++++++++++++++++---
.../CompositeReadOnlyWindowStoreTest.java | 103 +++++++++++--
.../state/internals/MeteredWindowStoreTest.java | 39 +++--
.../state/internals/ReadOnlyWindowStoreStub.java | 32 +++-
.../state/internals/SegmentIteratorTest.java | 145 ++++++++++++++++++
.../state/internals/WindowStoreFetchTest.java | 81 ++++++++++
.../org/apache/kafka/test/StreamsTestUtils.java | 20 +++
17 files changed, 773 insertions(+), 117 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
index aa84bfc..3df170d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
@@ -128,12 +128,13 @@ public interface ReadOnlyWindowStore<K, V> {
* This iterator must be closed after use.
*
* @param keyFrom the first key in the range
+ * A null value indicates a starting position from the first element in the store.
* @param keyTo the last key in the range
+ * A null value indicates that the range ends with the last element in the store.
* @param timeFrom time range start (inclusive), where iteration starts.
* @param timeTo time range end (inclusive), where iteration ends.
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from beginning to end of time.
* @throws InvalidStateStoreException if the store is not initialized
- * @throws NullPointerException if {@code null} is used for any key.
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
*/
KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo, Instant timeFrom, Instant timeTo)
@@ -146,12 +147,13 @@ public interface ReadOnlyWindowStore<K, V> {
* This iterator must be closed after use.
*
* @param keyFrom the first key in the range
+ * A null value indicates a starting position from the first element in the store.
* @param keyTo the last key in the range
+ * A null value indicates that the range ends with the last element in the store.
* @param timeFrom time range start (inclusive), where iteration ends.
* @param timeTo time range end (inclusive), where iteration starts.
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from end to beginning of time.
* @throws InvalidStateStoreException if the store is not initialized
- * @throws NullPointerException if {@code null} is used for any key.
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
*/
default KeyValueIterator<Windowed<K>, V> backwardFetch(K keyFrom, K keyTo, Instant timeFrom, Instant timeTo)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index 31c6eb1..86c82fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -117,12 +117,13 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
* This iterator must be closed after use.
*
* @param keyFrom the first key in the range
+ * A null value indicates a starting position from the first element in the store.
* @param keyTo the last key in the range
+ * A null value indicates that the range ends with the last element in the store.
* @param timeFrom time range start (inclusive)
* @param timeTo time range end (inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @throws InvalidStateStoreException if the store is not initialized
- * @throws NullPointerException if one of the given keys is {@code null}
*/
// WindowStore keeps a long-based implementation of ReadOnlyWindowStore#fetch Instant-based
// if super#fetch is removed, keep this implementation as it serves PAPI Stores.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index f7aef11..bfee6b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -114,7 +114,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
final long from,
final long to,
final boolean forward) {
- if (keyFrom.compareTo(keyTo) > 0) {
+ if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
"This may be due to range arguments set in the wrong order, " +
"or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
@@ -124,8 +124,8 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);
- final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from);
- final Bytes binaryTo = keySchema.upperRange(keyTo, to);
+ final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from);
+ final Bytes binaryTo = keyTo == null ? null : keySchema.upperRange(keyTo, to);
return new SegmentIterator<>(
searchSpace.iterator(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 59604a5..fa04ac8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -244,7 +244,7 @@ class CachingWindowStore
final Bytes keyTo,
final long timeFrom,
final long timeTo) {
- if (keyFrom.compareTo(keyTo) > 0) {
+ if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
"This may be due to range arguments set in the wrong order, " +
"or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
@@ -266,8 +266,8 @@ class CachingWindowStore
new CacheIteratorWrapper(keyFrom, keyTo, timeFrom, timeTo, true) :
context.cache().range(
cacheName,
- cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)),
- cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo))
+ keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)),
+ keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo))
);
final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, keyTo, timeFrom, timeTo);
@@ -289,7 +289,7 @@ class CachingWindowStore
final Bytes keyTo,
final long timeFrom,
final long timeTo) {
- if (keyFrom.compareTo(keyTo) > 0) {
+ if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
"Note that the built-in numerical serdes do not follow this for negative numbers");
@@ -310,8 +310,8 @@ class CachingWindowStore
new CacheIteratorWrapper(keyFrom, keyTo, timeFrom, timeTo, false) :
context.cache().reverseRange(
cacheName,
- cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)),
- cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo))
+ keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)),
+ keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo))
);
final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, keyTo, timeFrom, timeTo);
@@ -573,12 +573,14 @@ class CachingWindowStore
throw new IllegalStateException("Error iterating over segments: segment interval has changed");
}
- if (keyFrom.equals(keyTo)) {
+ if (keyFrom != null && keyTo != null && keyFrom.equals(keyTo)) {
cacheKeyFrom = cacheFunction.cacheKey(segmentLowerRangeFixedSize(keyFrom, lowerRangeEndTime));
cacheKeyTo = cacheFunction.cacheKey(segmentUpperRangeFixedSize(keyTo, upperRangeEndTime));
} else {
- cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId);
- cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo), currentSegmentId);
+ cacheKeyFrom = keyFrom == null ? null :
+ cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId);
+ cacheKeyTo = keyTo == null ? null :
+ cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo), currentSegmentId);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index 4227855..c6b0b60 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -115,8 +115,6 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
final K keyTo,
final Instant timeFrom,
final Instant timeTo) {
- Objects.requireNonNull(keyFrom, "keyFrom can't be null");
- Objects.requireNonNull(keyTo, "keyTo can't be null");
final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction =
store -> store.fetch(keyFrom, keyTo, timeFrom, timeTo);
return new DelegatingPeekingKeyValueIterator<>(
@@ -131,8 +129,6 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
final K keyTo,
final Instant timeFrom,
final Instant timeTo) throws IllegalArgumentException {
- Objects.requireNonNull(keyFrom, "keyFrom can't be null");
- Objects.requireNonNull(keyTo, "keyTo can't be null");
final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction =
store -> store.backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
return new DelegatingPeekingKeyValueIterator<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index ae37542..5327e75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -213,12 +213,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
final long timeFrom,
final long timeTo,
final boolean forward) {
- Objects.requireNonNull(from, "from key cannot be null");
- Objects.requireNonNull(to, "to key cannot be null");
-
removeExpiredSegments();
- if (from.compareTo(to) > 0) {
+ if (from != null && to != null && from.compareTo(to) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
"This may be due to range arguments set in the wrong order, " +
"or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
@@ -397,7 +394,8 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
final Bytes to = (retainDuplicates && keyTo != null) ? wrapForDups(keyTo, Integer.MAX_VALUE) : keyTo;
final WrappedWindowedKeyValueIterator iterator =
- new WrappedWindowedKeyValueIterator(from,
+ new WrappedWindowedKeyValueIterator(
+ from,
to,
segmentIterator,
openIterators::remove,
@@ -462,7 +460,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
}
final Bytes key = getKey(next.key);
- if (key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0) {
+ if (isKeyWithinRange(key)) {
return true;
} else {
next = null;
@@ -470,6 +468,23 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
}
}
+ private boolean isKeyWithinRange(final Bytes key) {
+ // split all cases for readability and avoid BooleanExpressionComplexity checkstyle warning
+ if (keyFrom == null && keyTo == null) {
+ // fetch all
+ return true;
+ } else if (keyFrom == null) {
+ // start from the beginning
+ return key.compareTo(getKey(keyTo)) <= 0;
+ } else if (keyTo == null) {
+ // end to the last
+ return key.compareTo(getKey(keyFrom)) >= 0;
+ } else {
+ // key is within the range
+ return key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0;
+ }
+ }
+
public void close() {
next = null;
recordIterator = null;
@@ -499,9 +514,16 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
final Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>> currentSegment = segmentIterator.next();
currentTime = currentSegment.getKey();
- final ConcurrentNavigableMap<Bytes, byte[]> subMap = allKeys ?
- currentSegment.getValue() :
- currentSegment.getValue().subMap(keyFrom, true, keyTo, true);
+ final ConcurrentNavigableMap<Bytes, byte[]> subMap;
+ if (allKeys) { // keyFrom == null && keyTo == null
+ subMap = currentSegment.getValue();
+ } else if (keyFrom == null) {
+ subMap = currentSegment.getValue().headMap(keyTo, true);
+ } else if (keyTo == null) {
+ subMap = currentSegment.getValue().tailMap(keyFrom, true);
+ } else {
+ subMap = currentSegment.getValue().subMap(keyFrom, true, keyTo, true);
+ }
if (forward) {
return subMap.entrySet().iterator();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 24c17f2..48b42fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -245,10 +245,12 @@ public class MeteredWindowStore<K, V>
final K keyTo,
final long timeFrom,
final long timeTo) {
- Objects.requireNonNull(keyFrom, "keyFrom cannot be null");
- Objects.requireNonNull(keyTo, "keyTo cannot be null");
return new MeteredWindowedKeyValueIterator<>(
- wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo), timeFrom, timeTo),
+ wrapped().fetch(
+ keyFrom == null ? null : keyBytes(keyFrom),
+ keyTo == null ? null : keyBytes(keyTo),
+ timeFrom,
+ timeTo),
fetchSensor,
streamsMetrics,
serdes,
@@ -260,10 +262,12 @@ public class MeteredWindowStore<K, V>
final K keyTo,
final long timeFrom,
final long timeTo) {
- Objects.requireNonNull(keyFrom, "keyFrom cannot be null");
- Objects.requireNonNull(keyTo, "keyTo cannot be null");
return new MeteredWindowedKeyValueIterator<>(
- wrapped().backwardFetch(keyBytes(keyFrom), keyBytes(keyTo), timeFrom, timeTo),
+ wrapped().backwardFetch(
+ keyFrom == null ? null : keyBytes(keyFrom),
+ keyTo == null ? null : keyBytes(keyTo),
+ timeFrom,
+ timeTo),
fetchSensor,
streamsMetrics,
serdes,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
index 03b66a6..6191c49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
@@ -74,18 +74,10 @@ class SegmentIterator<S extends Segment> implements KeyValueIterator<Bytes, byte
close();
currentSegment = segments.next();
try {
- if (from == null || to == null) {
- if (forward) {
- currentIterator = currentSegment.all();
- } else {
- currentIterator = currentSegment.reverseAll();
- }
+ if (forward) {
+ currentIterator = currentSegment.range(from, to);
} else {
- if (forward) {
- currentIterator = currentSegment.range(from, to);
- } else {
- currentIterator = currentSegment.reverseRange(from, to);
- }
+ currentIterator = currentSegment.reverseRange(from, to);
}
} catch (final InvalidStateStoreException e) {
// segment may have been closed so we ignore it.
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 514bb79..0640a46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -145,16 +145,138 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
@Test
public void shouldPutAndFetch() {
- final String key = "a";
- bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(10));
- bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50));
- bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100));
+ final String keyA = "a";
+ final String keyB = "b";
+ final String keyC = "c";
+ bytesStore.put(serializeKey(new Windowed<>(keyA, windows[0])), serializeValue(10));
+ bytesStore.put(serializeKey(new Windowed<>(keyA, windows[1])), serializeValue(50));
+ bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), serializeValue(100));
+ bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), serializeValue(200));
- try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 500)) {
+ try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
+ Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
- KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
+ KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
+ KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L)
+ );
+
+ assertEquals(expected, toList(values));
+ }
+
+ try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
+ Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
+
+ final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+ KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
+ KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
+ KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
+ );
+
+ assertEquals(expected, toList(values));
+ }
+
+ try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
+ null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
+
+ final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+ KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
+ KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
+ KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
+ );
+
+ assertEquals(expected, toList(values));
+ }
+
+ try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
+ Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
+
+ final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+ KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+ KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
+ );
+
+ assertEquals(expected, toList(values));
+ }
+
+ try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
+ null, null, 0, windows[3].start())) {
+
+ final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+ KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
+ KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
+ KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+ KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
+ );
+
+ assertEquals(expected, toList(values));
+ }
+ }
+
+ @Test
+ public void shouldPutAndBackwardFetch() {
+ final String keyA = "a";
+ final String keyB = "b";
+ final String keyC = "c";
+ bytesStore.put(serializeKey(new Windowed<>(keyA, windows[0])), serializeValue(10));
+ bytesStore.put(serializeKey(new Windowed<>(keyA, windows[1])), serializeValue(50));
+ bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), serializeValue(100));
+ bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), serializeValue(200));
+
+ try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
+ Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
+
+ final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+ KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
+ KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
+ );
+
+ assertEquals(expected, toList(values));
+ }
+
+ try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
+ Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
+
+ final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+ KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+ KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
+ KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
+ );
+
+ assertEquals(expected, toList(values));
+ }
+
+ try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
+ null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
+
+ final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+ KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+ KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
+ KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
+ );
+
+ assertEquals(expected, toList(values));
+ }
+
+ try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
+ Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
+
+ final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+ KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
+ KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
+ );
+
+ assertEquals(expected, toList(values));
+ }
+
+ try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
+ null, null, 0, windows[3].start())) {
+
+ final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+ KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
+ KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+ KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
+ KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
);
assertEquals(expected, toList(values));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index 153db97..e93f758 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -436,6 +436,30 @@ public abstract class AbstractWindowBytesStoreTest {
ofEpochMilli(defaultStartTime + 3L),
ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5)))
);
+ assertEquals(
+ asList(zero, one, two),
+ toList(windowStore.fetch(
+ null,
+ 2,
+ ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + WINDOW_SIZE + 2L)))
+ );
+ assertEquals(
+ asList(two, three, four, five),
+ toList(windowStore.fetch(
+ 2,
+ null,
+ ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5L)))
+ );
+ assertEquals(
+ asList(zero, one, two, three, four, five),
+ toList(windowStore.fetch(
+ null,
+ null,
+ ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5L)))
+ );
}
@Test
@@ -506,6 +530,30 @@ public abstract class AbstractWindowBytesStoreTest {
ofEpochMilli(defaultStartTime + 3L),
ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5)))
);
+ assertEquals(
+ asList(two, one, zero),
+ toList(windowStore.backwardFetch(
+ null,
+ 2,
+ ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + WINDOW_SIZE + 2L)))
+ );
+ assertEquals(
+ asList(five, four, three, two),
+ toList(windowStore.backwardFetch(
+ 2,
+ null,
+ ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5L)))
+ );
+ assertEquals(
+ asList(five, four, three, two, one, zero),
+ toList(windowStore.backwardFetch(
+ null,
+ null,
+ ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5L)))
+ );
}
@Test
@@ -861,16 +909,6 @@ public abstract class AbstractWindowBytesStoreTest {
}
@Test
- public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
- assertThrows(NullPointerException.class, () -> windowStore.fetch(null, 2, ofEpochMilli(1L), ofEpochMilli(2L)));
- }
-
- @Test
- public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
- assertThrows(NullPointerException.class, () -> windowStore.fetch(1, null, ofEpochMilli(1L), ofEpochMilli(2L)));
- }
-
- @Test
public void shouldFetchAndIterateOverExactBinaryKeys() {
final WindowStore<Bytes, String> windowStore = buildWindowStore(RETENTION_PERIOD,
WINDOW_SIZE,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
index 023d69a..2d64a44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
@@ -54,6 +54,7 @@ import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
+import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
@@ -64,6 +65,7 @@ import static java.time.Instant.ofEpochMilli;
import static java.util.Arrays.asList;
import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static org.apache.kafka.test.StreamsTestUtils.verifyAllWindowedKeyValues;
import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -283,21 +285,158 @@ public class CachingPersistentWindowStoreTest {
cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
- cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(10), ofEpochMilli(10))) {
- verifyWindowedKeyValue(
- iterator.next(),
+ cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP))) {
+ final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
- "a");
- verifyWindowedKeyValue(
- iterator.next(),
- new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
- "b");
- assertFalse(iterator.hasNext());
+ new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE))
+ );
+
+ final List<String> expectedValues = Arrays.asList("a", "b");
+
+ verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
assertEquals(2, cache.size());
}
}
@Test
+ public void shouldPutFetchRangeFromCacheForNullKeyFrom() {
+ cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
+ cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
+ cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);
+
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+ cachingStore.fetch(null, bytesKey("d"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+ final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
+ new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE))
+ );
+
+ final List<String> expectedValues = Arrays.asList("a", "b", "c", "d");
+
+ verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
+ }
+ }
+
+ @Test
+ public void shouldPutFetchRangeFromCacheForNullKeyTo() {
+ cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
+ cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
+ cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);
+
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+ cachingStore.fetch(bytesKey("b"), null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+ final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
+ new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE))
+ );
+
+ final List<String> expectedValues = Arrays.asList("b", "c", "d", "e");
+
+ verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
+ }
+ }
+
+ @Test
+ public void shouldPutFetchRangeFromCacheForNullKeyFromKeyTo() {
+ cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
+ cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
+ cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);
+
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+ cachingStore.fetch(null, null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+ final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
+ new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE))
+ );
+
+ final List<String> expectedValues = Arrays.asList("a", "b", "c", "d", "e");
+
+ verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
+ }
+ }
+
+ @Test
+ public void shouldPutBackwardFetchRangeFromCacheForNullKeyFrom() {
+ cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
+ cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
+ cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);
+
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+ cachingStore.backwardFetch(null, bytesKey("c"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+ final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
+ new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE))
+ );
+
+ final List<String> expectedValues = Arrays.asList("c", "b", "a");
+
+ verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
+ }
+ }
+
+ @Test
+ public void shouldPutBackwardFetchRangeFromCacheForNullKeyTo() {
+ cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
+ cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
+ cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);
+
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+ cachingStore.backwardFetch(bytesKey("c"), null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+ final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
+ new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE))
+ );
+
+ final List<String> expectedValues = Arrays.asList("e", "d", "c");
+
+ verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
+ }
+ }
+
+ @Test
+ public void shouldPutBackwardFetchRangeFromCacheForNullKeyFromKeyTo() {
+ cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
+ cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
+ cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);
+
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+ cachingStore.backwardFetch(null, null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+ final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
+ new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+ new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE))
+ );
+
+ final List<String> expectedValues = Arrays.asList("e", "d", "c", "b", "a");
+
+ verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
+ }
+ }
+
+ @Test
public void shouldGetAllFromCache() {
cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
@@ -814,16 +953,6 @@ public class CachingPersistentWindowStoreTest {
}
@Test
- public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
- assertThrows(NullPointerException.class, () -> cachingStore.fetch(null, bytesKey("anyTo"), ofEpochMilli(1L), ofEpochMilli(2L)));
- }
-
- @Test
- public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
- assertThrows(NullPointerException.class, () -> cachingStore.fetch(bytesKey("anyFrom"), null, ofEpochMilli(1L), ofEpochMilli(2L)));
- }
-
- @Test
public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index 73439b1..db15276 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -86,6 +86,7 @@ public class CompositeReadOnlyWindowStoreTest {
);
}
+
@Test
public void shouldBackwardFetchValuesFromWindowStore() {
underlyingWindowStore.put("my-key", "my-value", 0L);
@@ -354,6 +355,97 @@ public class CompositeReadOnlyWindowStoreTest {
}
@Test
+ public void shouldFetchKeyRangeAcrossStoresWithNullKeyTo() {
+ final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+ stubProviderTwo.addStore(storeName, secondUnderlying);
+ underlyingWindowStore.put("a", "a", 0L);
+ secondUnderlying.put("b", "b", 10L);
+ secondUnderlying.put("c", "c", 10L);
+ final List<KeyValue<Windowed<String>, String>> results =
+ StreamsTestUtils.toList(windowStore.fetch("a", null, ofEpochMilli(0), ofEpochMilli(10)));
+ assertThat(results, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+ KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"),
+ KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"))));
+ }
+
+ @Test
+ public void shouldFetchKeyRangeAcrossStoresWithNullKeyFrom() {
+ final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+ stubProviderTwo.addStore(storeName, secondUnderlying);
+ underlyingWindowStore.put("a", "a", 0L);
+ secondUnderlying.put("b", "b", 10L);
+ secondUnderlying.put("c", "c", 10L);
+ final List<KeyValue<Windowed<String>, String>> results =
+ StreamsTestUtils.toList(windowStore.fetch(null, "c", ofEpochMilli(0), ofEpochMilli(10)));
+ assertThat(results, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+ KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"),
+ KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"))));
+ }
+
+ @Test
+ public void shouldFetchKeyRangeAcrossStoresWithNullKeyFromKeyTo() {
+ final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+ stubProviderTwo.addStore(storeName, secondUnderlying);
+ underlyingWindowStore.put("a", "a", 0L);
+ secondUnderlying.put("b", "b", 10L);
+ secondUnderlying.put("c", "c", 10L);
+ final List<KeyValue<Windowed<String>, String>> results =
+ StreamsTestUtils.toList(windowStore.fetch(null, null, ofEpochMilli(0), ofEpochMilli(10)));
+ assertThat(results, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+ KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"),
+ KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"))));
+ }
+
+ @Test
+ public void shouldBackwardFetchKeyRangeAcrossStoresWithNullKeyTo() {
+ final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+ stubProviderTwo.addStore(storeName, secondUnderlying);
+ underlyingWindowStore.put("a", "a", 0L);
+ secondUnderlying.put("b", "b", 10L);
+ secondUnderlying.put("c", "c", 10L);
+ final List<KeyValue<Windowed<String>, String>> results =
+ StreamsTestUtils.toList(windowStore.backwardFetch("a", null, ofEpochMilli(0), ofEpochMilli(10)));
+ assertThat(results, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+ KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"),
+ KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"))));
+ }
+
+ @Test
+ public void shouldBackwardFetchKeyRangeAcrossStoresWithNullKeyFrom() {
+ final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+ stubProviderTwo.addStore(storeName, secondUnderlying);
+ underlyingWindowStore.put("a", "a", 0L);
+ secondUnderlying.put("b", "b", 10L);
+ secondUnderlying.put("c", "c", 10L);
+ final List<KeyValue<Windowed<String>, String>> results =
+ StreamsTestUtils.toList(windowStore.backwardFetch(null, "c", ofEpochMilli(0), ofEpochMilli(10)));
+ assertThat(results, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+ KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"),
+ KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b")
+ )));
+ }
+
+ @Test
+ public void shouldBackwardFetchKeyRangeAcrossStoresWithNullKeyFromKeyTo() {
+ final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+ stubProviderTwo.addStore(storeName, secondUnderlying);
+ underlyingWindowStore.put("a", "a", 0L);
+ secondUnderlying.put("b", "b", 10L);
+ secondUnderlying.put("c", "c", 10L);
+ final List<KeyValue<Windowed<String>, String>> results =
+ StreamsTestUtils.toList(windowStore.backwardFetch(null, null, ofEpochMilli(0), ofEpochMilli(10)));
+ assertThat(results, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+ KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"),
+ KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"))));
+ }
+
+ @Test
public void shouldBackwardFetchKeyRangeAcrossStores() {
final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
stubProviderTwo.addStore(storeName, secondUnderlying);
@@ -436,15 +528,4 @@ public class CompositeReadOnlyWindowStoreTest {
public void shouldThrowNPEIfKeyIsNull() {
assertThrows(NullPointerException.class, () -> windowStore.fetch(null, ofEpochMilli(0), ofEpochMilli(0)));
}
-
- @Test
- public void shouldThrowNPEIfFromKeyIsNull() {
- assertThrows(NullPointerException.class, () -> windowStore.fetch(null, "a", ofEpochMilli(0), ofEpochMilli(0)));
- }
-
- @Test
- public void shouldThrowNPEIfToKeyIsNull() {
- assertThrows(NullPointerException.class, () -> windowStore.fetch("a", null, ofEpochMilli(0), ofEpochMilli(0)));
- }
-
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 649e159..ca6a518 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -274,10 +274,19 @@ public class MeteredWindowStoreTest {
public void shouldFetchRangeFromInnerStoreAndRecordFetchMetrics() {
expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1))
.andReturn(KeyValueIterators.emptyIterator());
+ expect(innerStoreMock.fetch(null, Bytes.wrap("b".getBytes()), 1, 1))
+ .andReturn(KeyValueIterators.emptyIterator());
+ expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), null, 1, 1))
+ .andReturn(KeyValueIterators.emptyIterator());
+ expect(innerStoreMock.fetch(null, null, 1, 1))
+ .andReturn(KeyValueIterators.emptyIterator());
replay(innerStoreMock);
store.init((StateStoreContext) context, store);
store.fetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
+ store.fetch(null, "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
+ store.fetch("a", null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
+ store.fetch(null, null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
// it suffices to verify one fetch metric since all fetch metrics are recorded by the same sensor
// and the sensor is tested elsewhere
@@ -306,10 +315,19 @@ public class MeteredWindowStoreTest {
public void shouldBackwardFetchRangeFromInnerStoreAndRecordFetchMetrics() {
expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1))
.andReturn(KeyValueIterators.emptyIterator());
+ expect(innerStoreMock.backwardFetch(null, Bytes.wrap("b".getBytes()), 1, 1))
+ .andReturn(KeyValueIterators.emptyIterator());
+ expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), null, 1, 1))
+ .andReturn(KeyValueIterators.emptyIterator());
+ expect(innerStoreMock.backwardFetch(null, null, 1, 1))
+ .andReturn(KeyValueIterators.emptyIterator());
replay(innerStoreMock);
store.init((StateStoreContext) context, store);
store.backwardFetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
+ store.backwardFetch(null, "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
+ store.backwardFetch("a", null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
+ store.backwardFetch(null, null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
// it suffices to verify one fetch metric since all fetch metrics are recorded by the same sensor
// and the sensor is tested elsewhere
@@ -454,27 +472,6 @@ public class MeteredWindowStoreTest {
assertThrows(NullPointerException.class, () -> store.backwardFetch(null, 0L, 1L));
}
- @Test
- public void shouldThrowNullPointerOnFetchRangeIfFromIsNull() {
- assertThrows(NullPointerException.class, () -> store.fetch(null, "to", 0L, 1L));
- }
-
- @Test
- public void shouldThrowNullPointerOnFetchRangeIfToIsNull() {
- assertThrows(NullPointerException.class, () -> store.fetch("from", null, 0L, 1L));
- }
-
-
- @Test
- public void shouldThrowNullPointerOnbackwardFetchRangeIfFromIsNull() {
- assertThrows(NullPointerException.class, () -> store.backwardFetch(null, "to", 0L, 1L));
- }
-
- @Test
- public void shouldThrowNullPointerOnbackwardFetchRangeIfToIsNull() {
- assertThrows(NullPointerException.class, () -> store.backwardFetch("from", null, 0L, 1L));
- }
-
private KafkaMetric metric(final String name) {
return metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", tags));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index 79d2229..752334d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -266,7 +266,19 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
for (long now = timeFrom.toEpochMilli(); now <= timeTo.toEpochMilli(); now++) {
final NavigableMap<K, V> kvMap = data.get(now);
if (kvMap != null) {
- for (final Entry<K, V> entry : kvMap.subMap(keyFrom, true, keyTo, true).entrySet()) {
+ final NavigableMap<K, V> kvSubMap; // view of kvMap restricted to [keyFrom, keyTo]; a null bound means "open"
+ if (keyFrom == null && keyTo == null) { // both bounds open: every key qualifies
+ kvSubMap = kvMap;
+ } else if (keyFrom == null) { // only the upper bound is set
+ kvSubMap = kvMap.headMap(keyTo, true);
+ } else if (keyTo == null) { // only the lower bound is set
+ kvSubMap = kvMap.tailMap(keyFrom, true);
+ } else {
+ // keyFrom != null and keyTo != null
+ kvSubMap = kvMap.subMap(keyFrom, true, keyTo, true);
+ }
+
+ for (final Entry<K, V> entry : kvSubMap.entrySet()) {
results.add(new KeyValue<>(new Windowed<>(entry.getKey(), new TimeWindow(now, now + windowSize)), entry.getValue()));
}
}
@@ -297,8 +309,8 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
}
@Override
- public KeyValueIterator<Windowed<K>, V> backwardFetch(final K from,
- final K to,
+ public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
+ final K keyTo,
final Instant timeFrom,
final Instant timeTo) throws IllegalArgumentException {
final long timeFromTs = ApiUtils.validateMillisecondInstant(timeFrom, prepareMillisCheckFailMsgPrefix(timeFrom, "timeFrom"));
@@ -310,7 +322,19 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
for (long now = timeToTs; now >= timeFromTs; now--) {
final NavigableMap<K, V> kvMap = data.get(now);
if (kvMap != null) {
- for (final Entry<K, V> entry : kvMap.subMap(from, true, to, true).descendingMap().entrySet()) {
+ final NavigableMap<K, V> kvSubMap; // view of kvMap restricted to [keyFrom, keyTo]; a null bound means "open"
+ if (keyFrom == null && keyTo == null) { // both bounds open: every key qualifies
+ kvSubMap = kvMap;
+ } else if (keyFrom == null) { // only the upper bound is set
+ kvSubMap = kvMap.headMap(keyTo, true);
+ } else if (keyTo == null) { // only the lower bound is set
+ kvSubMap = kvMap.tailMap(keyFrom, true);
+ } else {
+ // keyFrom != null and keyTo != null
+ kvSubMap = kvMap.subMap(keyFrom, true, keyTo, true);
+ }
+
+ for (final Entry<K, V> entry : kvSubMap.descendingMap().entrySet()) {
results.add(new KeyValue<>(new Windowed<>(entry.getKey(), new TimeWindow(now, now + windowSize)), entry.getValue()));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index c7e5924..31ce3c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -112,6 +112,34 @@ public class SegmentIteratorTest {
}
@Test
+ public void shouldIterateOverAllSegmentsWhenNullKeyFromKeyTo() {
+ iterator = new SegmentIterator<>(
+ Arrays.asList(segmentOne, segmentTwo).iterator(),
+ hasNextCondition,
+ null,
+ null,
+ true);
+
+ assertTrue(iterator.hasNext());
+ assertEquals("a", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next()));
+
+ assertTrue(iterator.hasNext());
+ assertEquals("b", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next()));
+
+ assertTrue(iterator.hasNext());
+ assertEquals("c", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next()));
+
+ assertTrue(iterator.hasNext());
+ assertEquals("d", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next()));
+
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
public void shouldIterateBackwardOverAllSegments() {
iterator = new SegmentIterator<>(
Arrays.asList(segmentTwo, segmentOne).iterator(), //store should pass the segments in the right order
@@ -140,6 +168,34 @@ public class SegmentIteratorTest {
}
@Test
+ public void shouldIterateBackwardOverAllSegmentsWhenNullKeyFromKeyTo() {
+ iterator = new SegmentIterator<>(
+ Arrays.asList(segmentTwo, segmentOne).iterator(), //store should pass the segments in the right order
+ hasNextCondition,
+ null,
+ null,
+ false);
+
+ assertTrue(iterator.hasNext());
+ assertEquals("d", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next()));
+
+ assertTrue(iterator.hasNext());
+ assertEquals("c", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next()));
+
+ assertTrue(iterator.hasNext());
+ assertEquals("b", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next()));
+
+ assertTrue(iterator.hasNext());
+ assertEquals("a", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next()));
+
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
public void shouldNotThrowExceptionOnHasNextWhenStoreClosed() {
iterator = new SegmentIterator<>(
Collections.singletonList(segmentOne).iterator(),
@@ -174,6 +230,47 @@ public class SegmentIteratorTest {
}
@Test
+ public void shouldOnlyIterateOverSegmentsInBackwardRangeWhenNullKeyFrom() {
+ iterator = new SegmentIterator<>(
+ Arrays.asList(segmentOne, segmentTwo).iterator(),
+ hasNextCondition,
+ null,
+ Bytes.wrap("b".getBytes()),
+ false);
+
+ assertTrue(iterator.hasNext());
+ assertEquals("b", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next()));
+
+
+ assertTrue(iterator.hasNext());
+ assertEquals("a", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next()));
+
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void shouldOnlyIterateOverSegmentsInBackwardRangeWhenNullKeyTo() {
+ iterator = new SegmentIterator<>(
+ Arrays.asList(segmentOne, segmentTwo).iterator(),
+ hasNextCondition,
+ Bytes.wrap("c".getBytes()),
+ null,
+ false);
+
+ assertTrue(iterator.hasNext());
+ assertEquals("d", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next()));
+
+ assertTrue(iterator.hasNext());
+ assertEquals("c", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next()));
+
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
public void shouldOnlyIterateOverSegmentsInRange() {
iterator = new SegmentIterator<>(
Arrays.asList(segmentOne, segmentTwo).iterator(),
@@ -194,6 +291,54 @@ public class SegmentIteratorTest {
}
@Test
+ public void shouldOnlyIterateOverSegmentsInRangeWhenNullKeyFrom() {
+ iterator = new SegmentIterator<>(
+ Arrays.asList(segmentOne, segmentTwo).iterator(),
+ hasNextCondition,
+ null,
+ Bytes.wrap("c".getBytes()),
+ true);
+
+ assertTrue(iterator.hasNext());
+ assertEquals("a", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next()));
+
+ assertTrue(iterator.hasNext());
+ assertEquals("b", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next()));
+
+ assertTrue(iterator.hasNext());
+ assertEquals("c", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next()));
+
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void shouldOnlyIterateOverSegmentsInRangeWhenNullKeyTo() {
+ iterator = new SegmentIterator<>(
+ Arrays.asList(segmentOne, segmentTwo).iterator(),
+ hasNextCondition,
+ Bytes.wrap("b".getBytes()),
+ null,
+ true);
+
+ assertTrue(iterator.hasNext());
+ assertEquals("b", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next()));
+
+ assertTrue(iterator.hasNext());
+ assertEquals("c", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next()));
+
+ assertTrue(iterator.hasNext());
+ assertEquals("d", new String(iterator.peekNextKey().get()));
+ assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next()));
+
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
public void shouldThrowNoSuchElementOnPeekNextKeyIfNoNext() {
iterator = new SegmentIterator<>(
Arrays.asList(segmentOne, segmentTwo).iterator(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java
index 833ab6a..a429256 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@@ -39,6 +40,7 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.TestUtils;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -54,12 +56,15 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
@RunWith(Parameterized.class)
public class WindowStoreFetchTest {
@@ -77,6 +82,14 @@ public class WindowStoreFetchTest {
private LinkedList<KeyValue<Windowed<String>, Long>> expectedRecords;
private LinkedList<KeyValue<String, String>> records;
private Properties streamsConfig;
+ private String low;
+ private String high;
+ private String middle;
+ private String innerLow;
+ private String innerHigh;
+ private String innerLowBetween;
+ private String innerHighBetween;
+ private String storeName;
private TimeWindowedKStream<String, String> windowedStream;
@@ -98,7 +111,31 @@ public class WindowStoreFetchTest {
// expected the count of each key is 2
final long windowStartTime = i < m ? 0 : WINDOW_SIZE;
expectedRecords.add(new KeyValue<>(new Windowed<>(key, new TimeWindow(windowStartTime, windowStartTime + WINDOW_SIZE)), 2L));
+ high = key;
+ if (low == null) {
+ low = key;
+ }
+ if (i == m) {
+ middle = key;
+ }
+ if (i == 1) {
+ innerLow = key;
+ final int index = i * 2 - 1;
+ innerLowBetween = "key-" + index;
+ }
+ if (i == DATA_SIZE - 2) {
+ innerHigh = key;
+ final int index = i * 2 + 1;
+ innerHighBetween = "key-" + index;
+ }
}
+ Assert.assertNotNull(low);
+ Assert.assertNotNull(high);
+ Assert.assertNotNull(middle);
+ Assert.assertNotNull(innerLow);
+ Assert.assertNotNull(innerHigh);
+ Assert.assertNotNull(innerLowBetween);
+ Assert.assertNotNull(innerHighBetween);
}
@Rule
@@ -161,6 +198,50 @@ public class WindowStoreFetchTest {
TestUtils.checkEquals(scanIterator, dataIterator);
}
+
+ try (final KeyValueIterator<Windowed<String>, Long> scanIterator = forward ?
+ stateStore.fetch(null, null, 0, Long.MAX_VALUE) :
+ stateStore.backwardFetch(null, null, 0, Long.MAX_VALUE)) {
+
+ final Iterator<KeyValue<Windowed<String>, Long>> dataIterator = forward ?
+ expectedRecords.iterator() :
+ expectedRecords.descendingIterator();
+
+ TestUtils.checkEquals(scanIterator, dataIterator);
+ }
+
+ testRange("range", stateStore, innerLow, innerHigh, forward);
+ testRange("until", stateStore, null, middle, forward);
+ testRange("from", stateStore, middle, null, forward);
+
+ testRange("untilBetween", stateStore, null, innerHighBetween, forward);
+ testRange("fromBetween", stateStore, innerLowBetween, null, forward);
+ }
+ }
+
+ private List<KeyValue<Windowed<String>, Long>> filterList(final KeyValueIterator<Windowed<String>, Long> iterator, final String from, final String to) {
+ final Predicate<KeyValue<Windowed<String>, Long>> pred = new Predicate<KeyValue<Windowed<String>, Long>>() {
+ @Override
+ public boolean test(final KeyValue<Windowed<String>, Long> elem) {
+ if (elem == null) { // guard before dereferencing: a null element never matches
+ return false;
+ }
+ final String key = elem.key.key();
+ final boolean notBelowFrom = from == null || key.compareTo(from) >= 0; // null from = open lower bound
+ final boolean notAboveTo = to == null || key.compareTo(to) <= 0; // null to = open upper bound
+ return notBelowFrom && notAboveTo;
+ }
+ };
+
+ return Utils.toList(iterator, pred);
+ }
+
+ private void testRange(final String name, final WindowStore<String, Long> store, final String from, final String to, final boolean forward) {
+ try (final KeyValueIterator<Windowed<String>, Long> resultIterator = forward ? store.fetch(from, to, 0, Long.MAX_VALUE) : store.backwardFetch(from, to, 0, Long.MAX_VALUE);
+ final KeyValueIterator<Windowed<String>, Long> expectedIterator = forward ? store.fetchAll(0, Long.MAX_VALUE) : store.backwardFetchAll(0, Long.MAX_VALUE)) {
+ final List<KeyValue<Windowed<String>, Long>> result = Utils.toList(resultIterator);
+ final List<KeyValue<Windowed<String>, Long>> expected = filterList(expectedIterator, from, to);
+ assertThat(result, is(expected));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index c4c0235..9008982 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
import java.io.Closeable;
import java.io.IOException;
@@ -46,6 +47,7 @@ import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
public final class StreamsTestUtils {
private StreamsTestUtils() {}
@@ -170,6 +172,24 @@ public final class StreamsTestUtils {
}
}
+ public static void verifyAllWindowedKeyValues(final KeyValueIterator<Windowed<Bytes>, byte[]> iterator,
+ final List<Windowed<Bytes>> expectedKeys,
+ final List<String> expectedValues) {
+ if (expectedKeys.size() != expectedValues.size()) {
+ throw new IllegalArgumentException("expectedKeys and expectedValues should have the same size. " +
+ "expectedKeys size: " + expectedKeys.size() + ", expectedValues size: " + expectedValues.size());
+ }
+
+ for (int i = 0; i < expectedKeys.size(); i++) {
+ verifyWindowedKeyValue(
+ iterator.next(),
+ expectedKeys.get(i),
+ expectedValues.get(i)
+ );
+ }
+ assertFalse(iterator.hasNext());
+ }
+
public static void verifyWindowedKeyValue(final KeyValue<Windowed<Bytes>, byte[]> actual,
final Windowed<Bytes> expectedKey,
final String expectedValue) {