You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2021/09/13 21:43:20 UTC
[kafka] branch trunk updated: KAFKA-13264: fix inMemoryWindowStore
backward fetch not in reversed order (#11292)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9628c12 KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order (#11292)
9628c12 is described below
commit 9628c1278e5dcd4f6995c89809461d350d9a530a
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Tue Sep 14 05:40:54 2021 +0800
KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order (#11292)
When introducing the backward iterator for WindowStore in #9138, we forgot to iterate over the records within each segment in reverse order (i.e. via descendingMap) in InMemoryWindowStore. Fix it and add integration tests for it.
Currently, in Window store, we store records in [segments -> [records] ].
For example:
window size = 500,
input records:
key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window
key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window
key: "c", value: "cc", timestamp: 510 ==> will be in [500, 1000] window
So, internally, "a" and "b" will be in the same segment, and "c" will be in another segment.
segments: [0 /* window start */, records], [500, records].
And the records for window start 0 will be "a" and "b".
the records for window start 500 will be "c".
Before this change, we did have a reverse iterator over the segments, but not over the records within each segment. So, when doing backwardFetchAll, the records were returned in the order "c", "a", "b", when the correct order is "c", "b", "a".
Reviewers: Jorge Esteban Quilcate Otoya <qu...@gmail.com>, Anna Sophie Blee-Goldman <ab...@apache.org>, Guozhang Wang <wa...@gmail.com>
---
.../state/internals/InMemoryWindowStore.java | 10 +-
.../internals/AbstractWindowBytesStoreTest.java | 500 +++++++++------------
.../state/internals/WindowStoreFetchTest.java | 229 ++++++++++
3 files changed, 451 insertions(+), 288 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 7c0d21c..ae37542 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -499,10 +499,14 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
final Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>> currentSegment = segmentIterator.next();
currentTime = currentSegment.getKey();
- if (allKeys) {
- return currentSegment.getValue().entrySet().iterator();
+ final ConcurrentNavigableMap<Bytes, byte[]> subMap = allKeys ?
+ currentSegment.getValue() :
+ currentSegment.getValue().subMap(keyFrom, true, keyTo, true);
+
+ if (forward) {
+ return subMap.entrySet().iterator();
} else {
- return currentSegment.getValue().subMap(keyFrom, true, keyTo, true).entrySet().iterator();
+ return subMap.descendingMap().entrySet().iterator();
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index 02db7e7..153db97 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -80,6 +80,15 @@ public abstract class AbstractWindowBytesStoreTest {
static final long SEGMENT_INTERVAL = 60_000L;
static final long RETENTION_PERIOD = 2 * SEGMENT_INTERVAL;
+ final long defaultStartTime = SEGMENT_INTERVAL - 4L;
+
+ final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", defaultStartTime);
+ final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", defaultStartTime + 1);
+ final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", defaultStartTime + 2);
+ final KeyValue<Windowed<Integer>, String> three = windowedPair(3, "three", defaultStartTime + 2);
+ final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", defaultStartTime + 4);
+ final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", defaultStartTime + 5);
+
WindowStore<Integer, String> windowStore;
InternalMockProcessorContext context;
MockRecordCollector recordCollector;
@@ -119,122 +128,114 @@ public abstract class AbstractWindowBytesStoreTest {
@Test
public void testRangeAndSinglePointFetch() {
- final long startTime = SEGMENT_INTERVAL - 4L;
-
- putFirstBatch(windowStore, startTime, context);
-
- assertEquals("zero", windowStore.fetch(0, startTime));
- assertEquals("one", windowStore.fetch(1, startTime + 1L));
- assertEquals("two", windowStore.fetch(2, startTime + 2L));
- assertEquals("four", windowStore.fetch(4, startTime + 4L));
- assertEquals("five", windowStore.fetch(5, startTime + 5L));
+ putFirstBatch(windowStore, defaultStartTime, context);
assertEquals(
new HashSet<>(Collections.singletonList("zero")),
valuesToSet(windowStore.fetch(
0,
- ofEpochMilli(startTime + 0 - WINDOW_SIZE),
- ofEpochMilli(startTime + 0 + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 0 - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 0 + WINDOW_SIZE))));
- putSecondBatch(windowStore, startTime, context);
+ putSecondBatch(windowStore, defaultStartTime, context);
- assertEquals("two+1", windowStore.fetch(2, startTime + 3L));
- assertEquals("two+2", windowStore.fetch(2, startTime + 4L));
- assertEquals("two+3", windowStore.fetch(2, startTime + 5L));
- assertEquals("two+4", windowStore.fetch(2, startTime + 6L));
- assertEquals("two+5", windowStore.fetch(2, startTime + 7L));
- assertEquals("two+6", windowStore.fetch(2, startTime + 8L));
+ assertEquals("two+1", windowStore.fetch(2, defaultStartTime + 3L));
+ assertEquals("two+2", windowStore.fetch(2, defaultStartTime + 4L));
+ assertEquals("two+3", windowStore.fetch(2, defaultStartTime + 5L));
+ assertEquals("two+4", windowStore.fetch(2, defaultStartTime + 6L));
+ assertEquals("two+5", windowStore.fetch(2, defaultStartTime + 7L));
+ assertEquals("two+6", windowStore.fetch(2, defaultStartTime + 8L));
assertEquals(
new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
2,
- ofEpochMilli(startTime - 2L - WINDOW_SIZE),
- ofEpochMilli(startTime - 2L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime - 2L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime - 2L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(Collections.singletonList("two")),
valuesToSet(windowStore.fetch(
2,
- ofEpochMilli(startTime - 1L - WINDOW_SIZE),
- ofEpochMilli(startTime - 1L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime - 1L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime - 1L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two", "two+1")),
valuesToSet(windowStore.fetch(
2,
- ofEpochMilli(startTime - WINDOW_SIZE),
- ofEpochMilli(startTime + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two", "two+1", "two+2")),
valuesToSet(windowStore.fetch(
2,
- ofEpochMilli(startTime + 1L - WINDOW_SIZE),
- ofEpochMilli(startTime + 1L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 1L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 1L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two", "two+1", "two+2", "two+3")),
valuesToSet(windowStore.fetch(
2,
- ofEpochMilli(startTime + 2L - WINDOW_SIZE),
- ofEpochMilli(startTime + 2L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 2L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 2L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two", "two+1", "two+2", "two+3", "two+4")),
valuesToSet(windowStore.fetch(
2,
- ofEpochMilli(startTime + 3L - WINDOW_SIZE),
- ofEpochMilli(startTime + 3L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 3L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 3L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two", "two+1", "two+2", "two+3", "two+4", "two+5")),
valuesToSet(windowStore.fetch(
2,
- ofEpochMilli(startTime + 4L - WINDOW_SIZE),
- ofEpochMilli(startTime + 4L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 4L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 4L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6")),
valuesToSet(windowStore.fetch(
2,
- ofEpochMilli(startTime + 5L - WINDOW_SIZE),
- ofEpochMilli(startTime + 5L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 5L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 5L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6")),
valuesToSet(windowStore.fetch(
2,
- ofEpochMilli(startTime + 6L - WINDOW_SIZE),
- ofEpochMilli(startTime + 6L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 6L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 6L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two+2", "two+3", "two+4", "two+5", "two+6")),
valuesToSet(windowStore.fetch(
2,
- ofEpochMilli(startTime + 7L - WINDOW_SIZE),
- ofEpochMilli(startTime + 7L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 7L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 7L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two+3", "two+4", "two+5", "two+6")),
valuesToSet(windowStore.fetch(
2,
- ofEpochMilli(startTime + 8L - WINDOW_SIZE),
- ofEpochMilli(startTime + 8L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 8L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 8L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two+4", "two+5", "two+6")),
valuesToSet(windowStore.fetch(
2,
- ofEpochMilli(startTime + 9L - WINDOW_SIZE),
- ofEpochMilli(startTime + 9L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 9L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 9L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two+5", "two+6")),
valuesToSet(windowStore.fetch(
2,
- ofEpochMilli(startTime + 10L - WINDOW_SIZE),
- ofEpochMilli(startTime + 10L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 10L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 10L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(Collections.singletonList("two+6")),
valuesToSet(windowStore.fetch(
2,
- ofEpochMilli(startTime + 11L - WINDOW_SIZE),
- ofEpochMilli(startTime + 11L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 11L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 11L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
2,
- ofEpochMilli(startTime + 12L - WINDOW_SIZE),
- ofEpochMilli(startTime + 12L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 12L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 12L + WINDOW_SIZE))));
// Flush the store and verify all current entries were properly flushed ...
windowStore.flush();
@@ -244,14 +245,14 @@ public abstract class AbstractWindowBytesStoreTest {
changeLog.add(new KeyValue<>(((Bytes) record.key()).get(), (byte[]) record.value()));
}
- final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+ final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, defaultStartTime);
assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
assertEquals(
Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"),
entriesByKey.get(2));
- assertNull(entriesByKey.get(3));
+ assertEquals(Utils.mkSet("three@2"), entriesByKey.get(3));
assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
assertNull(entriesByKey.get(6));
@@ -259,42 +260,28 @@ public abstract class AbstractWindowBytesStoreTest {
@Test
public void shouldGetAll() {
- final long startTime = SEGMENT_INTERVAL - 4L;
-
- putFirstBatch(windowStore, startTime, context);
-
- final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
- final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
- final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
- final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
- final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
+ putFirstBatch(windowStore, defaultStartTime, context);
assertEquals(
- asList(zero, one, two, four, five),
+ asList(zero, one, two, three, four, five),
toList(windowStore.all())
);
}
@Test
public void shouldGetAllNonDeletedRecords() {
- final long startTime = SEGMENT_INTERVAL - 4L;
-
// Add some records
- windowStore.put(0, "zero", startTime + 0);
- windowStore.put(1, "one", startTime + 1);
- windowStore.put(2, "two", startTime + 2);
- windowStore.put(3, "three", startTime + 3);
- windowStore.put(4, "four", startTime + 4);
+ windowStore.put(0, "zero", defaultStartTime + 0);
+ windowStore.put(1, "one", defaultStartTime + 1);
+ windowStore.put(2, "two", defaultStartTime + 2);
+ windowStore.put(3, "three", defaultStartTime + 3);
+ windowStore.put(4, "four", defaultStartTime + 4);
// Delete some records
- windowStore.put(1, null, startTime + 1);
- windowStore.put(3, null, startTime + 3);
+ windowStore.put(1, null, defaultStartTime + 1);
+ windowStore.put(3, null, defaultStartTime + 3);
// Only non-deleted records should appear in the all() iterator
- final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
- final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
- final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
-
assertEquals(
asList(zero, two, four),
toList(windowStore.all())
@@ -303,21 +290,15 @@ public abstract class AbstractWindowBytesStoreTest {
@Test
public void shouldGetAllReturnTimestampOrderedRecords() {
- final long startTime = SEGMENT_INTERVAL - 4L;
-
// Add some records in different order
- windowStore.put(4, "four", startTime + 4);
- windowStore.put(0, "zero", startTime + 0);
- windowStore.put(2, "two", startTime + 2);
- windowStore.put(3, "three", startTime + 3);
- windowStore.put(1, "one", startTime + 1);
+ windowStore.put(4, "four", defaultStartTime + 4);
+ windowStore.put(0, "zero", defaultStartTime + 0);
+ windowStore.put(2, "two", defaultStartTime + 2);
+ windowStore.put(3, "three", defaultStartTime + 3);
+ windowStore.put(1, "one", defaultStartTime + 1);
// Only non-deleted records should appear in the all() iterator
- final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
- final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
- final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
- final KeyValue<Windowed<Integer>, String> three = windowedPair(3, "three", startTime + 3);
- final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
+ final KeyValue<Windowed<Integer>, String> three = windowedPair(3, "three", defaultStartTime + 3);
assertEquals(
asList(zero, one, two, three, four),
@@ -327,13 +308,8 @@ public abstract class AbstractWindowBytesStoreTest {
@Test
public void shouldEarlyClosedIteratorStillGetAllRecords() {
- final long startTime = SEGMENT_INTERVAL - 4L;
-
- windowStore.put(0, "zero", startTime + 0);
- windowStore.put(1, "one", startTime + 1);
-
- final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
- final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
+ windowStore.put(0, "zero", defaultStartTime + 0);
+ windowStore.put(1, "one", defaultStartTime + 1);
final KeyValueIterator<Windowed<Integer>, String> it = windowStore.all();
assertEquals(zero, it.next());
@@ -348,302 +324,260 @@ public abstract class AbstractWindowBytesStoreTest {
@Test
public void shouldGetBackwardAll() {
- final long startTime = SEGMENT_INTERVAL - 4L;
-
- putFirstBatch(windowStore, startTime, context);
-
- final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
- final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
- final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
- final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
- final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
+ putFirstBatch(windowStore, defaultStartTime, context);
assertEquals(
- asList(five, four, two, one, zero),
+ asList(five, four, three, two, one, zero),
toList(windowStore.backwardAll())
);
}
@Test
public void shouldFetchAllInTimeRange() {
- final long startTime = SEGMENT_INTERVAL - 4L;
-
- putFirstBatch(windowStore, startTime, context);
-
- final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
- final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
- final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
- final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
- final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
+ putFirstBatch(windowStore, defaultStartTime, context);
assertEquals(
- asList(one, two, four),
- toList(windowStore.fetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 4)))
+ asList(one, two, three, four),
+ toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime + 1), ofEpochMilli(defaultStartTime + 4)))
);
assertEquals(
- asList(zero, one, two),
- toList(windowStore.fetchAll(ofEpochMilli(startTime + 0), ofEpochMilli(startTime + 3)))
+ asList(zero, one, two, three),
+ toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime + 0), ofEpochMilli(defaultStartTime + 3)))
);
assertEquals(
- asList(one, two, four, five),
- toList(windowStore.fetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 5)))
+ asList(one, two, three, four, five),
+ toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime + 1), ofEpochMilli(defaultStartTime + 5)))
);
}
@Test
public void shouldBackwardFetchAllInTimeRange() {
- final long startTime = SEGMENT_INTERVAL - 4L;
-
- putFirstBatch(windowStore, startTime, context);
-
- final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
- final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
- final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
- final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
- final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
+ putFirstBatch(windowStore, defaultStartTime, context);
assertEquals(
- asList(four, two, one),
- toList(windowStore.backwardFetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 4)))
+ asList(four, three, two, one),
+ toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime + 1), ofEpochMilli(defaultStartTime + 4)))
);
assertEquals(
- asList(two, one, zero),
- toList(windowStore.backwardFetchAll(ofEpochMilli(startTime + 0), ofEpochMilli(startTime + 3)))
+ asList(three, two, one, zero),
+ toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime + 0), ofEpochMilli(defaultStartTime + 3)))
);
assertEquals(
- asList(five, four, two, one),
- toList(windowStore.backwardFetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 5)))
+ asList(five, four, three, two, one),
+ toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime + 1), ofEpochMilli(defaultStartTime + 5)))
);
}
@Test
public void testFetchRange() {
- final long startTime = SEGMENT_INTERVAL - 4L;
-
- putFirstBatch(windowStore, startTime, context);
-
- final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
- final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
- final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
- final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
- final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
+ putFirstBatch(windowStore, defaultStartTime, context);
assertEquals(
asList(zero, one),
toList(windowStore.fetch(
0,
1,
- ofEpochMilli(startTime + 0L - WINDOW_SIZE),
- ofEpochMilli(startTime + 0L + WINDOW_SIZE)))
+ ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))
);
assertEquals(
Collections.singletonList(one),
toList(windowStore.fetch(
1,
1,
- ofEpochMilli(startTime + 0L - WINDOW_SIZE),
- ofEpochMilli(startTime + 0L + WINDOW_SIZE)))
+ ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))
);
assertEquals(
- asList(one, two),
+ asList(one, two, three),
toList(windowStore.fetch(
1,
3,
- ofEpochMilli(startTime + 0L - WINDOW_SIZE),
- ofEpochMilli(startTime + 0L + WINDOW_SIZE)))
+ ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))
);
assertEquals(
- asList(zero, one, two),
+ asList(zero, one, two, three),
toList(windowStore.fetch(
0,
5,
- ofEpochMilli(startTime + 0L - WINDOW_SIZE),
- ofEpochMilli(startTime + 0L + WINDOW_SIZE)))
+ ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))
);
assertEquals(
- asList(zero, one, two, four, five),
+ asList(zero, one, two, three, four, five),
toList(windowStore.fetch(
0,
5,
- ofEpochMilli(startTime + 0L - WINDOW_SIZE),
- ofEpochMilli(startTime + 0L + WINDOW_SIZE + 5L)))
+ ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE + 5L)))
);
assertEquals(
- asList(two, four, five),
+ asList(two, three, four, five),
toList(windowStore.fetch(
0,
5,
- ofEpochMilli(startTime + 2L),
- ofEpochMilli(startTime + 0L + WINDOW_SIZE + 5L)))
+ ofEpochMilli(defaultStartTime + 2L),
+ ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE + 5L)))
);
assertEquals(
Collections.emptyList(),
toList(windowStore.fetch(
4,
5,
- ofEpochMilli(startTime + 2L),
- ofEpochMilli(startTime + WINDOW_SIZE)))
+ ofEpochMilli(defaultStartTime + 2L),
+ ofEpochMilli(defaultStartTime + WINDOW_SIZE)))
);
assertEquals(
Collections.emptyList(),
toList(windowStore.fetch(
0,
3,
- ofEpochMilli(startTime + 3L),
- ofEpochMilli(startTime + WINDOW_SIZE + 5)))
+ ofEpochMilli(defaultStartTime + 3L),
+ ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5)))
);
}
@Test
public void testBackwardFetchRange() {
- final long startTime = SEGMENT_INTERVAL - 4L;
-
- putFirstBatch(windowStore, startTime, context);
-
- final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
- final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
- final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
- final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
- final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
+ putFirstBatch(windowStore, defaultStartTime, context);
assertEquals(
asList(one, zero),
toList(windowStore.backwardFetch(
0,
1,
- ofEpochMilli(startTime + 0L - WINDOW_SIZE),
- ofEpochMilli(startTime + 0L + WINDOW_SIZE)))
+ ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))
);
assertEquals(
Collections.singletonList(one),
toList(windowStore.backwardFetch(
1,
1,
- ofEpochMilli(startTime + 0L - WINDOW_SIZE),
- ofEpochMilli(startTime + 0L + WINDOW_SIZE)))
+ ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))
);
assertEquals(
- asList(two, one),
+ asList(three, two, one),
toList(windowStore.backwardFetch(
1,
3,
- ofEpochMilli(startTime + 0L - WINDOW_SIZE),
- ofEpochMilli(startTime + 0L + WINDOW_SIZE)))
+ ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))
);
assertEquals(
- asList(two, one, zero),
+ asList(three, two, one, zero),
toList(windowStore.backwardFetch(
0,
5,
- ofEpochMilli(startTime + 0L - WINDOW_SIZE),
- ofEpochMilli(startTime + 0L + WINDOW_SIZE)))
+ ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))
);
assertEquals(
- asList(five, four, two, one, zero),
+ asList(five, four, three, two, one, zero),
toList(windowStore.backwardFetch(
0,
5,
- ofEpochMilli(startTime + 0L - WINDOW_SIZE),
- ofEpochMilli(startTime + 0L + WINDOW_SIZE + 5L)))
+ ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE + 5L)))
);
assertEquals(
- asList(five, four, two),
+ asList(five, four, three, two),
toList(windowStore.backwardFetch(
0,
5,
- ofEpochMilli(startTime + 2L),
- ofEpochMilli(startTime + 0L + WINDOW_SIZE + 5L)))
+ ofEpochMilli(defaultStartTime + 2L),
+ ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE + 5L)))
);
assertEquals(
Collections.emptyList(),
toList(windowStore.backwardFetch(
4,
5,
- ofEpochMilli(startTime + 2L),
- ofEpochMilli(startTime + WINDOW_SIZE)))
+ ofEpochMilli(defaultStartTime + 2L),
+ ofEpochMilli(defaultStartTime + WINDOW_SIZE)))
);
assertEquals(
Collections.emptyList(),
toList(windowStore.backwardFetch(
0,
3,
- ofEpochMilli(startTime + 3L),
- ofEpochMilli(startTime + WINDOW_SIZE + 5)))
+ ofEpochMilli(defaultStartTime + 3L),
+ ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5)))
);
}
@Test
public void testPutAndFetchBefore() {
- final long startTime = SEGMENT_INTERVAL - 4L;
-
- putFirstBatch(windowStore, startTime, context);
+ putFirstBatch(windowStore, defaultStartTime, context);
assertEquals(
new HashSet<>(Collections.singletonList("zero")),
- valuesToSet(windowStore.fetch(0, ofEpochMilli(startTime + 0L - WINDOW_SIZE), ofEpochMilli(startTime + 0L))));
+ valuesToSet(windowStore.fetch(0, ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 0L))));
assertEquals(
new HashSet<>(Collections.singletonList("one")),
- valuesToSet(windowStore.fetch(1, ofEpochMilli(startTime + 1L - WINDOW_SIZE), ofEpochMilli(startTime + 1L))));
+ valuesToSet(windowStore.fetch(1, ofEpochMilli(defaultStartTime + 1L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 1L))));
assertEquals(
new HashSet<>(Collections.singletonList("two")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 2L - WINDOW_SIZE), ofEpochMilli(startTime + 2L))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 2L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 2L))));
assertEquals(
- new HashSet<>(Collections.emptyList()),
- valuesToSet(windowStore.fetch(3, ofEpochMilli(startTime + 3L - WINDOW_SIZE), ofEpochMilli(startTime + 3L))));
+ new HashSet<>(Collections.singletonList("three")),
+ valuesToSet(windowStore.fetch(3, ofEpochMilli(defaultStartTime + 3L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 3L))));
assertEquals(
new HashSet<>(Collections.singletonList("four")),
- valuesToSet(windowStore.fetch(4, ofEpochMilli(startTime + 4L - WINDOW_SIZE), ofEpochMilli(startTime + 4L))));
+ valuesToSet(windowStore.fetch(4, ofEpochMilli(defaultStartTime + 4L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 4L))));
assertEquals(
new HashSet<>(Collections.singletonList("five")),
- valuesToSet(windowStore.fetch(5, ofEpochMilli(startTime + 5L - WINDOW_SIZE), ofEpochMilli(startTime + 5L))));
+ valuesToSet(windowStore.fetch(5, ofEpochMilli(defaultStartTime + 5L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 5L))));
- putSecondBatch(windowStore, startTime, context);
+ putSecondBatch(windowStore, defaultStartTime, context);
assertEquals(
new HashSet<>(Collections.emptyList()),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime - 1L - WINDOW_SIZE), ofEpochMilli(startTime - 1L))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime - 1L - WINDOW_SIZE), ofEpochMilli(defaultStartTime - 1L))));
assertEquals(
new HashSet<>(Collections.emptyList()),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 0L - WINDOW_SIZE), ofEpochMilli(startTime + 0L))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 0L))));
assertEquals(
new HashSet<>(Collections.emptyList()),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 1L - WINDOW_SIZE), ofEpochMilli(startTime + 1L))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 1L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 1L))));
assertEquals(
new HashSet<>(Collections.singletonList("two")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 2L - WINDOW_SIZE), ofEpochMilli(startTime + 2L))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 2L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 2L))));
assertEquals(
new HashSet<>(asList("two", "two+1")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 3L - WINDOW_SIZE), ofEpochMilli(startTime + 3L))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 3L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 3L))));
assertEquals(
new HashSet<>(asList("two", "two+1", "two+2")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 4L - WINDOW_SIZE), ofEpochMilli(startTime + 4L))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 4L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 4L))));
assertEquals(
new HashSet<>(asList("two", "two+1", "two+2", "two+3")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 5L - WINDOW_SIZE), ofEpochMilli(startTime + 5L))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 5L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 5L))));
assertEquals(
new HashSet<>(asList("two+1", "two+2", "two+3", "two+4")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 6L - WINDOW_SIZE), ofEpochMilli(startTime + 6L))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 6L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 6L))));
assertEquals(
new HashSet<>(asList("two+2", "two+3", "two+4", "two+5")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 7L - WINDOW_SIZE), ofEpochMilli(startTime + 7L))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 7L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 7L))));
assertEquals(
new HashSet<>(asList("two+3", "two+4", "two+5", "two+6")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 8L - WINDOW_SIZE), ofEpochMilli(startTime + 8L))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 8L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 8L))));
assertEquals(
new HashSet<>(asList("two+4", "two+5", "two+6")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 9L - WINDOW_SIZE), ofEpochMilli(startTime + 9L))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 9L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 9L))));
assertEquals(
new HashSet<>(asList("two+5", "two+6")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 10L - WINDOW_SIZE), ofEpochMilli(startTime + 10L))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 10L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 10L))));
assertEquals(
new HashSet<>(Collections.singletonList("two+6")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 11L - WINDOW_SIZE), ofEpochMilli(startTime + 11L))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 11L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 11L))));
assertEquals(
new HashSet<>(Collections.emptyList()),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 12L - WINDOW_SIZE), ofEpochMilli(startTime + 12L))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 12L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 12L))));
assertEquals(
new HashSet<>(Collections.emptyList()),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 13L - WINDOW_SIZE), ofEpochMilli(startTime + 13L))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 13L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 13L))));
// Flush the store and verify all current entries were properly flushed ...
windowStore.flush();
@@ -653,11 +587,11 @@ public abstract class AbstractWindowBytesStoreTest {
changeLog.add(new KeyValue<>(((Bytes) record.key()).get(), (byte[]) record.value()));
}
- final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+ final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, defaultStartTime);
assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
- assertNull(entriesByKey.get(3));
+ assertEquals(Utils.mkSet("three@2"), entriesByKey.get(3));
assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
assertNull(entriesByKey.get(6));
@@ -665,97 +599,95 @@ public abstract class AbstractWindowBytesStoreTest {
@Test
public void testPutAndFetchAfter() {
- final long startTime = SEGMENT_INTERVAL - 4L;
-
- putFirstBatch(windowStore, startTime, context);
+ putFirstBatch(windowStore, defaultStartTime, context);
assertEquals(
new HashSet<>(Collections.singletonList("zero")),
- valuesToSet(windowStore.fetch(0, ofEpochMilli(startTime + 0L),
- ofEpochMilli(startTime + 0L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(0, ofEpochMilli(defaultStartTime + 0L),
+ ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(Collections.singletonList("one")),
- valuesToSet(windowStore.fetch(1, ofEpochMilli(startTime + 1L),
- ofEpochMilli(startTime + 1L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(1, ofEpochMilli(defaultStartTime + 1L),
+ ofEpochMilli(defaultStartTime + 1L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(Collections.singletonList("two")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 2L),
- ofEpochMilli(startTime + 2L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 2L),
+ ofEpochMilli(defaultStartTime + 2L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(Collections.emptyList()),
- valuesToSet(windowStore.fetch(3, ofEpochMilli(startTime + 3L),
- ofEpochMilli(startTime + 3L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(3, ofEpochMilli(defaultStartTime + 3L),
+ ofEpochMilli(defaultStartTime + 3L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(Collections.singletonList("four")),
- valuesToSet(windowStore.fetch(4, ofEpochMilli(startTime + 4L),
- ofEpochMilli(startTime + 4L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(4, ofEpochMilli(defaultStartTime + 4L),
+ ofEpochMilli(defaultStartTime + 4L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(Collections.singletonList("five")),
- valuesToSet(windowStore.fetch(5, ofEpochMilli(startTime + 5L),
- ofEpochMilli(startTime + 5L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(5, ofEpochMilli(defaultStartTime + 5L),
+ ofEpochMilli(defaultStartTime + 5L + WINDOW_SIZE))));
- putSecondBatch(windowStore, startTime, context);
+ putSecondBatch(windowStore, defaultStartTime, context);
assertEquals(
new HashSet<>(Collections.emptyList()),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime - 2L),
- ofEpochMilli(startTime - 2L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime - 2L),
+ ofEpochMilli(defaultStartTime - 2L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(Collections.singletonList("two")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime - 1L),
- ofEpochMilli(startTime - 1L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime - 1L),
+ ofEpochMilli(defaultStartTime - 1L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two", "two+1")),
valuesToSet(windowStore
- .fetch(2, ofEpochMilli(startTime), ofEpochMilli(startTime + WINDOW_SIZE))));
+ .fetch(2, ofEpochMilli(defaultStartTime), ofEpochMilli(defaultStartTime + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two", "two+1", "two+2")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 1L),
- ofEpochMilli(startTime + 1L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 1L),
+ ofEpochMilli(defaultStartTime + 1L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two", "two+1", "two+2", "two+3")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 2L),
- ofEpochMilli(startTime + 2L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 2L),
+ ofEpochMilli(defaultStartTime + 2L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two+1", "two+2", "two+3", "two+4")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 3L),
- ofEpochMilli(startTime + 3L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 3L),
+ ofEpochMilli(defaultStartTime + 3L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two+2", "two+3", "two+4", "two+5")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 4L),
- ofEpochMilli(startTime + 4L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 4L),
+ ofEpochMilli(defaultStartTime + 4L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two+3", "two+4", "two+5", "two+6")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 5L),
- ofEpochMilli(startTime + 5L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 5L),
+ ofEpochMilli(defaultStartTime + 5L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two+4", "two+5", "two+6")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 6L),
- ofEpochMilli(startTime + 6L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 6L),
+ ofEpochMilli(defaultStartTime + 6L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("two+5", "two+6")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 7L),
- ofEpochMilli(startTime + 7L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 7L),
+ ofEpochMilli(defaultStartTime + 7L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(Collections.singletonList("two+6")),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 8L),
- ofEpochMilli(startTime + 8L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 8L),
+ ofEpochMilli(defaultStartTime + 8L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(Collections.emptyList()),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 9L),
- ofEpochMilli(startTime + 9L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 9L),
+ ofEpochMilli(defaultStartTime + 9L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(Collections.emptyList()),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 10L),
- ofEpochMilli(startTime + 10L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 10L),
+ ofEpochMilli(defaultStartTime + 10L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(Collections.emptyList()),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 11L),
- ofEpochMilli(startTime + 11L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 11L),
+ ofEpochMilli(defaultStartTime + 11L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(Collections.emptyList()),
- valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 12L),
- ofEpochMilli(startTime + 12L + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 12L),
+ ofEpochMilli(defaultStartTime + 12L + WINDOW_SIZE))));
// Flush the store and verify all current entries were properly flushed ...
windowStore.flush();
@@ -765,14 +697,14 @@ public abstract class AbstractWindowBytesStoreTest {
changeLog.add(new KeyValue<>(((Bytes) record.key()).get(), (byte[]) record.value()));
}
- final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+ final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, defaultStartTime);
assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
assertEquals(
Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"),
entriesByKey.get(2));
- assertNull(entriesByKey.get(3));
+ assertEquals(Utils.mkSet("three@2"), entriesByKey.get(3));
assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
assertNull(entriesByKey.get(6));
@@ -784,49 +716,47 @@ public abstract class AbstractWindowBytesStoreTest {
windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
windowStore.init((StateStoreContext) context, windowStore);
- final long startTime = SEGMENT_INTERVAL - 4L;
-
- windowStore.put(0, "zero", startTime);
+ windowStore.put(0, "zero", defaultStartTime);
assertEquals(
new HashSet<>(Collections.singletonList("zero")),
- valuesToSet(windowStore.fetch(0, ofEpochMilli(startTime - WINDOW_SIZE),
- ofEpochMilli(startTime + WINDOW_SIZE))));
+ valuesToSet(windowStore.fetch(0, ofEpochMilli(defaultStartTime - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + WINDOW_SIZE))));
- windowStore.put(0, "zero", startTime);
- windowStore.put(0, "zero+", startTime);
- windowStore.put(0, "zero++", startTime);
+ windowStore.put(0, "zero", defaultStartTime);
+ windowStore.put(0, "zero+", defaultStartTime);
+ windowStore.put(0, "zero++", defaultStartTime);
assertEquals(
new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
valuesToSet(windowStore.fetch(
0,
- ofEpochMilli(startTime - WINDOW_SIZE),
- ofEpochMilli(startTime + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
valuesToSet(windowStore.fetch(
0,
- ofEpochMilli(startTime + 1L - WINDOW_SIZE),
- ofEpochMilli(startTime + 1L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 1L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 1L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
valuesToSet(windowStore.fetch(
0,
- ofEpochMilli(startTime + 2L - WINDOW_SIZE),
- ofEpochMilli(startTime + 2L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 2L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 2L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
valuesToSet(windowStore.fetch(
0,
- ofEpochMilli(startTime + 3L - WINDOW_SIZE),
- ofEpochMilli(startTime + 3L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 3L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 3L + WINDOW_SIZE))));
assertEquals(
new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
0,
- ofEpochMilli(startTime + 4L - WINDOW_SIZE),
- ofEpochMilli(startTime + 4L + WINDOW_SIZE))));
+ ofEpochMilli(defaultStartTime + 4L - WINDOW_SIZE),
+ ofEpochMilli(defaultStartTime + 4L + WINDOW_SIZE))));
// Flush the store and verify all current entries were properly flushed ...
windowStore.flush();
@@ -836,7 +766,7 @@ public abstract class AbstractWindowBytesStoreTest {
changeLog.add(new KeyValue<>(((Bytes) record.key()).get(), (byte[]) record.value()));
}
- final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+ final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, defaultStartTime);
assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
}
@@ -903,7 +833,6 @@ public abstract class AbstractWindowBytesStoreTest {
@Test
public void testDeleteAndUpdate() {
-
final long currentTime = 0;
windowStore.put(1, "one", currentTime);
windowStore.put(1, "one v2", currentTime);
@@ -1176,6 +1105,7 @@ public abstract class AbstractWindowBytesStoreTest {
store.put(0, "zero", startTime);
store.put(1, "one", startTime + 1L);
store.put(2, "two", startTime + 2L);
+ store.put(3, "three", startTime + 2L);
store.put(4, "four", startTime + 4L);
store.put(5, "five", startTime + 5L);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java
new file mode 100644
index 0000000..833ab6a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * Checks that {@code fetchAll} and {@code backwardFetchAll} on a window store backing a
+ * windowed count return the entries in the expected order (ascending for the forward scan,
+ * exactly reversed for the backward scan).
+ *
+ * <p>The test is parameterized over the store implementation, changelogging, caching and the
+ * scan direction; every combination produced by {@link #data()} is executed.
+ */
+@RunWith(Parameterized.class)
+public class WindowStoreFetchTest {
+    private enum StoreType { InMemory, RocksDB, Timed }
+
+    private static final String STORE_NAME = "store";
+    private static final int DATA_SIZE = 5;
+    private static final long WINDOW_SIZE = 500L;
+    private static final long RETENTION_MS = 10000L;
+
+    private final StoreType storeType;
+    private final boolean enableLogging;
+    private final boolean enableCaching;
+    private final boolean forward;
+
+    // Expected store content in forward-scan order; iterated in reverse for the backward scan.
+    private final LinkedList<KeyValue<Windowed<String>, Long>> expectedRecords;
+    // Input records; every key appears twice so that each windowed count ends up as 2.
+    private final LinkedList<KeyValue<String, String>> records;
+    private Properties streamsConfig;
+
+    private TimeWindowedKStream<String, String> windowedStream;
+
+    /**
+     * Builds the input records and the matching expected windowed counts.
+     *
+     * @param storeType     store implementation under test
+     * @param enableLogging whether changelogging is enabled on the store
+     * @param enableCaching whether caching is enabled on the store
+     * @param forward       {@code true} to test {@code fetchAll}, {@code false} to test {@code backwardFetchAll}
+     */
+    public WindowStoreFetchTest(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward) {
+        this.storeType = storeType;
+        this.enableLogging = enableLogging;
+        this.enableCaching = enableCaching;
+        this.forward = forward;
+
+        this.records = new LinkedList<>();
+        this.expectedRecords = new LinkedList<>();
+        // Keys with index below m land in window [0, WINDOW_SIZE), the rest in [WINDOW_SIZE, 2 * WINDOW_SIZE).
+        final int m = DATA_SIZE / 2;
+        for (int i = 0; i < DATA_SIZE; i++) {
+            final String key = "key-" + i * 2;
+            final String value = "val-" + i * 2;
+            final KeyValue<String, String> r = new KeyValue<>(key, value);
+            // The same record is added twice, so the expected count for each key is 2.
+            records.add(r);
+            records.add(r);
+            final long windowStartTime = i < m ? 0 : WINDOW_SIZE;
+            expectedRecords.add(new KeyValue<>(new Windowed<>(key, new TimeWindow(windowStartTime, windowStartTime + WINDOW_SIZE)), 2L));
+        }
+    }
+
+    @Rule
+    public TestName testName = new TestName();
+
+    /**
+     * @return one argument array per combination of store type, logging, caching and direction
+     */
+    @Parameterized.Parameters(name = "storeType={0}, enableLogging={1}, enableCaching={2}, forward={3}")
+    public static Collection<Object[]> data() {
+        final List<StoreType> types = Arrays.asList(StoreType.InMemory, StoreType.RocksDB, StoreType.Timed);
+        final List<Boolean> logging = Arrays.asList(true, false);
+        final List<Boolean> caching = Arrays.asList(true, false);
+        final List<Boolean> forward = Arrays.asList(true, false);
+        return buildParameters(types, logging, caching, forward);
+    }
+
+    /** Creates the driver configuration with the state directory set to the path from {@code TestUtils.tempDirectory()}. */
+    @Before
+    public void setup() {
+        streamsConfig = mkProperties(mkMap(
+            mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
+        ));
+    }
+
+    /**
+     * Pipes the input records through a windowed count and compares a full-range scan of the
+     * backing store against the expected records, in the direction selected by {@code forward}.
+     */
+    @Test
+    public void testStoreConfig() {
+        // Argument order must match getStoreConfig(type, name, cachingEnabled, loggingEnabled).
+        final Materialized<String, Long, WindowStore<Bytes, byte[]>> stateStoreConfig =
+            getStoreConfig(storeType, STORE_NAME, enableCaching, enableLogging);
+
+        // Create topology: windowed count over the input stream, materialized in the store under test
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> stream = builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()));
+        stream
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(WINDOW_SIZE)))
+            .count(stateStoreConfig)
+            .toStream()
+            .to("output");
+
+        final Topology topology = builder.build();
+
+        // Hand over the config built in setup() so the configured state directory is actually used.
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            // get input topic and stateStore
+            final TestInputTopic<String, String> input = driver
+                .createInputTopic("input", new StringSerializer(), new StringSerializer());
+            final WindowStore<String, Long> stateStore = driver.getWindowStore(STORE_NAME);
+
+            // write some data: two records per key, so the first (DATA_SIZE / 2) keys cover
+            // record indexes below 'medium' and go into the first window
+            final int medium = DATA_SIZE / 2 * 2;
+            for (int i = 0; i < records.size(); i++) {
+                final KeyValue<String, String> kv = records.get(i);
+                final long windowStartTime = i < medium ? 0 : WINDOW_SIZE;
+                input.pipeInput(kv.key, kv.value, windowStartTime + i);
+            }
+
+            // query the state store
+            try (final KeyValueIterator<Windowed<String>, Long> scanIterator = forward ?
+                stateStore.fetchAll(0, Long.MAX_VALUE) :
+                stateStore.backwardFetchAll(0, Long.MAX_VALUE)) {
+
+                final Iterator<KeyValue<Windowed<String>, Long>> dataIterator = forward ?
+                    expectedRecords.iterator() :
+                    expectedRecords.descendingIterator();
+
+                TestUtils.checkEquals(scanIterator, dataIterator);
+            }
+        }
+    }
+
+    /**
+     * Computes the cartesian product of the given option lists.
+     *
+     * @param argOptions one list of candidate values per constructor argument, in argument order
+     * @return every combination, each as an array holding one value from each list
+     */
+    private static Collection<Object[]> buildParameters(final List<?>... argOptions) {
+        // Start from a single empty combination and extend it by one argument per iteration.
+        List<Object[]> result = new LinkedList<>();
+        result.add(new Object[0]);
+
+        for (final List<?> argOption : argOptions) {
+            result = times(result, argOption);
+        }
+
+        return result;
+    }
+
+    /**
+     * Extends every array in {@code left} by each element of {@code right}.
+     *
+     * @return {@code left.size() * right.size()} new arrays, each one element longer than its source
+     */
+    private static List<Object[]> times(final List<Object[]> left, final List<?> right) {
+        final List<Object[]> result = new LinkedList<>();
+        for (final Object[] args : left) {
+            for (final Object rightElem : right) {
+                final Object[] resArgs = new Object[args.length + 1];
+                System.arraycopy(args, 0, resArgs, 0, args.length);
+                resArgs[args.length] = rightElem;
+                result.add(resArgs);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Builds the {@link Materialized} configuration for the count store.
+     *
+     * @param type           store implementation to create; anything other than RocksDB or Timed yields an in-memory store
+     * @param name           name of the store
+     * @param cachingEnabled whether to enable caching on the store
+     * @param loggingEnabled whether to enable changelogging on the store
+     * @return the materialization with String keys and Long values
+     */
+    private Materialized<String, Long, WindowStore<Bytes, byte[]>> getStoreConfig(final StoreType type,
+                                                                                  final String name,
+                                                                                  final boolean cachingEnabled,
+                                                                                  final boolean loggingEnabled) {
+        final Supplier<WindowBytesStoreSupplier> createStore = () -> {
+            final Duration retention = Duration.ofMillis(RETENTION_MS);
+            final Duration windowSize = Duration.ofMillis(WINDOW_SIZE);
+            if (type == StoreType.RocksDB) {
+                return Stores.persistentWindowStore(name, retention, windowSize, false);
+            } else if (type == StoreType.Timed) {
+                return Stores.persistentTimestampedWindowStore(name, retention, windowSize, false);
+            } else {
+                return Stores.inMemoryWindowStore(name, retention, windowSize, false);
+            }
+        };
+
+        final WindowBytesStoreSupplier stateStoreSupplier = createStore.get();
+        final Materialized<String, Long, WindowStore<Bytes, byte[]>> stateStoreConfig = Materialized
+            .<String, Long>as(stateStoreSupplier)
+            .withKeySerde(Serdes.String())
+            .withValueSerde(Serdes.Long());
+        if (cachingEnabled) {
+            stateStoreConfig.withCachingEnabled();
+        } else {
+            stateStoreConfig.withCachingDisabled();
+        }
+        if (loggingEnabled) {
+            stateStoreConfig.withLoggingEnabled(new HashMap<>());
+        } else {
+            stateStoreConfig.withLoggingDisabled();
+        }
+        return stateStoreConfig;
+    }
+}