You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/14 21:21:08 UTC

kafka git commit: KAFKA-4863; Querying window store may return unwanted keys

Repository: kafka
Updated Branches:
  refs/heads/trunk d3b8ff024 -> 9e4548df3


KAFKA-4863; Querying window store may return unwanted keys

Make sure that the iterator returned from `WindowStore.fetch(..)` only returns matching keys, rather than all keys that are a prefix match.

Author: Damian Guy <da...@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes #2662 from dguy/kafka-4863


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e4548df
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e4548df
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e4548df

Branch: refs/heads/trunk
Commit: 9e4548df30d50a56ae99cb3383f1a3f97bbe77bb
Parents: d3b8ff0
Author: Damian Guy <da...@gmail.com>
Authored: Tue Mar 14 14:21:03 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Mar 14 14:21:03 2017 -0700

----------------------------------------------------------------------
 .../state/internals/CachingSessionStore.java    |  52 --
 .../state/internals/CachingWindowStore.java     |  12 +-
 .../state/internals/FilteredCacheIterator.java  |  73 ++
 .../state/internals/WindowKeySchema.java        |  22 +-
 .../state/internals/CachingWindowStoreTest.java |  25 +-
 .../internals/FilteredCacheIteratorTest.java    | 118 +++
 .../state/internals/RocksDBWindowStoreTest.java | 843 ++++++++++---------
 7 files changed, 663 insertions(+), 482 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 58c0133..a4b46ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
@@ -31,7 +30,6 @@ import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.List;
-import java.util.NoSuchElementException;
 
 
 class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<K, AGG>, CachedStateStore<Windowed<K>, AGG> {
@@ -162,54 +160,4 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
         this.flushListener = flushListener;
     }
 
-    private static class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
-        private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator;
-        private final HasNextCondition hasNextCondition;
-
-        FilteredCacheIterator(final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator,
-                              final HasNextCondition hasNextCondition) {
-            this.cacheIterator = cacheIterator;
-            this.hasNextCondition = hasNextCondition;
-        }
-
-        @Override
-        public void close() {
-            // no-op
-        }
-
-        @Override
-        public Bytes peekNextKey() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            return cacheIterator.peekNextKey();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return hasNextCondition.hasNext(cacheIterator);
-        }
-
-        @Override
-        public KeyValue<Bytes, LRUCacheEntry> next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            return cacheIterator.next();
-
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public KeyValue<Bytes, LRUCacheEntry> peekNext() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            return cacheIterator.peekNext();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 7ea2fa4..4003e54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -38,6 +38,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private final long windowSize;
+    private final SegmentedBytesStore.KeySchema keySchema = new WindowKeySchema();
 
     private String name;
     private ThreadCache cache;
@@ -149,9 +150,16 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         Bytes fromBytes = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
         Bytes toBytes = WindowStoreUtils.toBinaryKey(key, timeTo, 0, serdes);
 
-        final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
+        final Bytes keyBytes = Bytes.wrap(serdes.rawKey(key));
+        final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(keyBytes, timeFrom, timeTo);
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, fromBytes, toBytes);
-        return new MergedSortedCacheWindowStoreIterator<>(cacheIterator,
+
+        final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyBytes,
+                                                                             timeFrom,
+                                                                             timeTo);
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition);
+
+        return new MergedSortedCacheWindowStoreIterator<>(filteredCacheIterator,
                                                           underlyingIterator,
                                                           new StateSerdes<>(serdes.stateName(), Serdes.Long(), serdes.valueSerde()));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java
new file mode 100644
index 0000000..19370b9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.NoSuchElementException;
+
+class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
+    private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
+    private final HasNextCondition hasNextCondition;
+
+    FilteredCacheIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
+                          final HasNextCondition hasNextCondition) {
+        this.cacheIterator = cacheIterator;
+        this.hasNextCondition = hasNextCondition;
+    }
+
+    @Override
+    public void close() {
+        // no-op
+    }
+
+    @Override
+    public Bytes peekNextKey() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        return cacheIterator.peekNextKey();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return hasNextCondition.hasNext(cacheIterator);
+    }
+
+    @Override
+    public KeyValue<Bytes, LRUCacheEntry> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        return cacheIterator.next();
+
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public KeyValue<Bytes, LRUCacheEntry> peekNext() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        return cacheIterator.peekNext();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 7ed598e..0a89da7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -24,12 +24,6 @@ import org.apache.kafka.streams.state.StateSerdes;
 import java.util.List;
 
 class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
-    private static final HasNextCondition ITERATOR_HAS_NEXT = new HasNextCondition() {
-        @Override
-        public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
-            return iterator.hasNext();
-        }
-    };
     private final StateSerdes<Bytes, byte[]> serdes = new StateSerdes<>("window-store-key-schema", Serdes.Bytes(), Serdes.ByteArray());
 
     @Override
@@ -49,7 +43,21 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
 
     @Override
     public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, final long to) {
-        return ITERATOR_HAS_NEXT;
+        return new HasNextCondition() {
+            @Override
+            public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
+                if (iterator.hasNext()) {
+                    final Bytes bytes = iterator.peekNextKey();
+                    final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get());
+                    if (!keyBytes.equals(binaryKey)) {
+                        return false;
+                    }
+                    final long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get());
+                    return time >= from && time <= to;
+                }
+                return false;
+            }
+        };
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index c7b6846..297a88e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -30,12 +31,17 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.List;
 
 import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
+import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -53,13 +59,12 @@ public class CachingWindowStoreTest {
     private ThreadCache cache;
     private String topic;
     private WindowKeySchema keySchema;
-    private RocksDBWindowStore<Bytes, byte[]> windowStore;
 
     @Before
     public void setUp() throws Exception {
         keySchema = new WindowKeySchema();
         underlying = new RocksDBSegmentedBytesStore("test", 30000, 3, keySchema);
-        windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false);
+        final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false);
         cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>();
         cachingStore = new CachingWindowStore<>(windowStore,
                                                 Serdes.String(),
@@ -73,6 +78,10 @@ public class CachingWindowStoreTest {
         cachingStore.init(context, cachingStore);
     }
 
+    @After
+    public void closeStore() {
+        cachingStore.close();
+    }
 
     @Test
     public void shouldPutFetchFromCache() throws Exception {
@@ -179,6 +188,18 @@ public class CachingWindowStoreTest {
         cachingStore.put("a", "a");
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldFetchAndIterateOverExactKeys() throws Exception {
+        cachingStore.put("a", "0001", 0);
+        cachingStore.put("aa", "0002", 0);
+        cachingStore.put("a", "0003", 1);
+        cachingStore.put("aa", "0004", 1);
+        cachingStore.put("a", "0005", 60000);
+
+        final List<KeyValue<Long, String>> expected = Utils.mkList(KeyValue.pair(0L, "0001"), KeyValue.pair(1L, "0003"), KeyValue.pair(60000L, "0005"));
+        assertThat(toList(cachingStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected));
+    }
 
     private int addItemsToCache() throws IOException {
         int cachedSize = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
new file mode 100644
index 0000000..acded8c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FilteredCacheIteratorTest {
+
+    @SuppressWarnings("unchecked")
+    private final InMemoryKeyValueStore<Bytes, LRUCacheEntry> store = new InMemoryKeyValueStore("name", null, null);
+    private final KeyValue<Bytes, LRUCacheEntry> firstEntry = KeyValue.pair(Bytes.wrap("a".getBytes()),
+                                                                            new LRUCacheEntry("1".getBytes()));
+    private final List<KeyValue<Bytes, LRUCacheEntry>> entries = Utils.mkList(
+            firstEntry,
+            KeyValue.pair(Bytes.wrap("b".getBytes()),
+                          new LRUCacheEntry("2".getBytes())),
+            KeyValue.pair(Bytes.wrap("c".getBytes()),
+                          new LRUCacheEntry("3".getBytes())));
+
+    private FilteredCacheIterator allIterator;
+    private FilteredCacheIterator firstEntryIterator;
+
+    @Before
+    public void before() {
+        store.putAll(entries);
+        final HasNextCondition allCondition = new HasNextCondition() {
+            @Override
+            public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
+                return iterator.hasNext();
+            }
+        };
+        allIterator = new FilteredCacheIterator(
+                new DelegatingPeekingKeyValueIterator<>("",
+                                                        store.all()), allCondition);
+
+        final HasNextCondition firstEntryCondition = new HasNextCondition() {
+            @Override
+            public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
+                return iterator.hasNext() && iterator.peekNextKey().equals(firstEntry.key);
+            }
+        };
+        firstEntryIterator = new FilteredCacheIterator(
+                new DelegatingPeekingKeyValueIterator<>("",
+                                                        store.all()), firstEntryCondition);
+
+    }
+
+    @Test
+    public void shouldAllowEntryMatchingHasNextCondition() throws Exception {
+        final List<KeyValue<Bytes, LRUCacheEntry>> keyValues = toList(allIterator);
+        assertThat(keyValues, equalTo(entries));
+    }
+
+    @Test
+    public void shouldPeekNextKey() throws Exception {
+        while (allIterator.hasNext()) {
+            final Bytes nextKey = allIterator.peekNextKey();
+            final KeyValue<Bytes, LRUCacheEntry> next = allIterator.next();
+            assertThat(next.key, equalTo(nextKey));
+        }
+    }
+
+    @Test
+    public void shouldPeekNext() throws Exception {
+        while (allIterator.hasNext()) {
+            final KeyValue<Bytes, LRUCacheEntry> peeked = allIterator.peekNext();
+            final KeyValue<Bytes, LRUCacheEntry> next = allIterator.next();
+            assertThat(peeked, equalTo(next));
+        }
+    }
+
+    @Test
+    public void shouldNotHaveNextIfHasNextConditionNotMet() throws Exception {
+        assertTrue(firstEntryIterator.hasNext());
+        firstEntryIterator.next();
+        assertFalse(firstEntryIterator.hasNext());
+    }
+
+    @Test
+    public void shouldFilterEntriesNotMatchingHasNextCondition() throws Exception {
+        final List<KeyValue<Bytes, LRUCacheEntry>> keyValues = toList(firstEntryIterator);
+        assertThat(keyValues, equalTo(Utils.mkList(firstEntry)));
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void shouldThrowUnsupportedOperationExeceptionOnRemove() throws Exception {
+        allIterator.remove();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e4548df/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index b7dd942..7352673 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Test;
 
 import java.io.File;
@@ -47,6 +48,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -87,6 +90,7 @@ public class RocksDBWindowStoreTest {
 
     private final File baseDir = TestUtils.tempDirectory("test");
     private final MockProcessorContext context = new MockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache);
+    private WindowStore windowStore;
 
     @SuppressWarnings("unchecked")
     private <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, final boolean enableCaching, final boolean retainDuplicates) {
@@ -96,9 +100,15 @@ public class RocksDBWindowStoreTest {
         return store;
     }
 
+    @After
+    public void closeStore() {
+        windowStore.close();
+    }
+
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldOnlyIterateOpenSegments() throws Exception {
-        final WindowStore<Integer, String> windowStore = createWindowStore(context, false, true);
+        windowStore = createWindowStore(context, false, true);
         long currentTime = 0;
         context.setRecordContext(createRecordContext(currentTime));
         windowStore.put(1, "one");
@@ -128,494 +138,461 @@ public class RocksDBWindowStoreTest {
         return new ProcessorRecordContext(time, 0, 0, "topic");
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testPutAndFetch() throws IOException {
-        WindowStore<Integer, String> store = createWindowStore(context, false, true);
-        try {
-            long startTime = segmentSize - 4L;
-
-            putFirstBatch(store, startTime, context);
-
-            assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize)));
-            assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize)));
-            assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
-            assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize)));
-            assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize)));
-            assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize)));
-
-            putSecondBatch(store, startTime, context);
-
-            assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize)));
-            assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize)));
-            assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime - windowSize, startTime + windowSize)));
-            assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize)));
-            assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
-            assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize)));
-            assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize)));
-            assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize)));
-            assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize)));
-            assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize)));
-            assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize)));
-            assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize)));
-            assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize)));
-            assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize)));
-            assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize)));
-
-            // Flush the store and verify all current entries were properly flushed ...
-            store.flush();
-
-            Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
-            assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
-            assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
-            assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
-            assertNull(entriesByKey.get(3));
-            assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
-            assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
-            assertNull(entriesByKey.get(6));
-
-        } finally {
-            store.close();
-        }
+        windowStore = createWindowStore(context, false, true);
+        long startTime = segmentSize - 4L;
+
+        putFirstBatch(windowStore, startTime, context);
+
+        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize)));
+        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize)));
+
+        putSecondBatch(windowStore, startTime, context);
+
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize)));
+        assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize)));
+        assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize)));
+        assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize)));
+        assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize)));
+        assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize)));
+        assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize)));
+
+        // Flush the store and verify all current entries were properly flushed ...
+        windowStore.flush();
+
+        Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+        assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+        assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+        assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+        assertNull(entriesByKey.get(3));
+        assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+        assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+        assertNull(entriesByKey.get(6));
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testPutAndFetchBefore() throws IOException {
-        WindowStore<Integer, String> store = createWindowStore(context, false, true);
-        try {
-            long startTime = segmentSize - 4L;
-
-            putFirstBatch(store, startTime, context);
-
-            assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L)));
-            assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L)));
-            assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
-            assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L)));
-            assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L)));
-            assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L)));
-
-            putSecondBatch(store, startTime, context);
-
-            assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L)));
-            assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L - windowSize, startTime + 0L)));
-            assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L)));
-            assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
-            assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L)));
-            assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L)));
-            assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L)));
-            assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L)));
-            assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L)));
-            assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L)));
-            assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L)));
-            assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L)));
-            assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L)));
-            assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L)));
-            assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L - windowSize, startTime + 13L)));
-
-            // Flush the store and verify all current entries were properly flushed ...
-            store.flush();
-
-            Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
-            assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
-            assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
-            assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
-            assertNull(entriesByKey.get(3));
-            assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
-            assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
-            assertNull(entriesByKey.get(6));
-
-        } finally {
-            store.close();
-        }
+        windowStore = createWindowStore(context, false, true);
+        long startTime = segmentSize - 4L;
+
+        putFirstBatch(windowStore, startTime, context);
+
+        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - windowSize, startTime + 0L)));
+        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - windowSize, startTime + 1L)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - windowSize, startTime + 3L)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - windowSize, startTime + 4L)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - windowSize, startTime + 5L)));
+
+        putSecondBatch(windowStore, startTime, context);
+
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 1L - windowSize, startTime - 1L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 0L - windowSize, startTime + 0L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 1L - windowSize, startTime + 1L)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
+        assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime + 3L - windowSize, startTime + 3L)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 4L - windowSize, startTime + 4L)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 5L - windowSize, startTime + 5L)));
+        assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 6L - windowSize, startTime + 6L)));
+        assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 7L - windowSize, startTime + 7L)));
+        assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - windowSize, startTime + 8L)));
+        assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - windowSize, startTime + 9L)));
+        assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - windowSize, startTime + 10L)));
+        assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - windowSize, startTime + 11L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - windowSize, startTime + 12L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 13L - windowSize, startTime + 13L)));
+
+        // Flush the store and verify all current entries were properly flushed ...
+        windowStore.flush();
+
+        Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+        assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+        assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+        assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+        assertNull(entriesByKey.get(3));
+        assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+        assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+        assertNull(entriesByKey.get(6));
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testPutAndFetchAfter() throws IOException {
-        WindowStore<Integer, String> store = createWindowStore(context, false, true);
-        try {
-            long startTime = segmentSize - 4L;
-
-            putFirstBatch(store, startTime, context);
-
-            assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L, startTime + 0L + windowSize)));
-            assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L, startTime + 1L + windowSize)));
-            assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
-            assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L, startTime + 3L + windowSize)));
-            assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L, startTime + 4L + windowSize)));
-            assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L, startTime + 5L + windowSize)));
-
-            putSecondBatch(store, startTime, context);
-
-            assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L, startTime - 2L + windowSize)));
-            assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L, startTime - 1L + windowSize)));
-            assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime, startTime + windowSize)));
-            assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L, startTime + 1L + windowSize)));
-            assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
-            assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L, startTime + 3L + windowSize)));
-            assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L, startTime + 4L + windowSize)));
-            assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L, startTime + 5L + windowSize)));
-            assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L, startTime + 6L + windowSize)));
-            assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L, startTime + 7L + windowSize)));
-            assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L, startTime + 8L + windowSize)));
-            assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L, startTime + 9L + windowSize)));
-            assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L, startTime + 10L + windowSize)));
-            assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L, startTime + 11L + windowSize)));
-            assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L, startTime + 12L + windowSize)));
-
-            // Flush the store and verify all current entries were properly flushed ...
-            store.flush();
-
-            Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
-            assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
-            assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
-            assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
-            assertNull(entriesByKey.get(3));
-            assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
-            assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
-            assertNull(entriesByKey.get(6));
-
-        } finally {
-            store.close();
-        }
+        windowStore = createWindowStore(context, false, true);
+        long startTime = segmentSize - 4L;
+
+        putFirstBatch(windowStore, startTime, context);
+
+        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L, startTime + 0L + windowSize)));
+        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L, startTime + 1L + windowSize)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L, startTime + 3L + windowSize)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L, startTime + 4L + windowSize)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L, startTime + 5L + windowSize)));
+
+        putSecondBatch(windowStore, startTime, context);
+
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L, startTime - 2L + windowSize)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L, startTime - 1L + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime, startTime + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L, startTime + 1L + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
+        assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L, startTime + 3L + windowSize)));
+        assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L, startTime + 4L + windowSize)));
+        assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L, startTime + 5L + windowSize)));
+        assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L, startTime + 6L + windowSize)));
+        assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L, startTime + 7L + windowSize)));
+        assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 8L, startTime + 8L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 9L, startTime + 9L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 10L, startTime + 10L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 11L, startTime + 11L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L, startTime + 12L + windowSize)));
+
+        // Flush the store and verify all current entries were properly flushed ...
+        windowStore.flush();
+
+        Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+        assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+        assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+        assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+        assertNull(entriesByKey.get(3));
+        assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+        assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+        assertNull(entriesByKey.get(6));
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testPutSameKeyTimestamp() throws IOException {
-        WindowStore<Integer, String> store = createWindowStore(context, false, true);
-        try {
-            long startTime = segmentSize - 4L;
-
-            context.setRecordContext(createRecordContext(startTime));
-            store.put(0, "zero");
+        windowStore = createWindowStore(context, false, true);
+        long startTime = segmentSize - 4L;
 
-            assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+        context.setRecordContext(createRecordContext(startTime));
+        windowStore.put(0, "zero");
 
-            store.put(0, "zero");
-            store.put(0, "zero+");
-            store.put(0, "zero++");
+        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
 
-            assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
-            assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize)));
-            assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize)));
-            assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize)));
-            assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+        windowStore.put(0, "zero");
+        windowStore.put(0, "zero+");
+        windowStore.put(0, "zero++");
 
-            // Flush the store and verify all current entries were properly flushed ...
-            store.flush();
+        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize)));
 
-            Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+        // Flush the store and verify all current entries were properly flushed ...
+        windowStore.flush();
 
-            assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
+        Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
 
-        } finally {
-            store.close();
-        }
+        assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
     }
 
     @Test
     public void testCachingEnabled() throws IOException {
-        WindowStore<Integer, String> store = createWindowStore(context, true, false);
-        assertTrue(store instanceof CachedStateStore);
+        windowStore = createWindowStore(context, true, false);
+        assertTrue(windowStore instanceof CachedStateStore);
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testRolling() throws IOException {
-        WindowStore<Integer, String> store = createWindowStore(context, false, true);
+        windowStore = createWindowStore(context, false, true);
+
+        // to validate segments
+        final Segments segments = new Segments(windowName, retentionPeriod, numSegments);
+        long startTime = segmentSize * 2;
+        long incr = segmentSize / 2;
+        context.setRecordContext(createRecordContext(startTime));
+        windowStore.put(0, "zero");
+        assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
+
+        context.setRecordContext(createRecordContext(startTime + incr));
+        windowStore.put(1, "one");
+        assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
+
+        context.setRecordContext(createRecordContext(startTime + incr * 2));
+        windowStore.put(2, "two");
+        assertEquals(Utils.mkSet(segments.segmentName(2),
+                                 segments.segmentName(3)), segmentDirs(baseDir));
+
+        context.setRecordContext(createRecordContext(startTime + incr * 4));
+        windowStore.put(4, "four");
+        assertEquals(Utils.mkSet(segments.segmentName(2),
+                                 segments.segmentName(3),
+                                 segments.segmentName(4)), segmentDirs(baseDir));
+
+
+        context.setRecordContext(createRecordContext(startTime + incr * 5));
+        windowStore.put(5, "five");
+        assertEquals(Utils.mkSet(segments.segmentName(2),
+                                 segments.segmentName(3),
+                                 segments.segmentName(4)), segmentDirs(baseDir));
+
+        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+
+        context.setRecordContext(createRecordContext(startTime + incr * 6));
+        windowStore.put(6, "six");
+        assertEquals(Utils.mkSet(segments.segmentName(3),
+                                 segments.segmentName(4),
+                                 segments.segmentName(5)), segmentDirs(baseDir));
+
+
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+
+
+        context.setRecordContext(createRecordContext(startTime + incr * 7));
+        windowStore.put(7, "seven");
+        assertEquals(Utils.mkSet(segments.segmentName(3),
+                                 segments.segmentName(4),
+                                 segments.segmentName(5)), segmentDirs(baseDir));
+
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+        assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+
+        context.setRecordContext(createRecordContext(startTime + incr * 8));
+        windowStore.put(8, "eight");
+        assertEquals(Utils.mkSet(segments.segmentName(4),
+                                 segments.segmentName(5),
+                                 segments.segmentName(6)), segmentDirs(baseDir));
+
+
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+        assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+        assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
+
+        // check segment directories
+        windowStore.flush();
+        assertEquals(Utils.mkSet(segments.segmentName(4),
+                                 segments.segmentName(5),
+                                 segments.segmentName(6)), segmentDirs(baseDir));
+
 
-        try {
-            // to validate segments
-            final Segments segments = new Segments(windowName, retentionPeriod, numSegments);
-            long startTime = segmentSize * 2;
-            long incr = segmentSize / 2;
-            context.setRecordContext(createRecordContext(startTime));
-            store.put(0, "zero");
-            assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
-
-            context.setRecordContext(createRecordContext(startTime + incr));
-            store.put(1, "one");
-            assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
-
-            context.setRecordContext(createRecordContext(startTime + incr * 2));
-            store.put(2, "two");
-            assertEquals(Utils.mkSet(segments.segmentName(2),
-                    segments.segmentName(3)), segmentDirs(baseDir));
-
-            context.setRecordContext(createRecordContext(startTime + incr * 4));
-            store.put(4, "four");
-            assertEquals(Utils.mkSet(segments.segmentName(2),
-                    segments.segmentName(3),
-                    segments.segmentName(4)), segmentDirs(baseDir));
-
-
-            context.setRecordContext(createRecordContext(startTime + incr * 5));
-            store.put(5, "five");
-            assertEquals(Utils.mkSet(segments.segmentName(2),
-                    segments.segmentName(3),
-                    segments.segmentName(4)), segmentDirs(baseDir));
-
-            assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
-            assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-            assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-            assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-            assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-            assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-
-            context.setRecordContext(createRecordContext(startTime + incr * 6));
-            store.put(6, "six");
-            assertEquals(Utils.mkSet(segments.segmentName(3),
-                    segments.segmentName(4),
-                    segments.segmentName(5)), segmentDirs(baseDir));
-
-
-            assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
-            assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-            assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-            assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-            assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-            assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-            assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
-
-
-            context.setRecordContext(createRecordContext(startTime + incr * 7));
-            store.put(7, "seven");
-            assertEquals(Utils.mkSet(segments.segmentName(3),
-                    segments.segmentName(4),
-                    segments.segmentName(5)), segmentDirs(baseDir));
-
-            assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
-            assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-            assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-            assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-            assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-            assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-            assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
-            assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
-
-            context.setRecordContext(createRecordContext(startTime + incr * 8));
-            store.put(8, "eight");
-            assertEquals(Utils.mkSet(segments.segmentName(4),
-                    segments.segmentName(5),
-                    segments.segmentName(6)), segmentDirs(baseDir));
-
-
-            assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
-            assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-            assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-            assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-            assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-            assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-            assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
-            assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
-            assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
-
-            // check segment directories
-            store.flush();
-            assertEquals(Utils.mkSet(segments.segmentName(4),
-                    segments.segmentName(5),
-                    segments.segmentName(6)), segmentDirs(baseDir));
-
-        } finally {
-            store.close();
-        }
     }
 
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testRestore() throws IOException {
         long startTime = segmentSize * 2;
         long incr = segmentSize / 2;
 
-        WindowStore<Integer, String> store = createWindowStore(context, false, true);
-        try {
-            context.setRecordContext(createRecordContext(startTime));
-            store.put(0, "zero");
-            context.setRecordContext(createRecordContext(startTime + incr));
-            store.put(1, "one");
-            context.setRecordContext(createRecordContext(startTime + incr * 2));
-            store.put(2, "two");
-            context.setRecordContext(createRecordContext(startTime + incr * 3));
-            store.put(3, "three");
-            context.setRecordContext(createRecordContext(startTime + incr * 4));
-            store.put(4, "four");
-            context.setRecordContext(createRecordContext(startTime + incr * 5));
-            store.put(5, "five");
-            context.setRecordContext(createRecordContext(startTime + incr * 6));
-            store.put(6, "six");
-            context.setRecordContext(createRecordContext(startTime + incr * 7));
-            store.put(7, "seven");
-            context.setRecordContext(createRecordContext(startTime + incr * 8));
-            store.put(8, "eight");
-            store.flush();
-
-        } finally {
-            store.close();
-        }
+        windowStore = createWindowStore(context, false, true);
+        context.setRecordContext(createRecordContext(startTime));
+        windowStore.put(0, "zero");
+        context.setRecordContext(createRecordContext(startTime + incr));
+        windowStore.put(1, "one");
+        context.setRecordContext(createRecordContext(startTime + incr * 2));
+        windowStore.put(2, "two");
+        context.setRecordContext(createRecordContext(startTime + incr * 3));
+        windowStore.put(3, "three");
+        context.setRecordContext(createRecordContext(startTime + incr * 4));
+        windowStore.put(4, "four");
+        context.setRecordContext(createRecordContext(startTime + incr * 5));
+        windowStore.put(5, "five");
+        context.setRecordContext(createRecordContext(startTime + incr * 6));
+        windowStore.put(6, "six");
+        context.setRecordContext(createRecordContext(startTime + incr * 7));
+        windowStore.put(7, "seven");
+        context.setRecordContext(createRecordContext(startTime + incr * 8));
+        windowStore.put(8, "eight");
+        windowStore.flush();
+
+        windowStore.close();
 
         // remove local store image
         Utils.delete(baseDir);
 
-        WindowStore<Integer, String> store2 = createWindowStore(context, false, true);
-        assertEquals(Utils.mkList(), toList(store2.fetch(0, startTime - windowSize, startTime + windowSize)));
-        assertEquals(Utils.mkList(), toList(store2.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-        assertEquals(Utils.mkList(), toList(store2.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-        assertEquals(Utils.mkList(), toList(store2.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-        assertEquals(Utils.mkList(), toList(store2.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-        assertEquals(Utils.mkList(), toList(store2.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-        assertEquals(Utils.mkList(), toList(store2.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
-        assertEquals(Utils.mkList(), toList(store2.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
-        assertEquals(Utils.mkList(), toList(store2.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
-
-        try {
-            context.restore(windowName, changeLog);
-
-            assertEquals(Utils.mkList(), toList(store2.fetch(0, startTime - windowSize, startTime + windowSize)));
-            assertEquals(Utils.mkList(), toList(store2.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-            assertEquals(Utils.mkList(), toList(store2.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-            assertEquals(Utils.mkList(), toList(store2.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-            assertEquals(Utils.mkList("four"), toList(store2.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-            assertEquals(Utils.mkList("five"), toList(store2.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-            assertEquals(Utils.mkList("six"), toList(store2.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
-            assertEquals(Utils.mkList("seven"), toList(store2.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
-            assertEquals(Utils.mkList("eight"), toList(store2.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
-
-            // check segment directories
-            store2.flush();
-            assertEquals(
-                    Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
-                    segmentDirs(baseDir)
-            );
-        } finally {
-            store2.close();
-        }
+        windowStore = createWindowStore(context, false, true);
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
+
+        context.restore(windowName, changeLog);
+
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+        assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+        assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
+
+        // check segment directories
+        windowStore.flush();
+        assertEquals(
+                Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
+                segmentDirs(baseDir)
+        );
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testSegmentMaintenance() throws IOException {
-        WindowStore<Integer, String> store = createWindowStore(context, false, true);
-        try {
-            context.setTime(0L);
-            context.setRecordContext(createRecordContext(0));
-            store.put(0, "v");
-            assertEquals(
-                    Utils.mkSet(segments.segmentName(0L)),
-                    segmentDirs(baseDir)
-            );
-
-            context.setRecordContext(createRecordContext(59999));
-            store.put(0, "v");
-            store.put(0, "v");
-            assertEquals(
-                    Utils.mkSet(segments.segmentName(0L)),
-                    segmentDirs(baseDir)
-            );
-
-            context.setRecordContext(createRecordContext(60000));
-            store.put(0, "v");
-            assertEquals(
-                    Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
-                    segmentDirs(baseDir)
-            );
-
-            WindowStoreIterator iter;
-            int fetchedCount;
-
-            iter = store.fetch(0, 0L, 240000L);
-            fetchedCount = 0;
-            while (iter.hasNext()) {
-                iter.next();
-                fetchedCount++;
-            }
-            assertEquals(4, fetchedCount);
+        windowStore = createWindowStore(context, false, true);
+        context.setTime(0L);
+        context.setRecordContext(createRecordContext(0));
+        windowStore.put(0, "v");
+        assertEquals(
+                Utils.mkSet(segments.segmentName(0L)),
+                segmentDirs(baseDir)
+        );
+
+        context.setRecordContext(createRecordContext(59999));
+        windowStore.put(0, "v");
+        windowStore.put(0, "v");
+        assertEquals(
+                Utils.mkSet(segments.segmentName(0L)),
+                segmentDirs(baseDir)
+        );
+
+        context.setRecordContext(createRecordContext(60000));
+        windowStore.put(0, "v");
+        assertEquals(
+                Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
+                segmentDirs(baseDir)
+        );
+
+        WindowStoreIterator iter;
+        int fetchedCount;
+
+        iter = windowStore.fetch(0, 0L, 240000L);
+        fetchedCount = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            fetchedCount++;
+        }
+        assertEquals(4, fetchedCount);
 
-            assertEquals(
-                    Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
-                    segmentDirs(baseDir)
-            );
+        assertEquals(
+                Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
+                segmentDirs(baseDir)
+        );
 
-            context.setRecordContext(createRecordContext(180000));
-            store.put(0, "v");
+        context.setRecordContext(createRecordContext(180000));
+        windowStore.put(0, "v");
 
-            iter = store.fetch(0, 0L, 240000L);
-            fetchedCount = 0;
-            while (iter.hasNext()) {
-                iter.next();
-                fetchedCount++;
-            }
-            assertEquals(2, fetchedCount);
+        iter = windowStore.fetch(0, 0L, 240000L);
+        fetchedCount = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            fetchedCount++;
+        }
+        assertEquals(2, fetchedCount);
 
-            assertEquals(
-                    Utils.mkSet(segments.segmentName(1L), segments.segmentName(3L)),
-                    segmentDirs(baseDir)
-            );
+        assertEquals(
+                Utils.mkSet(segments.segmentName(1L), segments.segmentName(3L)),
+                segmentDirs(baseDir)
+        );
 
-            context.setRecordContext(createRecordContext(300000));
-            store.put(0, "v");
+        context.setRecordContext(createRecordContext(300000));
+        windowStore.put(0, "v");
 
-            iter = store.fetch(0, 240000L, 1000000L);
-            fetchedCount = 0;
-            while (iter.hasNext()) {
-                iter.next();
-                fetchedCount++;
-            }
-            assertEquals(1, fetchedCount);
+        iter = windowStore.fetch(0, 240000L, 1000000L);
+        fetchedCount = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            fetchedCount++;
+        }
+        assertEquals(1, fetchedCount);
 
-            assertEquals(
-                    Utils.mkSet(segments.segmentName(3L), segments.segmentName(5L)),
-                    segmentDirs(baseDir)
-            );
+        assertEquals(
+                Utils.mkSet(segments.segmentName(3L), segments.segmentName(5L)),
+                segmentDirs(baseDir)
+        );
 
-        } finally {
-            store.close();
-        }
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testInitialLoading() throws IOException {
         File storeDir = new File(baseDir, windowName);
 
-        WindowStore<Integer, String> store = createWindowStore(context, false, true);
+        windowStore = createWindowStore(context, false, true);
 
-        try {
-            new File(storeDir, segments.segmentName(0L)).mkdir();
-            new File(storeDir, segments.segmentName(1L)).mkdir();
-            new File(storeDir, segments.segmentName(2L)).mkdir();
-            new File(storeDir, segments.segmentName(3L)).mkdir();
-            new File(storeDir, segments.segmentName(4L)).mkdir();
-            new File(storeDir, segments.segmentName(5L)).mkdir();
-            new File(storeDir, segments.segmentName(6L)).mkdir();
-        } finally {
-            store.close();
-        }
+        new File(storeDir, segments.segmentName(0L)).mkdir();
+        new File(storeDir, segments.segmentName(1L)).mkdir();
+        new File(storeDir, segments.segmentName(2L)).mkdir();
+        new File(storeDir, segments.segmentName(3L)).mkdir();
+        new File(storeDir, segments.segmentName(4L)).mkdir();
+        new File(storeDir, segments.segmentName(5L)).mkdir();
+        new File(storeDir, segments.segmentName(6L)).mkdir();
+        windowStore.close();
 
-        store = createWindowStore(context, false, true);
+        windowStore = createWindowStore(context, false, true);
 
-        try {
-            assertEquals(
-                    Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
-                    segmentDirs(baseDir)
-            );
+        assertEquals(
+                Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
+                segmentDirs(baseDir)
+        );
 
-            try (WindowStoreIterator iter = store.fetch(0, 0L, 1000000L)) {
-                while (iter.hasNext()) {
-                    iter.next();
-                }
+        try (WindowStoreIterator iter = windowStore.fetch(0, 0L, 1000000L)) {
+            while (iter.hasNext()) {
+                iter.next();
             }
-
-            assertEquals(
-                    Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
-                    segmentDirs(baseDir)
-            );
-
-        } finally {
-            store.close();
         }
+
+        assertEquals(
+                Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
+                segmentDirs(baseDir)
+        );
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() throws Exception {
-        final WindowStore<Integer, String> windowStore = createWindowStore(context, false, true);
+        windowStore = createWindowStore(context, false, true);
         context.setRecordContext(createRecordContext(0));
         windowStore.put(1, "one", 1L);
         windowStore.put(1, "two", 2L);
@@ -639,6 +616,34 @@ public class RocksDBWindowStoreTest {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldFetchAndIterateOverExactKeys() throws Exception {
+        final RocksDBWindowStoreSupplier<String, String> supplier =
+                new RocksDBWindowStoreSupplier<>(
+                        "window",
+                        60 * 1000L * 2, 3,
+                        true,
+                        Serdes.String(),
+                        Serdes.String(),
+                        windowSize,
+                        true,
+                        Collections.<String, String>emptyMap(),
+                        false);
+
+        windowStore = supplier.get();
+        windowStore.init(context, windowStore);
+
+        windowStore.put("a", "0001", 0);
+        windowStore.put("aa", "0002", 0);
+        windowStore.put("a", "0003", 1);
+        windowStore.put("aa", "0004", 1);
+        windowStore.put("a", "0005", 60000);
+
+        final List expected = Utils.mkList("0001", "0003", "0005");
+        assertThat(toList(windowStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected));
+    }
+
     private void putFirstBatch(final WindowStore<Integer, String> store, final long startTime, final MockProcessorContext context) {
         context.setRecordContext(createRecordContext(startTime));
         store.put(0, "zero");