You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/03/01 17:27:20 UTC

[kafka] branch trunk updated: KAFKA-6560: Replace range query with newly added single point query in Windowed Aggregation (#4578)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eb449fe  KAFKA-6560: Replace range query with newly added single point query in Windowed Aggregation (#4578)
eb449fe is described below

commit eb449fe7c55a0816328a851fc1102dfeac6d8616
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Mar 1 09:27:11 2018 -0800

    KAFKA-6560: Replace range query with newly added single point query in Windowed Aggregation (#4578)
    
    * Add a new fetch(K key, long window-start-timestamp) API into ReadOnlyWindowStore.
    * Use the new API to replace the range-fetch API in KStreamWindowAggregate and KStreamWindowReduce.
    * Added corresponding unit tests.
    * Also removed some redundant byte serdes in byte stores.
---
 .../kstream/internals/KStreamWindowAggregate.java  |  61 +--
 .../kstream/internals/KStreamWindowReduce.java     |  65 +---
 .../kafka/streams/state/ReadOnlyWindowStore.java   |  24 +-
 .../state/internals/CachingKeyValueStore.java      |   8 +-
 .../state/internals/CachingWindowStore.java        |  20 +-
 .../internals/ChangeLoggingWindowBytesStore.java   |  13 +-
 .../internals/CompositeReadOnlyWindowStore.java    |  23 +-
 .../state/internals/MeteredWindowStore.java        |  16 +-
 .../state/internals/RocksDBWindowStore.java        |  16 +-
 .../streams/state/internals/WindowKeySchema.java   |  10 +-
 .../kafka/streams/state/NoOpWindowStore.java       |   5 +
 .../state/internals/CachingWindowStoreTest.java    |   5 +
 .../CompositeReadOnlyWindowStoreTest.java          |  18 +-
 .../state/internals/ReadOnlyWindowStoreStub.java   |  11 +-
 .../state/internals/RocksDBWindowStoreTest.java    | 415 ++++++++++-----------
 15 files changed, 377 insertions(+), 333 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index ec26866..27f8408 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.kstream.Window;
@@ -39,7 +38,10 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
 
     private boolean sendOldValues = false;
 
-    public KStreamWindowAggregate(Windows<W> windows, String storeName, Initializer<T> initializer, Aggregator<? super K, ? super V, T> aggregator) {
+    KStreamWindowAggregate(final Windows<W> windows,
+                           final String storeName,
+                           final Initializer<T> initializer,
+                           final Aggregator<? super K, ? super V, T> aggregator) {
         this.windows = windows;
         this.storeName = storeName;
         this.initializer = initializer;
@@ -63,7 +65,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
 
             windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
@@ -71,54 +73,27 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
         }
 
         @Override
-        public void process(K key, V value) {
+        public void process(final K key, final V value) {
             // if the key is null, we do not need proceed aggregating the record
             // the record with the table
             if (key == null)
                 return;
 
             // first get the matching windows
-            long timestamp = context().timestamp();
-            Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
+            final long timestamp = context().timestamp();
+            final Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
 
-            long timeFrom = Long.MAX_VALUE;
-            long timeTo = Long.MIN_VALUE;
+            // try update the window, and create the new window for the rest of unmatched window that do not exist yet
+            for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
+                T oldAgg = windowStore.fetch(key, entry.getKey());
 
-            // use range query on window store for efficient reads
-            for (long windowStartMs : matchedWindows.keySet()) {
-                timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
-                timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
-            }
-
-            try (WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo)) {
-
-                // for each matching window, try to update the corresponding key
-                while (iter.hasNext()) {
-                    KeyValue<Long, T> entry = iter.next();
-                    W window = matchedWindows.get(entry.key);
-
-                    if (window != null) {
-
-                        T oldAgg = entry.value;
-
-                        if (oldAgg == null)
-                            oldAgg = initializer.apply();
-
-                        // try to add the new value (there will never be old value)
-                        T newAgg = aggregator.apply(key, value, oldAgg);
-
-                        // update the store with the new value
-                        windowStore.put(key, newAgg, window.start());
-                        tupleForwarder.maybeForward(new Windowed<>(key, window), newAgg, oldAgg);
-                        matchedWindows.remove(entry.key);
-                    }
+                if (oldAgg == null) {
+                    oldAgg = initializer.apply();
                 }
-            }
 
-            // create the new window for the rest of unmatched window that do not exist yet
-            for (Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
-                T oldAgg = initializer.apply();
-                T newAgg = aggregator.apply(key, value, oldAgg);
+                final T newAgg = aggregator.apply(key, value, oldAgg);
+
+                // update the store with the new value
                 windowStore.put(key, newAgg, entry.getKey());
                 tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg);
             }
@@ -147,13 +122,13 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
         }
 
         @SuppressWarnings("unchecked")
         @Override
-        public T get(Windowed<K> windowedKey) {
+        public T get(final Windowed<K> windowedKey) {
             K key = windowedKey.key();
             W window = (W) windowedKey.window();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 7d02f11..c3d95d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -37,7 +36,9 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
 
     private boolean sendOldValues = false;
 
-    public KStreamWindowReduce(Windows<W> windows, String storeName, Reducer<V> reducer) {
+    KStreamWindowReduce(final Windows<W> windows,
+                        final String storeName,
+                        final Reducer<V> reducer) {
         this.windows = windows;
         this.storeName = storeName;
         this.reducer = reducer;
@@ -60,63 +61,37 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
             windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), sendOldValues);
         }
 
         @Override
-        public void process(K key, V value) {
+        public void process(final K key, final V value) {
             // if the key is null, we do not need proceed aggregating
             // the record with the table
             if (key == null)
                 return;
 
             // first get the matching windows
-            long timestamp = context().timestamp();
+            final long timestamp = context().timestamp();
+            final Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
 
-            Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
-
-            long timeFrom = Long.MAX_VALUE;
-            long timeTo = Long.MIN_VALUE;
-
-            // use range query on window store for efficient reads
-            for (long windowStartMs : matchedWindows.keySet()) {
-                timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
-                timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
-            }
+            // try update the window, and create the new window for the rest of unmatched window that do not exist yet
+            for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
+                final V oldAgg = windowStore.fetch(key, entry.getKey());
 
-            try (WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo)) {
-                // for each matching window, try to update the corresponding key and send to the downstream
-                while (iter.hasNext()) {
-                    KeyValue<Long, V> entry = iter.next();
-                    W window = matchedWindows.get(entry.key);
-
-                    if (window != null) {
-
-                        V oldAgg = entry.value;
-                        V newAgg = oldAgg;
-
-                        // try to add the new value (there will never be old value)
-                        if (newAgg == null) {
-                            newAgg = value;
-                        } else {
-                            newAgg = reducer.apply(newAgg, value);
-                        }
-
-                        // update the store with the new value
-                        windowStore.put(key, newAgg, window.start());
-                        tupleForwarder.maybeForward(new Windowed<>(key, window), newAgg, oldAgg);
-                        matchedWindows.remove(entry.key);
-                    }
+                V newAgg;
+                if (oldAgg == null) {
+                    newAgg = value;
+                } else {
+                    newAgg = reducer.apply(oldAgg, value);
                 }
-            }
 
-            // create the new window for the rest of unmatched window that do not exist yet
-            for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
-                windowStore.put(key, value, entry.getKey());
-                tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), value, null);
+                // update the store with the new value
+                windowStore.put(key, newAgg, entry.getKey());
+                tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg);
             }
         }
     }
@@ -143,13 +118,13 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
         }
 
         @SuppressWarnings("unchecked")
         @Override
-        public V get(Windowed<K> windowedKey) {
+        public V get(final Windowed<K> windowedKey) {
             K key = windowedKey.key();
             W window = (W) windowedKey.window();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
index f92ab6e..dea759f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
@@ -29,6 +29,17 @@ import org.apache.kafka.streams.kstream.Windowed;
 public interface ReadOnlyWindowStore<K, V> {
 
     /**
+     * Get the value of key from a window.
+     *
+     * @param key       the key to fetch
+     * @param time      start timestamp (inclusive) of the window
+     * @return The value or {@code null} if no value is found in the window
+     * @throws InvalidStateStoreException if the store is not initialized
+     * @throws NullPointerException If {@code null} is used for any key.
+     */
+    V fetch(K key, long time);
+
+    /**
      * Get all the key-value pairs with the given key and the time range from all
      * the existing windows.
      *
@@ -56,9 +67,12 @@ public interface ReadOnlyWindowStore<K, V> {
      * For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest
      * available window to the newest/latest window.
      *
+     * @param key       the key to fetch
+     * @param timeFrom  time range start (inclusive)
+     * @param timeTo    time range end (inclusive)
      * @return an iterator over key-value pairs {@code <timestamp, value>}
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException If null is used for key.
+     * @throws NullPointerException If {@code null} is used for key.
      */
     WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
 
@@ -74,7 +88,7 @@ public interface ReadOnlyWindowStore<K, V> {
      * @param timeTo    time range end (inclusive)
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException If null is used for any key.
+     * @throws NullPointerException If {@code null} is used for any key.
      */
     KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
     
@@ -89,11 +103,11 @@ public interface ReadOnlyWindowStore<K, V> {
     /**
      * Gets all the key-value pairs that belong to the windows within in the given time range.
      *
-     * @param timeFrom the beginning of the time slot from which to search
-     * @param timeTo   the end of the time slot from which to search
+     * @param timeFrom the beginning of the time slot from which to search (inclusive)
+     * @param timeTo   the end of the time slot from which to search (inclusive)
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException if null is used for any key
+     * @throws NullPointerException if {@code null} is used for any key
      */
     KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 82a6ac7..45f606f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -175,13 +175,9 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
                 cache.put(cacheName, key, new LRUCacheEntry(rawValue));
             }
             return rawValue;
+        } else {
+            return entry.value;
         }
-
-        if (entry.value == null) {
-            return null;
-        }
-
-        return entry.value;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index ad0bd99..e3d0f62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -45,11 +45,12 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
 
     private String name;
     private ThreadCache cache;
-    private InternalProcessorContext context;
+    private boolean sendOldValues;
     private StateSerdes<K, V> serdes;
+    private InternalProcessorContext context;
     private StateSerdes<Bytes, byte[]> bytesSerdes;
     private CacheFlushListener<Windowed<K>, V> flushListener;
-    private boolean sendOldValues;
+
     private final SegmentedCacheFunction cacheFunction;
 
     CachingWindowStore(final WindowStore<Bytes, byte[]> underlying,
@@ -150,13 +151,26 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         // if store is open outside as well.
         validateStoreOpen();
         
-        final Bytes keyBytes = WindowStoreUtils.toBinaryKey(key, timestamp, 0, bytesSerdes);
+        final Bytes keyBytes = WindowStoreUtils.toBinaryKey(key.get(), timestamp, 0);
         final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(),
                                                       timestamp, context.partition(), context.topic());
         cache.put(name, cacheFunction.cacheKey(keyBytes), entry);
     }
 
     @Override
+    public byte[] fetch(final Bytes key, final long timestamp) {
+        validateStoreOpen();
+        final Bytes bytesKey = WindowStoreUtils.toBinaryKey(key.get(), timestamp, 0);
+        final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
+        final LRUCacheEntry entry = cache.get(name, cacheKey);
+        if (entry == null) {
+            return underlying.fetch(key, timestamp);
+        } else {
+            return entry.value;
+        }
+    }
+
+    @Override
     public synchronized WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
         // since this function may not access the underlying inner store, we need to validate
         // if store is open outside as well.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 4fe4b99..e69a320 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -36,7 +36,6 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
     private final boolean retainDuplicates;
     private StoreChangeLogger<Bytes, byte[]> changeLogger;
     private ProcessorContext context;
-    private StateSerdes<Bytes, byte[]> innerStateSerde;
     private int seqnum = 0;
 
     ChangeLoggingWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore,
@@ -47,6 +46,11 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
     }
 
     @Override
+    public byte[] fetch(final Bytes key, final long timestamp) {
+        return bytesStore.fetch(key, timestamp);
+    }
+
+    @Override
     public WindowStoreIterator<byte[]> fetch(final Bytes key, final long from, final long to) {
         return bytesStore.fetch(key, from, to);
     }
@@ -74,18 +78,19 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
     @Override
     public void put(final Bytes key, final byte[] value, final long timestamp) {
         bytesStore.put(key, value, timestamp);
-        changeLogger.logChange(WindowStoreUtils.toBinaryKey(key, timestamp, maybeUpdateSeqnumForDups(), innerStateSerde), value);
+        changeLogger.logChange(WindowStoreUtils.toBinaryKey(key.get(), timestamp, maybeUpdateSeqnumForDups()), value);
     }
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = context;
         bytesStore.init(context, root);
-        innerStateSerde = WindowStoreUtils.getInnerStateSerde(ProcessorStateManager.storeChangelogTopic(context.applicationId(), bytesStore.name()));
+
+        final StateSerdes<Bytes, byte[]> bytesSerde = WindowStoreUtils.getInnerStateSerde(ProcessorStateManager.storeChangelogTopic(context.applicationId(), bytesStore.name()));
         changeLogger = new StoreChangeLogger<>(
             name(),
             context,
-            innerStateSerde);
+            bytesSerde);
     }
 
     private int maybeUpdateSeqnumForDups() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index 6afc6fd..1b5d5e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -45,10 +45,29 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
     }
 
     @Override
+    public V fetch(final K key, final long time) {
+        Objects.requireNonNull(key, "key can't be null");
+        final List<ReadOnlyWindowStore<K, V>> stores = provider.stores(storeName, windowStoreType);
+        for (final ReadOnlyWindowStore<K, V> windowStore : stores) {
+            try {
+                final V result = windowStore.fetch(key, time);
+                if (result != null) {
+                    return result;
+                }
+            } catch (final InvalidStateStoreException e) {
+                throw new InvalidStateStoreException(
+                        "State store is not available anymore and may have been migrated to another instance; " +
+                                "please re-discover its location from the state metadata.");
+            }
+        }
+        return null;
+    }
+
+    @Override
     public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
         Objects.requireNonNull(key, "key can't be null");
         final List<ReadOnlyWindowStore<K, V>> stores = provider.stores(storeName, windowStoreType);
-        for (ReadOnlyWindowStore<K, V> windowStore : stores) {
+        for (final ReadOnlyWindowStore<K, V> windowStore : stores) {
             try {
                 final WindowStoreIterator<V> result = windowStore.fetch(key, timeFrom, timeTo);
                 if (!result.hasNext()) {
@@ -56,7 +75,7 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
                 } else {
                     return result;
                 }
-            } catch (InvalidStateStoreException e) {
+            } catch (final InvalidStateStoreException e) {
                 throw new InvalidStateStoreException(
                         "State store is not available anymore and may have been migrated to another instance; " +
                                 "please re-discover its location from the state metadata.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index e890005..15961e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -91,7 +91,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
 
     @Override
     public void put(final K key, final V value, final long timestamp) {
-        long startNs = time.nanoseconds();
+        final long startNs = time.nanoseconds();
         try {
             inner.put(keyBytes(key), serdes.rawValue(value), timestamp);
         } finally {
@@ -104,6 +104,20 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     }
 
     @Override
+    public V fetch(final K key, final long timestamp) {
+        final long startNs = time.nanoseconds();
+        V ret;
+        try {
+            final byte[] result = inner.fetch(keyBytes(key), timestamp);
+            ret = serdes.valueFrom(result);
+        } finally {
+            metrics.recordLatency(this.fetchTime, startNs, time.nanoseconds());
+        }
+
+        return ret;
+    }
+
+    @Override
     public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
         return new MeteredWindowStoreIterator<>(inner.fetch(keyBytes(key), timeFrom, timeTo),
                                                 fetchTime,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index f1a9c63..58c345a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -96,25 +96,31 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     }
 
     @Override
-    public void put(K key, V value) {
+    public void put(final K key, final V value) {
         put(key, value, context.timestamp());
     }
 
     @Override
-    public void put(K key, V value, long timestamp) {
+    public void put(final K key, final V value, final long timestamp) {
         maybeUpdateSeqnumForDups();
 
         bytesStore.put(WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes), serdes.rawValue(value));
     }
 
     @Override
-    public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
+    public V fetch(final K key, final long timestamp) {
+        final byte[] bytesValue = bytesStore.get(WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes));
+        return serdes.valueFrom(bytesValue);
+    }
+
+    @Override
+    public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).valuesIterator();
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) {
+    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo);
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
     }
@@ -126,7 +132,7 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     }
     
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) {
+    public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetchAll(timeFrom, timeTo);
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 739792f..e432baa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -16,10 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -29,11 +27,9 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
     private static final int SUFFIX_SIZE = WindowStoreUtils.TIMESTAMP_SIZE + WindowStoreUtils.SEQNUM_SIZE;
     private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE];
 
-    private StateSerdes<Bytes, byte[]> serdes;
-
     @Override
     public void init(final String topic) {
-        serdes = new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray());
+        // nothing to do
     }
 
     @Override
@@ -53,12 +49,12 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
 
     @Override
     public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
-        return WindowStoreUtils.toBinaryKey(key, Math.max(0, from), 0, serdes);
+        return WindowStoreUtils.toBinaryKey(key.get(), Math.max(0, from), 0);
     }
 
     @Override
     public Bytes upperRangeFixedSize(final Bytes key, final long to) {
-        return WindowStoreUtils.toBinaryKey(key, to, Integer.MAX_VALUE, serdes);
+        return WindowStoreUtils.toBinaryKey(key.get(), to, Integer.MAX_VALUE);
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
index 1ded31f..05016bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
@@ -83,6 +83,11 @@ public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore {
     }
 
     @Override
+    public Object fetch(final Object key, final long time) {
+        return null;
+    }
+
+    @Override
     public WindowStoreIterator fetch(final Object key, final long timeFrom, final long timeTo) {
         return EMPTY_WINDOW_STORE_ITERATOR;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 239a007..5f934b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -96,6 +96,11 @@ public class CachingWindowStoreTest {
         cachingStore.put(bytesKey("a"), bytesValue("a"));
         cachingStore.put(bytesKey("b"), bytesValue("b"));
 
+        assertThat(cachingStore.fetch(bytesKey("a"), 10), equalTo(bytesValue("a")));
+        assertThat(cachingStore.fetch(bytesKey("b"), 10), equalTo(bytesValue("b")));
+        assertThat(cachingStore.fetch(bytesKey("c"), 10), equalTo(null));
+        assertThat(cachingStore.fetch(bytesKey("a"), 0), equalTo(null));
+
         final WindowStoreIterator<byte[]> a = cachingStore.fetch(bytesKey("a"), 10, 10);
         final WindowStoreIterator<byte[]> b = cachingStore.fetch(bytesKey("b"), 10, 10);
         verifyKeyValue(a.next(), DEFAULT_TIMESTAMP, "a");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index 58fddaa..a241510 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -169,8 +169,7 @@ public class CompositeReadOnlyWindowStoreTest {
 
     @Test
     public void shouldFetchKeyRangeAcrossStores() {
-        final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new
-                ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+        final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
         stubProviderTwo.addStore(storeName, secondUnderlying);
         underlyingWindowStore.put("a", "a", 0L);
         secondUnderlying.put("b", "b", 10L);
@@ -179,7 +178,20 @@ public class CompositeReadOnlyWindowStoreTest {
                 KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
                 KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"))));
     }
-    
+
+    @Test
+    public void shouldFetchKeyValueAcrossStores() {
+        final ReadOnlyWindowStoreStub<String, String> secondUnderlyingWindowStore = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+        stubProviderTwo.addStore(storeName, secondUnderlyingWindowStore);
+        underlyingWindowStore.put("a", "a", 0L);
+        secondUnderlyingWindowStore.put("b", "b", 10L);
+        assertThat(windowStore.fetch("a", 0L), equalTo("a"));
+        assertThat(windowStore.fetch("b", 10L), equalTo("b"));
+        assertThat(windowStore.fetch("c", 10L), equalTo(null));
+        assertThat(windowStore.fetch("a", 10L), equalTo(null));
+    }
+
+
     @Test
     public void shouldGetAllAcrossStores() {
         final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index 256df33..6d911a3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -47,7 +47,16 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
     public ReadOnlyWindowStoreStub(long windowSize) {
         this.windowSize = windowSize;
     }
-    
+
+    @Override
+    public V fetch(final K key, final long time) {
+        final Map<K, V> kvMap = data.get(time);
+        if (kvMap != null) {
+            return kvMap.get(key);
+        } else {
+            return null;
+        }
+    }
 
     @Override
     public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 39c8f03..f757298 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.MockProcessorContext;
@@ -46,7 +47,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -66,7 +66,7 @@ public class RocksDBWindowStoreTest {
     private static final long DEFAULT_CACHE_SIZE_BYTES = 1024 * 1024L;
 
     private final int numSegments = 3;
-    private final static long WINDOW_SIZE = 3;
+    private final long windowSize = 3L;
     private final String windowName = "window";
     private final long segmentSize = Segments.MIN_SEGMENT_INTERVAL;
     private final long retentionPeriod = segmentSize * (numSegments - 1);
@@ -95,27 +95,37 @@ public class RocksDBWindowStoreTest {
 
     private final File baseDir = TestUtils.tempDirectory("test");
     private final MockProcessorContext context = new MockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache);
-    private WindowStore windowStore;
+    private WindowStore<Integer, String> windowStore;
+
+    private WindowStore<Integer, String> createWindowStore(final ProcessorContext context, final boolean retainDuplicates) {
+        final WindowStore<Integer, String> store = Stores.windowStoreBuilder(
+                Stores.persistentWindowStore(windowName,
+                        retentionPeriod,
+                        numSegments,
+                        windowSize,
+                        retainDuplicates),
+                Serdes.Integer(),
+                Serdes.String()).build();
 
-    @SuppressWarnings("unchecked")
-    private <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, final boolean enableCaching, final boolean retainDuplicates) {
-        final RocksDBWindowStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, retainDuplicates, Serdes.Integer(), Serdes.String(),
-                                                                                     WINDOW_SIZE, true, Collections.<String, String>emptyMap(), enableCaching);
-        final WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
         store.init(context, store);
         return store;
     }
 
+    private WindowStore<Integer, String> createWindowStore(final ProcessorContext context) {
+        return createWindowStore(context, false);
+    }
+
     @After
     public void closeStore() {
         context.close();
-        windowStore.close();
+        if (windowStore != null) {
+            windowStore.close();
+        }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldOnlyIterateOpenSegments() {
-        windowStore = createWindowStore(context, false, true);
+        windowStore = createWindowStore(context);
         long currentTime = 0;
         context.setRecordContext(createRecordContext(currentTime));
         windowStore.put(1, "one");
@@ -145,38 +155,50 @@ public class RocksDBWindowStoreTest {
         return new ProcessorRecordContext(time, 0, 0, "topic");
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void testPutAndFetch() throws IOException {
-        windowStore = createWindowStore(context, false, true);
+    public void testRangeAndSinglePointFetch() {
+        windowStore = createWindowStore(context);
         long startTime = segmentSize - 4L;
 
         putFirstBatch(windowStore, startTime, context);
 
-        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - WINDOW_SIZE, startTime + 1L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - WINDOW_SIZE, startTime + 2L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - WINDOW_SIZE, startTime + 3L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - WINDOW_SIZE, startTime + 4L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - WINDOW_SIZE, startTime + 5L + WINDOW_SIZE)));
+        assertEquals("zero", windowStore.fetch(0, startTime));
+        assertEquals("one", windowStore.fetch(1, startTime + 1L));
+        assertEquals("two", windowStore.fetch(2, startTime + 2L));
+        assertEquals("four", windowStore.fetch(4, startTime + 4L));
+        assertEquals("five", windowStore.fetch(5, startTime + 5L));
+
+        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize)));
+        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize)));
 
         putSecondBatch(windowStore, startTime, context);
 
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L - WINDOW_SIZE, startTime - 2L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L - WINDOW_SIZE, startTime - 1L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L - WINDOW_SIZE, startTime + 1L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L - WINDOW_SIZE, startTime + 2L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L - WINDOW_SIZE, startTime + 3L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L - WINDOW_SIZE, startTime + 4L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L - WINDOW_SIZE, startTime + 5L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L - WINDOW_SIZE, startTime + 6L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L - WINDOW_SIZE, startTime + 7L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - WINDOW_SIZE, startTime + 8L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - WINDOW_SIZE, startTime + 9L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - WINDOW_SIZE, startTime + 10L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - WINDOW_SIZE, startTime + 11L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - WINDOW_SIZE, startTime + 12L + WINDOW_SIZE)));
+        assertEquals("two+1", windowStore.fetch(2, startTime + 3L));
+        assertEquals("two+2", windowStore.fetch(2, startTime + 4L));
+        assertEquals("two+3", windowStore.fetch(2, startTime + 5L));
+        assertEquals("two+4", windowStore.fetch(2, startTime + 6L));
+        assertEquals("two+5", windowStore.fetch(2, startTime + 7L));
+        assertEquals("two+6", windowStore.fetch(2, startTime + 8L));
+
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize)));
+        assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize)));
+        assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize)));
+        assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize)));
+        assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize)));
+        assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize)));
+        assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize)));
 
         // Flush the store and verify all current entries were properly flushed ...
         windowStore.flush();
@@ -192,10 +214,9 @@ public class RocksDBWindowStoreTest {
         assertNull(entriesByKey.get(6));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldGetAll() throws IOException {
-        windowStore = createWindowStore(context, false, true);
+    public void shouldGetAll() {
+        windowStore = createWindowStore(context);
         long startTime = segmentSize - 4L;
 
         putFirstBatch(windowStore, startTime, context);
@@ -212,10 +233,9 @@ public class RocksDBWindowStoreTest {
         );
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldFetchAllInTimeRange() throws IOException {
-        windowStore = createWindowStore(context, false, true);
+    public void shouldFetchAllInTimeRange() {
+        windowStore = createWindowStore(context);
         long startTime = segmentSize - 4L;
 
         putFirstBatch(windowStore, startTime, context);
@@ -242,10 +262,9 @@ public class RocksDBWindowStoreTest {
         );
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void testFetchRange() throws IOException {
-        windowStore = createWindowStore(context, false, true);
+    public void testFetchRange() {
+        windowStore = createWindowStore(context);
         long startTime = segmentSize - 4L;
 
         putFirstBatch(windowStore, startTime, context);
@@ -258,71 +277,70 @@ public class RocksDBWindowStoreTest {
 
         assertEquals(
             Utils.mkList(zero, one),
-            StreamsTestUtils.toList(windowStore.fetch(0, 1, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE))
+            StreamsTestUtils.toList(windowStore.fetch(0, 1, startTime + 0L - windowSize, startTime + 0L + windowSize))
         );
         assertEquals(
             Utils.mkList(one),
-            StreamsTestUtils.toList(windowStore.fetch(1, 1, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE))
+            StreamsTestUtils.toList(windowStore.fetch(1, 1, startTime + 0L - windowSize, startTime + 0L + windowSize))
         );
         assertEquals(
             Utils.mkList(one, two),
-            StreamsTestUtils.toList(windowStore.fetch(1, 3, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE))
+            StreamsTestUtils.toList(windowStore.fetch(1, 3, startTime + 0L - windowSize, startTime + 0L + windowSize))
         );
         assertEquals(
             Utils.mkList(zero, one, two),
-            StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE))
+            StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 0L - windowSize, startTime + 0L + windowSize))
         );
         assertEquals(
             Utils.mkList(zero, one, two,
                          four, five),
-            StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE + 5L))
+            StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 0L - windowSize, startTime + 0L + windowSize + 5L))
         );
         assertEquals(
             Utils.mkList(two, four, five),
-            StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 2L, startTime + 0L + WINDOW_SIZE + 5L))
+            StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 2L, startTime + 0L + windowSize + 5L))
         );
         assertEquals(
             Utils.mkList(),
-            StreamsTestUtils.toList(windowStore.fetch(4, 5, startTime + 2L, startTime + WINDOW_SIZE))
+            StreamsTestUtils.toList(windowStore.fetch(4, 5, startTime + 2L, startTime + windowSize))
         );
         assertEquals(
             Utils.mkList(),
-            StreamsTestUtils.toList(windowStore.fetch(0, 3, startTime + 3L, startTime + WINDOW_SIZE + 5))
+            StreamsTestUtils.toList(windowStore.fetch(0, 3, startTime + 3L, startTime + windowSize + 5))
         );
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void testPutAndFetchBefore() throws IOException {
-        windowStore = createWindowStore(context, false, true);
+    public void testPutAndFetchBefore() {
+        windowStore = createWindowStore(context);
         long startTime = segmentSize - 4L;
 
         putFirstBatch(windowStore, startTime, context);
 
-        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - WINDOW_SIZE, startTime + 0L)));
-        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - WINDOW_SIZE, startTime + 1L)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - WINDOW_SIZE, startTime + 2L)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - WINDOW_SIZE, startTime + 3L)));
-        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - WINDOW_SIZE, startTime + 4L)));
-        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - WINDOW_SIZE, startTime + 5L)));
+        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - windowSize, startTime + 0L)));
+        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - windowSize, startTime + 1L)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - windowSize, startTime + 3L)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - windowSize, startTime + 4L)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - windowSize, startTime + 5L)));
 
         putSecondBatch(windowStore, startTime, context);
 
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 1L - WINDOW_SIZE, startTime - 1L)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 0L - WINDOW_SIZE, startTime + 0L)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 1L - WINDOW_SIZE, startTime + 1L)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - WINDOW_SIZE, startTime + 2L)));
-        assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime + 3L - WINDOW_SIZE, startTime + 3L)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 4L - WINDOW_SIZE, startTime + 4L)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 5L - WINDOW_SIZE, startTime + 5L)));
-        assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 6L - WINDOW_SIZE, startTime + 6L)));
-        assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 7L - WINDOW_SIZE, startTime + 7L)));
-        assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - WINDOW_SIZE, startTime + 8L)));
-        assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - WINDOW_SIZE, startTime + 9L)));
-        assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - WINDOW_SIZE, startTime + 10L)));
-        assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - WINDOW_SIZE, startTime + 11L)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - WINDOW_SIZE, startTime + 12L)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 13L - WINDOW_SIZE, startTime + 13L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 1L - windowSize, startTime - 1L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 0L - windowSize, startTime + 0L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 1L - windowSize, startTime + 1L)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
+        assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime + 3L - windowSize, startTime + 3L)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 4L - windowSize, startTime + 4L)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 5L - windowSize, startTime + 5L)));
+        assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 6L - windowSize, startTime + 6L)));
+        assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 7L - windowSize, startTime + 7L)));
+        assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - windowSize, startTime + 8L)));
+        assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - windowSize, startTime + 9L)));
+        assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - windowSize, startTime + 10L)));
+        assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - windowSize, startTime + 11L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - windowSize, startTime + 12L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 13L - windowSize, startTime + 13L)));
 
         // Flush the store and verify all current entries were properly flushed ...
         windowStore.flush();
@@ -338,38 +356,37 @@ public class RocksDBWindowStoreTest {
         assertNull(entriesByKey.get(6));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void testPutAndFetchAfter() throws IOException {
-        windowStore = createWindowStore(context, false, true);
+    public void testPutAndFetchAfter() {
+        windowStore = createWindowStore(context);
         long startTime = segmentSize - 4L;
 
         putFirstBatch(windowStore, startTime, context);
 
-        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L, startTime + 0L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L, startTime + 1L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L, startTime + 3L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L, startTime + 4L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L, startTime + 5L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L, startTime + 0L + windowSize)));
+        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L, startTime + 1L + windowSize)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L, startTime + 3L + windowSize)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L, startTime + 4L + windowSize)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L, startTime + 5L + windowSize)));
 
         putSecondBatch(windowStore, startTime, context);
 
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L, startTime - 2L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L, startTime - 1L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime, startTime + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L, startTime + 1L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L, startTime + 3L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L, startTime + 4L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L, startTime + 5L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L, startTime + 6L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L, startTime + 7L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 8L, startTime + 8L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 9L, startTime + 9L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 10L, startTime + 10L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 11L, startTime + 11L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L, startTime + 12L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L, startTime - 2L + windowSize)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L, startTime - 1L + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime, startTime + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L, startTime + 1L + windowSize)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
+        assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L, startTime + 3L + windowSize)));
+        assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L, startTime + 4L + windowSize)));
+        assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L, startTime + 5L + windowSize)));
+        assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L, startTime + 6L + windowSize)));
+        assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L, startTime + 7L + windowSize)));
+        assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 8L, startTime + 8L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 9L, startTime + 9L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 10L, startTime + 10L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 11L, startTime + 11L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L, startTime + 12L + windowSize)));
 
         // Flush the store and verify all current entries were properly flushed ...
         windowStore.flush();
@@ -385,26 +402,25 @@ public class RocksDBWindowStoreTest {
         assertNull(entriesByKey.get(6));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void testPutSameKeyTimestamp() throws IOException {
-        windowStore = createWindowStore(context, false, true);
+    public void testPutSameKeyTimestamp() {
+        windowStore = createWindowStore(context, true);
         long startTime = segmentSize - 4L;
 
         context.setRecordContext(createRecordContext(startTime));
         windowStore.put(0, "zero");
 
-        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
 
         windowStore.put(0, "zero");
         windowStore.put(0, "zero+");
         windowStore.put(0, "zero++");
 
-        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 1L - WINDOW_SIZE, startTime + 1L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 2L - WINDOW_SIZE, startTime + 2L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 3L - WINDOW_SIZE, startTime + 3L + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime + 4L - WINDOW_SIZE, startTime + 4L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize)));
 
         // Flush the store and verify all current entries were properly flushed ...
         windowStore.flush();
@@ -414,11 +430,9 @@ public class RocksDBWindowStoreTest {
         assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
     }
 
-
-    @SuppressWarnings("unchecked")
     @Test
-    public void testRolling() throws IOException {
-        windowStore = createWindowStore(context, false, true);
+    public void testRolling() {
+        windowStore = createWindowStore(context);
 
         // to validate segments
         final Segments segments = new Segments(windowName, retentionPeriod, numSegments);
@@ -450,12 +464,12 @@ public class RocksDBWindowStoreTest {
                                  segments.segmentName(3),
                                  segments.segmentName(4)), segmentDirs(baseDir));
 
-        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
 
         context.setRecordContext(createRecordContext(startTime + incr * 6));
         windowStore.put(6, "six");
@@ -464,13 +478,13 @@ public class RocksDBWindowStoreTest {
                                  segments.segmentName(5)), segmentDirs(baseDir));
 
 
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
 
 
         context.setRecordContext(createRecordContext(startTime + incr * 7));
@@ -479,14 +493,14 @@ public class RocksDBWindowStoreTest {
                                  segments.segmentName(4),
                                  segments.segmentName(5)), segmentDirs(baseDir));
 
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - WINDOW_SIZE, startTime + incr * 7 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+        assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
 
         context.setRecordContext(createRecordContext(startTime + incr * 8));
         windowStore.put(8, "eight");
@@ -495,15 +509,15 @@ public class RocksDBWindowStoreTest {
                                  segments.segmentName(6)), segmentDirs(baseDir));
 
 
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - WINDOW_SIZE, startTime + incr * 7 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - WINDOW_SIZE, startTime + incr * 8 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+        assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+        assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
 
         // check segment directories
         windowStore.flush();
@@ -514,14 +528,12 @@ public class RocksDBWindowStoreTest {
 
     }
 
-
-    @SuppressWarnings("unchecked")
     @Test
     public void testRestore() throws IOException {
         long startTime = segmentSize * 2;
         long incr = segmentSize / 2;
 
-        windowStore = createWindowStore(context, false, true);
+        windowStore = createWindowStore(context);
         context.setRecordContext(createRecordContext(startTime));
         windowStore.put(0, "zero");
         context.setRecordContext(createRecordContext(startTime + incr));
@@ -547,28 +559,28 @@ public class RocksDBWindowStoreTest {
         // remove local store image
         Utils.delete(baseDir);
 
-        windowStore = createWindowStore(context, false, true);
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(7, startTime + incr * 7 - WINDOW_SIZE, startTime + incr * 7 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(8, startTime + incr * 8 - WINDOW_SIZE, startTime + incr * 8 + WINDOW_SIZE)));
+        windowStore = createWindowStore(context);
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
 
         context.restore(windowName, changeLog);
 
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - WINDOW_SIZE, startTime + incr * 7 + WINDOW_SIZE)));
-        assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - WINDOW_SIZE, startTime + incr * 8 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+        assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+        assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
 
         // check segment directories
         windowStore.flush();
@@ -578,10 +590,9 @@ public class RocksDBWindowStoreTest {
         );
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void testSegmentMaintenance() throws IOException {
-        windowStore = createWindowStore(context, false, true);
+    public void testSegmentMaintenance() {
+        windowStore = createWindowStore(context, true);
         context.setTime(0L);
         context.setRecordContext(createRecordContext(0));
         windowStore.put(0, "v");
@@ -655,12 +666,11 @@ public class RocksDBWindowStoreTest {
 
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void testInitialLoading() throws IOException {
+    public void testInitialLoading() {
         File storeDir = new File(baseDir, windowName);
 
-        windowStore = createWindowStore(context, false, true);
+        windowStore = createWindowStore(context);
 
         new File(storeDir, segments.segmentName(0L)).mkdir();
         new File(storeDir, segments.segmentName(1L)).mkdir();
@@ -671,7 +681,7 @@ public class RocksDBWindowStoreTest {
         new File(storeDir, segments.segmentName(6L)).mkdir();
         windowStore.close();
 
-        windowStore = createWindowStore(context, false, true);
+        windowStore = createWindowStore(context);
 
         assertEquals(
                 Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
@@ -690,10 +700,9 @@ public class RocksDBWindowStoreTest {
         );
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() {
-        windowStore = createWindowStore(context, false, true);
+        windowStore = createWindowStore(context);
         context.setRecordContext(createRecordContext(0));
         windowStore.put(1, "one", 1L);
         windowStore.put(1, "two", 2L);
@@ -717,24 +726,19 @@ public class RocksDBWindowStoreTest {
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldFetchAndIterateOverExactKeys() {
         final long windowSize = 0x7a00000000000000L;
         final long retentionPeriod = 0x7a00000000000000L;
-        final RocksDBWindowStoreSupplier<String, String> supplier =
-                new RocksDBWindowStoreSupplier<>(
-                    "window",
-                    retentionPeriod, 2,
-                    true,
-                    Serdes.String(),
-                    Serdes.String(),
-                    windowSize,
-                    true,
-                    Collections.<String, String>emptyMap(),
-                    false);
-
-        windowStore = supplier.get();
+        final WindowStore<String, String> windowStore = Stores.windowStoreBuilder(
+                Stores.persistentWindowStore(windowName,
+                        retentionPeriod,
+                        2,
+                        windowSize,
+                        true),
+                Serdes.String(),
+                Serdes.String()).build();
+
         windowStore.init(context, windowStore);
 
         windowStore.put("a", "0001", 0);
@@ -763,50 +767,45 @@ public class RocksDBWindowStoreTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerExceptionOnPutNullKey() {
-        windowStore = createWindowStore(context, false, true);
+        windowStore = createWindowStore(context);
         windowStore.put(null, "anyValue");
     }
 
     @Test
     public void shouldNotThrowNullPointerExceptionOnPutNullValue() {
-        windowStore = createWindowStore(context, false, true);
+        windowStore = createWindowStore(context);
         windowStore.put(1, null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerExceptionOnGetNullKey() {
-        windowStore = createWindowStore(context, false, true);
+        windowStore = createWindowStore(context);
         windowStore.fetch(null, 1L, 2L);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
-        windowStore = createWindowStore(context, false, true);
+        windowStore = createWindowStore(context);
         windowStore.fetch(null, 2, 1L, 2L);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
-        windowStore = createWindowStore(context, false, true);
+        windowStore = createWindowStore(context);
         windowStore.fetch(1, null, 1L, 2L);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldFetchAndIterateOverExactBinaryKeys() {
-        final RocksDBWindowStoreSupplier<Bytes, String> supplier =
-                new RocksDBWindowStoreSupplier<>(
-                        "window",
-                        60000, 2,
-                        true,
-                        Serdes.Bytes(),
-                        Serdes.String(),
+        final WindowStore<Bytes, String> windowStore = Stores.windowStoreBuilder(
+                Stores.persistentWindowStore(windowName,
+                        60000,
+                        2,
                         60000,
-                        true,
-                        Collections.<String, String>emptyMap(),
-                        false);
+                        true),
+                Serdes.Bytes(),
+                Serdes.String()).build();
 
-        windowStore = supplier.get();
         windowStore.init(context, windowStore);
 
         final Bytes key1 = Bytes.wrap(new byte[]{0});
@@ -891,11 +890,11 @@ public class RocksDBWindowStoreTest {
         return entriesByKey;
     }
 
-    private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp) {
-        return windowedPair(key, value, timestamp, WINDOW_SIZE);
+    private <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp) {
+        return windowedPair(key, value, timestamp, windowSize);
     }
 
-    private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp, long windowSize) {
+    private <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp, long windowSize) {
         return KeyValue.pair(new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)), value);
     }
 }

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.