You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/14 21:21:08 UTC
kafka git commit: KAFKA-4863; Querying window store may return unwanted keys
Repository: kafka
Updated Branches:
refs/heads/trunk d3b8ff024 -> 9e4548df3
KAFKA-4863; Querying window store may return unwanted keys
Make sure that the iterator returned from `WindowStore.fetch(..)` only returns entries whose key exactly matches the requested key, rather than every entry whose key merely starts with the requested key (a prefix match).
Author: Damian Guy <da...@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes #2662 from dguy/kafka-4863
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e4548df
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e4548df
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e4548df
Branch: refs/heads/trunk
Commit: 9e4548df30d50a56ae99cb3383f1a3f97bbe77bb
Parents: d3b8ff0
Author: Damian Guy <da...@gmail.com>
Authored: Tue Mar 14 14:21:03 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Mar 14 14:21:03 2017 -0700
----------------------------------------------------------------------
.../state/internals/CachingSessionStore.java | 52 --
.../state/internals/CachingWindowStore.java | 12 +-
.../state/internals/FilteredCacheIterator.java | 73 ++
.../state/internals/WindowKeySchema.java | 22 +-
.../state/internals/CachingWindowStoreTest.java | 25 +-
.../internals/FilteredCacheIteratorTest.java | 118 +++
.../state/internals/RocksDBWindowStoreTest.java | 843 ++++++++++---------
7 files changed, 663 insertions(+), 482 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 58c0133..a4b46ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
@@ -31,7 +30,6 @@ import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
import java.util.List;
-import java.util.NoSuchElementException;
class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<K, AGG>, CachedStateStore<Windowed<K>, AGG> {
@@ -162,54 +160,4 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
this.flushListener = flushListener;
}
- private static class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
- private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator;
- private final HasNextCondition hasNextCondition;
-
- FilteredCacheIterator(final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator,
- final HasNextCondition hasNextCondition) {
- this.cacheIterator = cacheIterator;
- this.hasNextCondition = hasNextCondition;
- }
-
- @Override
- public void close() {
- // no-op
- }
-
- @Override
- public Bytes peekNextKey() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- return cacheIterator.peekNextKey();
- }
-
- @Override
- public boolean hasNext() {
- return hasNextCondition.hasNext(cacheIterator);
- }
-
- @Override
- public KeyValue<Bytes, LRUCacheEntry> next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- return cacheIterator.next();
-
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public KeyValue<Bytes, LRUCacheEntry> peekNext() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- return cacheIterator.peekNext();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 7ea2fa4..4003e54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -38,6 +38,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private final long windowSize;
+ private final SegmentedBytesStore.KeySchema keySchema = new WindowKeySchema();
private String name;
private ThreadCache cache;
@@ -149,9 +150,16 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
Bytes fromBytes = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
Bytes toBytes = WindowStoreUtils.toBinaryKey(key, timeTo, 0, serdes);
- final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
+ final Bytes keyBytes = Bytes.wrap(serdes.rawKey(key));
+ final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(keyBytes, timeFrom, timeTo);
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, fromBytes, toBytes);
- return new MergedSortedCacheWindowStoreIterator<>(cacheIterator,
+
+ final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyBytes,
+ timeFrom,
+ timeTo);
+ final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition);
+
+ return new MergedSortedCacheWindowStoreIterator<>(filteredCacheIterator,
underlyingIterator,
new StateSerdes<>(serdes.stateName(), Serdes.Long(), serdes.valueSerde()));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java
new file mode 100644
index 0000000..19370b9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.NoSuchElementException;
+
+class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
+ private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
+ private final HasNextCondition hasNextCondition;
+
+ FilteredCacheIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
+ final HasNextCondition hasNextCondition) {
+ this.cacheIterator = cacheIterator;
+ this.hasNextCondition = hasNextCondition;
+ }
+
+ @Override
+ public void close() {
+ // no-op
+ }
+
+ @Override
+ public Bytes peekNextKey() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return cacheIterator.peekNextKey();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return hasNextCondition.hasNext(cacheIterator);
+ }
+
+ @Override
+ public KeyValue<Bytes, LRUCacheEntry> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return cacheIterator.next();
+
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public KeyValue<Bytes, LRUCacheEntry> peekNext() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return cacheIterator.peekNext();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 7ed598e..0a89da7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -24,12 +24,6 @@ import org.apache.kafka.streams.state.StateSerdes;
import java.util.List;
class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
- private static final HasNextCondition ITERATOR_HAS_NEXT = new HasNextCondition() {
- @Override
- public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
- return iterator.hasNext();
- }
- };
private final StateSerdes<Bytes, byte[]> serdes = new StateSerdes<>("window-store-key-schema", Serdes.Bytes(), Serdes.ByteArray());
@Override
@@ -49,7 +43,21 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
@Override
public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, final long to) {
- return ITERATOR_HAS_NEXT;
+ return new HasNextCondition() {
+ @Override
+ public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
+ if (iterator.hasNext()) {
+ final Bytes bytes = iterator.peekNextKey();
+ final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get());
+ if (!keyBytes.equals(binaryKey)) {
+ return false;
+ }
+ final long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get());
+ return time >= from && time <= to;
+ }
+ return false;
+ }
+ };
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index c7b6846..297a88e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Windowed;
@@ -30,12 +31,17 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.util.List;
import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
+import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -53,13 +59,12 @@ public class CachingWindowStoreTest {
private ThreadCache cache;
private String topic;
private WindowKeySchema keySchema;
- private RocksDBWindowStore<Bytes, byte[]> windowStore;
@Before
public void setUp() throws Exception {
keySchema = new WindowKeySchema();
underlying = new RocksDBSegmentedBytesStore("test", 30000, 3, keySchema);
- windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false);
+ final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false);
cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>();
cachingStore = new CachingWindowStore<>(windowStore,
Serdes.String(),
@@ -73,6 +78,10 @@ public class CachingWindowStoreTest {
cachingStore.init(context, cachingStore);
}
+ @After
+ public void closeStore() {
+ cachingStore.close();
+ }
@Test
public void shouldPutFetchFromCache() throws Exception {
@@ -179,6 +188,18 @@ public class CachingWindowStoreTest {
cachingStore.put("a", "a");
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldFetchAndIterateOverExactKeys() throws Exception {
+ cachingStore.put("a", "0001", 0);
+ cachingStore.put("aa", "0002", 0);
+ cachingStore.put("a", "0003", 1);
+ cachingStore.put("aa", "0004", 1);
+ cachingStore.put("a", "0005", 60000);
+
+ final List<KeyValue<Long, String>> expected = Utils.mkList(KeyValue.pair(0L, "0001"), KeyValue.pair(1L, "0003"), KeyValue.pair(60000L, "0005"));
+ assertThat(toList(cachingStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected));
+ }
private int addItemsToCache() throws IOException {
int cachedSize = 0;
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
new file mode 100644
index 0000000..acded8c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FilteredCacheIteratorTest {
+
+ @SuppressWarnings("unchecked")
+ private final InMemoryKeyValueStore<Bytes, LRUCacheEntry> store = new InMemoryKeyValueStore("name", null, null);
+ private final KeyValue<Bytes, LRUCacheEntry> firstEntry = KeyValue.pair(Bytes.wrap("a".getBytes()),
+ new LRUCacheEntry("1".getBytes()));
+ private final List<KeyValue<Bytes, LRUCacheEntry>> entries = Utils.mkList(
+ firstEntry,
+ KeyValue.pair(Bytes.wrap("b".getBytes()),
+ new LRUCacheEntry("2".getBytes())),
+ KeyValue.pair(Bytes.wrap("c".getBytes()),
+ new LRUCacheEntry("3".getBytes())));
+
+ private FilteredCacheIterator allIterator;
+ private FilteredCacheIterator firstEntryIterator;
+
+ @Before
+ public void before() {
+ store.putAll(entries);
+ final HasNextCondition allCondition = new HasNextCondition() {
+ @Override
+ public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
+ return iterator.hasNext();
+ }
+ };
+ allIterator = new FilteredCacheIterator(
+ new DelegatingPeekingKeyValueIterator<>("",
+ store.all()), allCondition);
+
+ final HasNextCondition firstEntryCondition = new HasNextCondition() {
+ @Override
+ public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
+ return iterator.hasNext() && iterator.peekNextKey().equals(firstEntry.key);
+ }
+ };
+ firstEntryIterator = new FilteredCacheIterator(
+ new DelegatingPeekingKeyValueIterator<>("",
+ store.all()), firstEntryCondition);
+
+ }
+
+ @Test
+ public void shouldAllowEntryMatchingHasNextCondition() throws Exception {
+ final List<KeyValue<Bytes, LRUCacheEntry>> keyValues = toList(allIterator);
+ assertThat(keyValues, equalTo(entries));
+ }
+
+ @Test
+ public void shouldPeekNextKey() throws Exception {
+ while (allIterator.hasNext()) {
+ final Bytes nextKey = allIterator.peekNextKey();
+ final KeyValue<Bytes, LRUCacheEntry> next = allIterator.next();
+ assertThat(next.key, equalTo(nextKey));
+ }
+ }
+
+ @Test
+ public void shouldPeekNext() throws Exception {
+ while (allIterator.hasNext()) {
+ final KeyValue<Bytes, LRUCacheEntry> peeked = allIterator.peekNext();
+ final KeyValue<Bytes, LRUCacheEntry> next = allIterator.next();
+ assertThat(peeked, equalTo(next));
+ }
+ }
+
+ @Test
+ public void shouldNotHaveNextIfHasNextConditionNotMet() throws Exception {
+ assertTrue(firstEntryIterator.hasNext());
+ firstEntryIterator.next();
+ assertFalse(firstEntryIterator.hasNext());
+ }
+
+ @Test
+ public void shouldFilterEntriesNotMatchingHasNextCondition() throws Exception {
+ final List<KeyValue<Bytes, LRUCacheEntry>> keyValues = toList(firstEntryIterator);
+ assertThat(keyValues, equalTo(Utils.mkList(firstEntry)));
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void shouldThrowUnsupportedOperationExeceptionOnRemove() throws Exception {
+ allIterator.remove();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index b7dd942..7352673 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Test;
import java.io.File;
@@ -47,6 +48,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -87,6 +90,7 @@ public class RocksDBWindowStoreTest {
private final File baseDir = TestUtils.tempDirectory("test");
private final MockProcessorContext context = new MockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache);
+ private WindowStore windowStore;
@SuppressWarnings("unchecked")
private <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, final boolean enableCaching, final boolean retainDuplicates) {
@@ -96,9 +100,15 @@ public class RocksDBWindowStoreTest {
return store;
}
+ @After
+ public void closeStore() {
+ windowStore.close();
+ }
+
+ @SuppressWarnings("unchecked")
@Test
public void shouldOnlyIterateOpenSegments() throws Exception {
- final WindowStore<Integer, String> windowStore = createWindowStore(context, false, true);
+ windowStore = createWindowStore(context, false, true);
long currentTime = 0;
context.setRecordContext(createRecordContext(currentTime));
windowStore.put(1, "one");
@@ -128,494 +138,461 @@ public class RocksDBWindowStoreTest {
return new ProcessorRecordContext(time, 0, 0, "topic");
}
+ @SuppressWarnings("unchecked")
@Test
public void testPutAndFetch() throws IOException {
- WindowStore<Integer, String> store = createWindowStore(context, false, true);
- try {
- long startTime = segmentSize - 4L;
-
- putFirstBatch(store, startTime, context);
-
- assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize)));
- assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize)));
- assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize)));
- assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize)));
-
- putSecondBatch(store, startTime, context);
-
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize)));
- assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime - windowSize, startTime + windowSize)));
- assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize)));
- assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize)));
- assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize)));
- assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize)));
- assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize)));
- assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize)));
- assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize)));
-
- // Flush the store and verify all current entries were properly flushed ...
- store.flush();
-
- Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
- assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
- assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
- assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
- assertNull(entriesByKey.get(3));
- assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
- assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
- assertNull(entriesByKey.get(6));
-
- } finally {
- store.close();
- }
+ windowStore = createWindowStore(context, false, true);
+ long startTime = segmentSize - 4L;
+
+ putFirstBatch(windowStore, startTime, context);
+
+ assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize)));
+ assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize)));
+
+ putSecondBatch(windowStore, startTime, context);
+
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize)));
+ assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize)));
+ assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize)));
+ assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize)));
+ assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize)));
+ assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize)));
+ assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize)));
+
+ // Flush the store and verify all current entries were properly flushed ...
+ windowStore.flush();
+
+ Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+ assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+ assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+ assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+ assertNull(entriesByKey.get(3));
+ assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+ assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+ assertNull(entriesByKey.get(6));
}
+ @SuppressWarnings("unchecked")
@Test
public void testPutAndFetchBefore() throws IOException {
- WindowStore<Integer, String> store = createWindowStore(context, false, true);
- try {
- long startTime = segmentSize - 4L;
-
- putFirstBatch(store, startTime, context);
-
- assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L)));
- assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
- assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L)));
- assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L)));
- assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L)));
-
- putSecondBatch(store, startTime, context);
-
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L - windowSize, startTime + 0L)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
- assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L)));
- assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L)));
- assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L)));
- assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L)));
- assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L)));
- assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L)));
- assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L)));
- assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L - windowSize, startTime + 13L)));
-
- // Flush the store and verify all current entries were properly flushed ...
- store.flush();
-
- Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
- assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
- assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
- assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
- assertNull(entriesByKey.get(3));
- assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
- assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
- assertNull(entriesByKey.get(6));
-
- } finally {
- store.close();
- }
+ windowStore = createWindowStore(context, false, true);
+ long startTime = segmentSize - 4L;
+
+ putFirstBatch(windowStore, startTime, context);
+
+ assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - windowSize, startTime + 0L)));
+ assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - windowSize, startTime + 1L)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - windowSize, startTime + 3L)));
+ assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - windowSize, startTime + 4L)));
+ assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - windowSize, startTime + 5L)));
+
+ putSecondBatch(windowStore, startTime, context);
+
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 1L - windowSize, startTime - 1L)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 0L - windowSize, startTime + 0L)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 1L - windowSize, startTime + 1L)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
+ assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime + 3L - windowSize, startTime + 3L)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 4L - windowSize, startTime + 4L)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 5L - windowSize, startTime + 5L)));
+ assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 6L - windowSize, startTime + 6L)));
+ assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 7L - windowSize, startTime + 7L)));
+ assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - windowSize, startTime + 8L)));
+ assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - windowSize, startTime + 9L)));
+ assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - windowSize, startTime + 10L)));
+ assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - windowSize, startTime + 11L)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - windowSize, startTime + 12L)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 13L - windowSize, startTime + 13L)));
+
+ // Flush the store and verify all current entries were properly flushed ...
+ windowStore.flush();
+
+ Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+ assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+ assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+ assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+ assertNull(entriesByKey.get(3));
+ assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+ assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+ assertNull(entriesByKey.get(6));
}
+ @SuppressWarnings("unchecked")
@Test
public void testPutAndFetchAfter() throws IOException {
- WindowStore<Integer, String> store = createWindowStore(context, false, true);
- try {
- long startTime = segmentSize - 4L;
-
- putFirstBatch(store, startTime, context);
-
- assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L, startTime + 0L + windowSize)));
- assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L, startTime + 1L + windowSize)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L, startTime + 3L + windowSize)));
- assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L, startTime + 4L + windowSize)));
- assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L, startTime + 5L + windowSize)));
-
- putSecondBatch(store, startTime, context);
-
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L, startTime - 2L + windowSize)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L, startTime - 1L + windowSize)));
- assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime, startTime + windowSize)));
- assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L, startTime + 1L + windowSize)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
- assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L, startTime + 3L + windowSize)));
- assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L, startTime + 4L + windowSize)));
- assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L, startTime + 5L + windowSize)));
- assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L, startTime + 6L + windowSize)));
- assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L, startTime + 7L + windowSize)));
- assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L, startTime + 8L + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L, startTime + 9L + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L, startTime + 10L + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L, startTime + 11L + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L, startTime + 12L + windowSize)));
-
- // Flush the store and verify all current entries were properly flushed ...
- store.flush();
-
- Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
- assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
- assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
- assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
- assertNull(entriesByKey.get(3));
- assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
- assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
- assertNull(entriesByKey.get(6));
-
- } finally {
- store.close();
- }
+ windowStore = createWindowStore(context, false, true);
+ long startTime = segmentSize - 4L;
+
+ putFirstBatch(windowStore, startTime, context);
+
+ assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L, startTime + 0L + windowSize)));
+ assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L, startTime + 1L + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L, startTime + 3L + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L, startTime + 4L + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L, startTime + 5L + windowSize)));
+
+ putSecondBatch(windowStore, startTime, context);
+
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L, startTime - 2L + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L, startTime - 1L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime, startTime + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L, startTime + 1L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
+ assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L, startTime + 3L + windowSize)));
+ assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L, startTime + 4L + windowSize)));
+ assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L, startTime + 5L + windowSize)));
+ assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L, startTime + 6L + windowSize)));
+ assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L, startTime + 7L + windowSize)));
+ assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 8L, startTime + 8L + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 9L, startTime + 9L + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 10L, startTime + 10L + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 11L, startTime + 11L + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L, startTime + 12L + windowSize)));
+
+ // Flush the store and verify all current entries were properly flushed ...
+ windowStore.flush();
+
+ Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+ assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+ assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+ assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+ assertNull(entriesByKey.get(3));
+ assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+ assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+ assertNull(entriesByKey.get(6));
}
+ @SuppressWarnings("unchecked")
@Test
public void testPutSameKeyTimestamp() throws IOException {
- WindowStore<Integer, String> store = createWindowStore(context, false, true);
- try {
- long startTime = segmentSize - 4L;
-
- context.setRecordContext(createRecordContext(startTime));
- store.put(0, "zero");
+ windowStore = createWindowStore(context, false, true);
+ long startTime = segmentSize - 4L;
- assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+ context.setRecordContext(createRecordContext(startTime));
+ windowStore.put(0, "zero");
- store.put(0, "zero");
- store.put(0, "zero+");
- store.put(0, "zero++");
+ assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
- assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
- assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize)));
- assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize)));
- assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+ windowStore.put(0, "zero");
+ windowStore.put(0, "zero+");
+ windowStore.put(0, "zero++");
- // Flush the store and verify all current entries were properly flushed ...
- store.flush();
+ assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+ assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+ assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize)));
- Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+ // Flush the store and verify all current entries were properly flushed ...
+ windowStore.flush();
- assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
+ Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
- } finally {
- store.close();
- }
+ assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
}
@Test
public void testCachingEnabled() throws IOException {
- WindowStore<Integer, String> store = createWindowStore(context, true, false);
- assertTrue(store instanceof CachedStateStore);
+ windowStore = createWindowStore(context, true, false);
+ assertTrue(windowStore instanceof CachedStateStore);
}
+ @SuppressWarnings("unchecked")
@Test
public void testRolling() throws IOException {
- WindowStore<Integer, String> store = createWindowStore(context, false, true);
+ windowStore = createWindowStore(context, false, true);
+
+ // to validate segments
+ final Segments segments = new Segments(windowName, retentionPeriod, numSegments);
+ long startTime = segmentSize * 2;
+ long incr = segmentSize / 2;
+ context.setRecordContext(createRecordContext(startTime));
+ windowStore.put(0, "zero");
+ assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
+
+ context.setRecordContext(createRecordContext(startTime + incr));
+ windowStore.put(1, "one");
+ assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
+
+ context.setRecordContext(createRecordContext(startTime + incr * 2));
+ windowStore.put(2, "two");
+ assertEquals(Utils.mkSet(segments.segmentName(2),
+ segments.segmentName(3)), segmentDirs(baseDir));
+
+ context.setRecordContext(createRecordContext(startTime + incr * 4));
+ windowStore.put(4, "four");
+ assertEquals(Utils.mkSet(segments.segmentName(2),
+ segments.segmentName(3),
+ segments.segmentName(4)), segmentDirs(baseDir));
+
+
+ context.setRecordContext(createRecordContext(startTime + incr * 5));
+ windowStore.put(5, "five");
+ assertEquals(Utils.mkSet(segments.segmentName(2),
+ segments.segmentName(3),
+ segments.segmentName(4)), segmentDirs(baseDir));
+
+ assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+
+ context.setRecordContext(createRecordContext(startTime + incr * 6));
+ windowStore.put(6, "six");
+ assertEquals(Utils.mkSet(segments.segmentName(3),
+ segments.segmentName(4),
+ segments.segmentName(5)), segmentDirs(baseDir));
+
+
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+ assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+
+
+ context.setRecordContext(createRecordContext(startTime + incr * 7));
+ windowStore.put(7, "seven");
+ assertEquals(Utils.mkSet(segments.segmentName(3),
+ segments.segmentName(4),
+ segments.segmentName(5)), segmentDirs(baseDir));
+
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+ assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+ assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+
+ context.setRecordContext(createRecordContext(startTime + incr * 8));
+ windowStore.put(8, "eight");
+ assertEquals(Utils.mkSet(segments.segmentName(4),
+ segments.segmentName(5),
+ segments.segmentName(6)), segmentDirs(baseDir));
+
+
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+ assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+ assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+ assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
+
+ // check segment directories
+ windowStore.flush();
+ assertEquals(Utils.mkSet(segments.segmentName(4),
+ segments.segmentName(5),
+ segments.segmentName(6)), segmentDirs(baseDir));
+
- try {
- // to validate segments
- final Segments segments = new Segments(windowName, retentionPeriod, numSegments);
- long startTime = segmentSize * 2;
- long incr = segmentSize / 2;
- context.setRecordContext(createRecordContext(startTime));
- store.put(0, "zero");
- assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
-
- context.setRecordContext(createRecordContext(startTime + incr));
- store.put(1, "one");
- assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
-
- context.setRecordContext(createRecordContext(startTime + incr * 2));
- store.put(2, "two");
- assertEquals(Utils.mkSet(segments.segmentName(2),
- segments.segmentName(3)), segmentDirs(baseDir));
-
- context.setRecordContext(createRecordContext(startTime + incr * 4));
- store.put(4, "four");
- assertEquals(Utils.mkSet(segments.segmentName(2),
- segments.segmentName(3),
- segments.segmentName(4)), segmentDirs(baseDir));
-
-
- context.setRecordContext(createRecordContext(startTime + incr * 5));
- store.put(5, "five");
- assertEquals(Utils.mkSet(segments.segmentName(2),
- segments.segmentName(3),
- segments.segmentName(4)), segmentDirs(baseDir));
-
- assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
- assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
- assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
- assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-
- context.setRecordContext(createRecordContext(startTime + incr * 6));
- store.put(6, "six");
- assertEquals(Utils.mkSet(segments.segmentName(3),
- segments.segmentName(4),
- segments.segmentName(5)), segmentDirs(baseDir));
-
-
- assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
- assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
- assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
- assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
-
-
- context.setRecordContext(createRecordContext(startTime + incr * 7));
- store.put(7, "seven");
- assertEquals(Utils.mkSet(segments.segmentName(3),
- segments.segmentName(4),
- segments.segmentName(5)), segmentDirs(baseDir));
-
- assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
- assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
- assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
- assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
- assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
-
- context.setRecordContext(createRecordContext(startTime + incr * 8));
- store.put(8, "eight");
- assertEquals(Utils.mkSet(segments.segmentName(4),
- segments.segmentName(5),
- segments.segmentName(6)), segmentDirs(baseDir));
-
-
- assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
- assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
- assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
- assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
- assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
- assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
-
- // check segment directories
- store.flush();
- assertEquals(Utils.mkSet(segments.segmentName(4),
- segments.segmentName(5),
- segments.segmentName(6)), segmentDirs(baseDir));
-
- } finally {
- store.close();
- }
}
+ @SuppressWarnings("unchecked")
@Test
public void testRestore() throws IOException {
long startTime = segmentSize * 2;
long incr = segmentSize / 2;
- WindowStore<Integer, String> store = createWindowStore(context, false, true);
- try {
- context.setRecordContext(createRecordContext(startTime));
- store.put(0, "zero");
- context.setRecordContext(createRecordContext(startTime + incr));
- store.put(1, "one");
- context.setRecordContext(createRecordContext(startTime + incr * 2));
- store.put(2, "two");
- context.setRecordContext(createRecordContext(startTime + incr * 3));
- store.put(3, "three");
- context.setRecordContext(createRecordContext(startTime + incr * 4));
- store.put(4, "four");
- context.setRecordContext(createRecordContext(startTime + incr * 5));
- store.put(5, "five");
- context.setRecordContext(createRecordContext(startTime + incr * 6));
- store.put(6, "six");
- context.setRecordContext(createRecordContext(startTime + incr * 7));
- store.put(7, "seven");
- context.setRecordContext(createRecordContext(startTime + incr * 8));
- store.put(8, "eight");
- store.flush();
-
- } finally {
- store.close();
- }
+ windowStore = createWindowStore(context, false, true);
+ context.setRecordContext(createRecordContext(startTime));
+ windowStore.put(0, "zero");
+ context.setRecordContext(createRecordContext(startTime + incr));
+ windowStore.put(1, "one");
+ context.setRecordContext(createRecordContext(startTime + incr * 2));
+ windowStore.put(2, "two");
+ context.setRecordContext(createRecordContext(startTime + incr * 3));
+ windowStore.put(3, "three");
+ context.setRecordContext(createRecordContext(startTime + incr * 4));
+ windowStore.put(4, "four");
+ context.setRecordContext(createRecordContext(startTime + incr * 5));
+ windowStore.put(5, "five");
+ context.setRecordContext(createRecordContext(startTime + incr * 6));
+ windowStore.put(6, "six");
+ context.setRecordContext(createRecordContext(startTime + incr * 7));
+ windowStore.put(7, "seven");
+ context.setRecordContext(createRecordContext(startTime + incr * 8));
+ windowStore.put(8, "eight");
+ windowStore.flush();
+
+ windowStore.close();
// remove local store image
Utils.delete(baseDir);
- WindowStore<Integer, String> store2 = createWindowStore(context, false, true);
- assertEquals(Utils.mkList(), toList(store2.fetch(0, startTime - windowSize, startTime + windowSize)));
- assertEquals(Utils.mkList(), toList(store2.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
- assertEquals(Utils.mkList(), toList(store2.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
- assertEquals(Utils.mkList(), toList(store2.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
- assertEquals(Utils.mkList(), toList(store2.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
- assertEquals(Utils.mkList(), toList(store2.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
- assertEquals(Utils.mkList(), toList(store2.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
- assertEquals(Utils.mkList(), toList(store2.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
- assertEquals(Utils.mkList(), toList(store2.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
-
- try {
- context.restore(windowName, changeLog);
-
- assertEquals(Utils.mkList(), toList(store2.fetch(0, startTime - windowSize, startTime + windowSize)));
- assertEquals(Utils.mkList(), toList(store2.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
- assertEquals(Utils.mkList(), toList(store2.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
- assertEquals(Utils.mkList(), toList(store2.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
- assertEquals(Utils.mkList("four"), toList(store2.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
- assertEquals(Utils.mkList("five"), toList(store2.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
- assertEquals(Utils.mkList("six"), toList(store2.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
- assertEquals(Utils.mkList("seven"), toList(store2.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
- assertEquals(Utils.mkList("eight"), toList(store2.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
-
- // check segment directories
- store2.flush();
- assertEquals(
- Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
- segmentDirs(baseDir)
- );
- } finally {
- store2.close();
- }
+ windowStore = createWindowStore(context, false, true);
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
+
+ context.restore(windowName, changeLog);
+
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+ assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+ assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+ assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
+
+ // check segment directories
+ windowStore.flush();
+ assertEquals(
+ Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
+ segmentDirs(baseDir)
+ );
}
+ @SuppressWarnings("unchecked")
@Test
public void testSegmentMaintenance() throws IOException {
- WindowStore<Integer, String> store = createWindowStore(context, false, true);
- try {
- context.setTime(0L);
- context.setRecordContext(createRecordContext(0));
- store.put(0, "v");
- assertEquals(
- Utils.mkSet(segments.segmentName(0L)),
- segmentDirs(baseDir)
- );
-
- context.setRecordContext(createRecordContext(59999));
- store.put(0, "v");
- store.put(0, "v");
- assertEquals(
- Utils.mkSet(segments.segmentName(0L)),
- segmentDirs(baseDir)
- );
-
- context.setRecordContext(createRecordContext(60000));
- store.put(0, "v");
- assertEquals(
- Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
- segmentDirs(baseDir)
- );
-
- WindowStoreIterator iter;
- int fetchedCount;
-
- iter = store.fetch(0, 0L, 240000L);
- fetchedCount = 0;
- while (iter.hasNext()) {
- iter.next();
- fetchedCount++;
- }
- assertEquals(4, fetchedCount);
+ windowStore = createWindowStore(context, false, true);
+ context.setTime(0L);
+ context.setRecordContext(createRecordContext(0));
+ windowStore.put(0, "v");
+ assertEquals(
+ Utils.mkSet(segments.segmentName(0L)),
+ segmentDirs(baseDir)
+ );
+
+ context.setRecordContext(createRecordContext(59999));
+ windowStore.put(0, "v");
+ windowStore.put(0, "v");
+ assertEquals(
+ Utils.mkSet(segments.segmentName(0L)),
+ segmentDirs(baseDir)
+ );
+
+ context.setRecordContext(createRecordContext(60000));
+ windowStore.put(0, "v");
+ assertEquals(
+ Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
+ segmentDirs(baseDir)
+ );
+
+ WindowStoreIterator iter;
+ int fetchedCount;
+
+ iter = windowStore.fetch(0, 0L, 240000L);
+ fetchedCount = 0;
+ while (iter.hasNext()) {
+ iter.next();
+ fetchedCount++;
+ }
+ assertEquals(4, fetchedCount);
- assertEquals(
- Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
- segmentDirs(baseDir)
- );
+ assertEquals(
+ Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
+ segmentDirs(baseDir)
+ );
- context.setRecordContext(createRecordContext(180000));
- store.put(0, "v");
+ context.setRecordContext(createRecordContext(180000));
+ windowStore.put(0, "v");
- iter = store.fetch(0, 0L, 240000L);
- fetchedCount = 0;
- while (iter.hasNext()) {
- iter.next();
- fetchedCount++;
- }
- assertEquals(2, fetchedCount);
+ iter = windowStore.fetch(0, 0L, 240000L);
+ fetchedCount = 0;
+ while (iter.hasNext()) {
+ iter.next();
+ fetchedCount++;
+ }
+ assertEquals(2, fetchedCount);
- assertEquals(
- Utils.mkSet(segments.segmentName(1L), segments.segmentName(3L)),
- segmentDirs(baseDir)
- );
+ assertEquals(
+ Utils.mkSet(segments.segmentName(1L), segments.segmentName(3L)),
+ segmentDirs(baseDir)
+ );
- context.setRecordContext(createRecordContext(300000));
- store.put(0, "v");
+ context.setRecordContext(createRecordContext(300000));
+ windowStore.put(0, "v");
- iter = store.fetch(0, 240000L, 1000000L);
- fetchedCount = 0;
- while (iter.hasNext()) {
- iter.next();
- fetchedCount++;
- }
- assertEquals(1, fetchedCount);
+ iter = windowStore.fetch(0, 240000L, 1000000L);
+ fetchedCount = 0;
+ while (iter.hasNext()) {
+ iter.next();
+ fetchedCount++;
+ }
+ assertEquals(1, fetchedCount);
- assertEquals(
- Utils.mkSet(segments.segmentName(3L), segments.segmentName(5L)),
- segmentDirs(baseDir)
- );
+ assertEquals(
+ Utils.mkSet(segments.segmentName(3L), segments.segmentName(5L)),
+ segmentDirs(baseDir)
+ );
- } finally {
- store.close();
- }
}
+ @SuppressWarnings("unchecked")
@Test
public void testInitialLoading() throws IOException {
File storeDir = new File(baseDir, windowName);
- WindowStore<Integer, String> store = createWindowStore(context, false, true);
+ windowStore = createWindowStore(context, false, true);
- try {
- new File(storeDir, segments.segmentName(0L)).mkdir();
- new File(storeDir, segments.segmentName(1L)).mkdir();
- new File(storeDir, segments.segmentName(2L)).mkdir();
- new File(storeDir, segments.segmentName(3L)).mkdir();
- new File(storeDir, segments.segmentName(4L)).mkdir();
- new File(storeDir, segments.segmentName(5L)).mkdir();
- new File(storeDir, segments.segmentName(6L)).mkdir();
- } finally {
- store.close();
- }
+ new File(storeDir, segments.segmentName(0L)).mkdir();
+ new File(storeDir, segments.segmentName(1L)).mkdir();
+ new File(storeDir, segments.segmentName(2L)).mkdir();
+ new File(storeDir, segments.segmentName(3L)).mkdir();
+ new File(storeDir, segments.segmentName(4L)).mkdir();
+ new File(storeDir, segments.segmentName(5L)).mkdir();
+ new File(storeDir, segments.segmentName(6L)).mkdir();
+ windowStore.close();
- store = createWindowStore(context, false, true);
+ windowStore = createWindowStore(context, false, true);
- try {
- assertEquals(
- Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
- segmentDirs(baseDir)
- );
+ assertEquals(
+ Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
+ segmentDirs(baseDir)
+ );
- try (WindowStoreIterator iter = store.fetch(0, 0L, 1000000L)) {
- while (iter.hasNext()) {
- iter.next();
- }
+ try (WindowStoreIterator iter = windowStore.fetch(0, 0L, 1000000L)) {
+ while (iter.hasNext()) {
+ iter.next();
}
-
- assertEquals(
- Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
- segmentDirs(baseDir)
- );
-
- } finally {
- store.close();
}
+
+ assertEquals(
+ Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
+ segmentDirs(baseDir)
+ );
}
+ @SuppressWarnings("unchecked")
@Test
public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() throws Exception {
- final WindowStore<Integer, String> windowStore = createWindowStore(context, false, true);
+ windowStore = createWindowStore(context, false, true);
context.setRecordContext(createRecordContext(0));
windowStore.put(1, "one", 1L);
windowStore.put(1, "two", 2L);
@@ -639,6 +616,34 @@ public class RocksDBWindowStoreTest {
}
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldFetchAndIterateOverExactKeys() throws Exception {
+ final RocksDBWindowStoreSupplier<String, String> supplier =
+ new RocksDBWindowStoreSupplier<>(
+ "window",
+ 60 * 1000L * 2, 3,
+ true,
+ Serdes.String(),
+ Serdes.String(),
+ windowSize,
+ true,
+ Collections.<String, String>emptyMap(),
+ false);
+
+ windowStore = supplier.get();
+ windowStore.init(context, windowStore);
+
+ windowStore.put("a", "0001", 0);
+ windowStore.put("aa", "0002", 0);
+ windowStore.put("a", "0003", 1);
+ windowStore.put("aa", "0004", 1);
+ windowStore.put("a", "0005", 60000);
+
+ final List expected = Utils.mkList("0001", "0003", "0005");
+ assertThat(toList(windowStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected));
+ }
+
private void putFirstBatch(final WindowStore<Integer, String> store, final long startTime, final MockProcessorContext context) {
context.setRecordContext(createRecordContext(startTime));
store.put(0, "zero");