Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/05 01:47:21 UTC

[1/2] kafka git commit: KAFKA-3016: phase-1. A local store for join window

Repository: kafka
Updated Branches:
  refs/heads/trunk 57df460f8 -> b0b3e5aeb


http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 747b4f1..a6a29cd 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -26,10 +26,12 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.Entry;
 
 import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
@@ -40,20 +42,19 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     private final Deserializer keyDeserializer;
     private final Deserializer valueDeserializer;
     private final RecordCollector.Supplier recordCollectorSupplier;
+    private final File stateDir;
 
     private Map<String, StateStore> storeMap = new HashMap<>();
+    private Map<String, StateRestoreCallback> restoreFuncs = new HashMap<>();
 
     long timestamp = -1L;
 
-    public MockProcessorContext(KStreamTestDriver driver, Serializer<?> serializer, Deserializer<?> deserializer) {
-        this(driver, serializer, deserializer, serializer, deserializer, (RecordCollector.Supplier) null);
-    }
-
-    public MockProcessorContext(KStreamTestDriver driver, Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
-            Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
-            final RecordCollector collector) {
-        this(driver, keySerializer, keyDeserializer, valueSerializer, valueDeserializer,
-                collector == null ? null : new RecordCollector.Supplier() {
+    public MockProcessorContext(KStreamTestDriver driver, File stateDir,
+                                Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
+                                Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
+                                final RecordCollector collector) {
+        this(driver, stateDir, keySerializer, keyDeserializer, valueSerializer, valueDeserializer,
+                new RecordCollector.Supplier() {
                     @Override
                     public RecordCollector recordCollector() {
                         return collector;
@@ -61,10 +62,12 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
                 });
     }
 
-    public MockProcessorContext(KStreamTestDriver driver, Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
-            Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
-            RecordCollector.Supplier collectorSupplier) {
+    public MockProcessorContext(KStreamTestDriver driver, File stateDir,
+                                Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
+                                Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
+                                RecordCollector.Supplier collectorSupplier) {
         this.driver = driver;
+        this.stateDir = stateDir;
         this.keySerializer = keySerializer;
         this.valueSerializer = valueSerializer;
         this.keyDeserializer = keyDeserializer;
@@ -74,10 +77,12 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     @Override
     public RecordCollector recordCollector() {
-        if (recordCollectorSupplier == null) {
+        RecordCollector recordCollector = recordCollectorSupplier.recordCollector();
+
+        if (recordCollector == null) {
             throw new UnsupportedOperationException("No RecordCollector specified");
         }
-        return recordCollectorSupplier.recordCollector();
+        return recordCollector;
     }
 
     public void setTime(long timestamp) {
@@ -111,7 +116,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     @Override
     public File stateDir() {
-        return driver.stateDir;
+        if (stateDir == null)
+            throw new UnsupportedOperationException("State directory not specified");
+
+        return stateDir;
     }
 
     @Override
@@ -130,6 +138,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     @Override
     public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback func) {
         storeMap.put(store.name(), store);
+        restoreFuncs.put(store.name(), func);
     }
 
     @Override
@@ -182,4 +191,11 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     public Map<String, StateStore> allStateStores() {
         return Collections.unmodifiableMap(storeMap);
     }
+
+    public void restore(String storeName, List<Entry<byte[], byte[]>> changeLog) {
+        StateRestoreCallback restoreCallback = restoreFuncs.get(storeName);
+        for (Entry<byte[], byte[]> entry : changeLog) {
+            restoreCallback.restore(entry.key(), entry.value());
+        }
+    }
 }
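
For context, the new restore(String, List<Entry<byte[], byte[]>>) helper replays a captured changelog through the StateRestoreCallback that a store supplied when it registered itself. A minimal usage sketch for a test, with a hypothetical store name and entries (not part of this commit):

    // Hypothetical test snippet: the store named "window" must already have
    // called context.register(...), so its callback is in restoreFuncs.
    byte[] keyBytes = new byte[] {1};
    byte[] valueBytes = "one".getBytes();
    List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
    changeLog.add(new Entry<>(keyBytes, valueBytes));
    context.restore("window", changeLog);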


[2/2] kafka git commit: KAFKA-3016: phase-1. A local store for join window

Posted by gu...@apache.org.
KAFKA-3016: phase-1. A local store for join window

guozhangwang
An implementation of a local store for join windows. This implementation uses "rolling" RocksDB instances for timestamp-based truncation (a simplified sketch of the scheme follows below).

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #726 from ymatsuda/windowed_join
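
In outline, the "rolling" maps each record timestamp to a coarse segment id and keeps a fixed ring of RocksDB instances, one per recent segment; when the stream advances to a new segment id, whole segments that have fallen out of the retention window are closed and their directories destroyed, truncating old data without per-record deletes. A simplified, self-contained sketch of the idea (a HashMap with a long key stands in for a RocksDB instance; this is not the committed code, see RocksDBWindowStore below):

    import java.util.HashMap;
    import java.util.Map;

    class SegmentRingSketch {
        static final long SEGMENT_INTERVAL = 60000L; // ms covered by one segment
        static final int NUM_SEGMENTS = 3;           // size of the segment ring

        final Map<Long, Map<Long, byte[]>> segments = new HashMap<>();
        long currentSegmentId = -1L;

        void put(long timestamp, long key, byte[] value) {
            long segmentId = timestamp / SEGMENT_INTERVAL;
            if (segmentId > currentSegmentId) {
                currentSegmentId = segmentId;
                // "truncate" by destroying whole segments outside the retention window
                segments.keySet().removeIf(id -> id <= currentSegmentId - NUM_SEGMENTS);
            }
            if (segmentId > currentSegmentId - NUM_SEGMENTS) // still retained?
                segments.computeIfAbsent(segmentId, id -> new HashMap<>()).put(key, value);
            // records older than the retention window are dropped silently
        }
    }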


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b0b3e5ae
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b0b3e5ae
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b0b3e5ae

Branch: refs/heads/trunk
Commit: b0b3e5aebf381faf81bd9454ef7b448e2ad922c7
Parents: 57df460
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Jan 4 16:47:17 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Jan 4 16:47:17 2016 -0800

----------------------------------------------------------------------
 .../state/KeyValueStoreChangeLogger.java        |  87 ---
 .../streams/state/MeteredKeyValueStore.java     |  18 +-
 .../kafka/streams/state/MeteredWindowStore.java | 200 ++++++
 .../kafka/streams/state/RocksDBStore.java       |  22 +-
 .../kafka/streams/state/RocksDBWindowStore.java | 283 ++++++++
 .../state/RocksDBWindowStoreSupplier.java       |  60 ++
 .../kafka/streams/state/StoreChangeLogger.java  |  91 +++
 .../apache/kafka/streams/state/WindowStore.java |  36 +
 .../streams/state/WindowStoreIterator.java      |  26 +
 .../kafka/streams/state/WindowStoreUtil.java    |  55 ++
 .../streams/state/KeyValueStoreTestDriver.java  |  22 +-
 .../streams/state/RocksDBWindowStoreTest.java   | 672 +++++++++++++++++++
 .../apache/kafka/streams/state/StateUtils.java  |  28 +-
 .../apache/kafka/test/KStreamTestDriver.java    |   2 +-
 .../apache/kafka/test/MockProcessorContext.java |  46 +-
 15 files changed, 1498 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
deleted file mode 100644
index 2ad1f47..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public class KeyValueStoreChangeLogger<K, V> {
-
-    protected final Serdes<K, V> serialization;
-
-    private final Set<K> dirty;
-    private final Set<K> removed;
-    private final int maxDirty;
-    private final int maxRemoved;
-
-    private final String topic;
-    private int partition;
-    private ProcessorContext context;
-
-    // always wrap the logged store with the metered store
-    public KeyValueStoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
-        this.topic = topic;
-        this.serialization = serialization;
-        this.context = context;
-        this.partition = context.id().partition;
-
-        this.dirty = new HashSet<>();
-        this.removed = new HashSet<>();
-        this.maxDirty = 100; // TODO: this needs to be configurable
-        this.maxRemoved = 100; // TODO: this needs to be configurable
-    }
-
-    public void add(K key) {
-        this.dirty.add(key);
-        this.removed.remove(key);
-    }
-
-    public void delete(K key) {
-        this.dirty.remove(key);
-        this.removed.add(key);
-    }
-
-    public void maybeLogChange(KeyValueStore<K, V> kv) {
-        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
-            logChange(kv);
-    }
-
-    public void logChange(KeyValueStore<K, V> kv) {
-        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
-        if (collector != null) {
-            Serializer<K> keySerializer = serialization.keySerializer();
-            Serializer<V> valueSerializer = serialization.valueSerializer();
-
-            for (K k : this.removed) {
-                collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
-            }
-            for (K k : this.dirty) {
-                V v = kv.get(k);
-                collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
-            }
-            this.removed.clear();
-            this.dirty.clear();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index 16f57a0..743a110 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -30,6 +30,7 @@ import java.util.List;
 public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     protected final KeyValueStore<K, V> inner;
+    protected final StoreChangeLogger.ValueGetter getter;
     protected final Serdes<K, V> serialization;
     protected final String metricScope;
     protected final Time time;
@@ -45,11 +46,16 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     private StreamingMetrics metrics;
 
     private boolean loggingEnabled = true;
-    private KeyValueStoreChangeLogger<K, V> changeLogger = null;
+    private StoreChangeLogger<K, V> changeLogger = null;
 
     // always wrap the store with the metered store
     public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricScope, Time time) {
         this.inner = inner;
+        this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
+            public V get(K key) {
+                return inner.get(key);
+            }
+        };
         this.serialization = serialization;
         this.metricScope = metricScope;
         this.time = time != null ? time : new SystemTime();
@@ -79,7 +85,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
 
         serialization.init(context);
-        this.changeLogger = this.loggingEnabled ? new KeyValueStoreChangeLogger<>(name, context, serialization) : null;
+        this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, serialization) : null;
 
         // register and possibly restore the state from the logs
         long startNs = time.nanoseconds();
@@ -123,7 +129,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
             if (loggingEnabled) {
                 changeLogger.add(key);
-                changeLogger.maybeLogChange(this.inner);
+                changeLogger.maybeLogChange(this.getter);
             }
         } finally {
             this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
@@ -141,7 +147,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
                     K key = entry.key();
                     changeLogger.add(key);
                 }
-                changeLogger.maybeLogChange(this.inner);
+                changeLogger.maybeLogChange(this.getter);
             }
         } finally {
             this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
@@ -171,7 +177,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     protected void removed(K key) {
         if (loggingEnabled) {
             changeLogger.delete(key);
-            changeLogger.maybeLogChange(this.inner);
+            changeLogger.maybeLogChange(this.getter);
         }
     }
 
@@ -197,7 +203,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
             this.inner.flush();
 
             if (loggingEnabled)
-                changeLogger.logChange(this.inner);
+                changeLogger.logChange(this.getter);
         } finally {
             this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
new file mode 100644
index 0000000..d4ed0e7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+
+public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
+
+    protected final WindowStore<K, V> inner;
+    protected final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
+    protected final String metricScope;
+    protected final Time time;
+
+    private Sensor putTime;
+    private Sensor getTime;
+    private Sensor rangeTime;
+    private Sensor flushTime;
+    private Sensor restoreTime;
+    private StreamingMetrics metrics;
+
+    private boolean loggingEnabled = true;
+    private StoreChangeLogger<byte[], byte[]> changeLogger = null;
+
+    // always wrap the store with the metered store
+    public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, Time time) {
+        this.inner = inner;
+        this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
+            public byte[] get(byte[] key) {
+                return inner.getInternal(key);
+            }
+        };
+        this.metricScope = metricScope;
+        this.time = time != null ? time : new SystemTime();
+    }
+
+    public MeteredWindowStore<K, V> disableLogging() {
+        loggingEnabled = false;
+        return this;
+    }
+
+    @Override
+    public String name() {
+        return inner.name();
+    }
+
+    @Override
+    public void init(ProcessorContext context) {
+        final String name = name();
+        this.metrics = context.metrics();
+        this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
+        this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
+        this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
+        this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
+        this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
+
+        this.changeLogger = this.loggingEnabled ?
+                new StoreChangeLogger<>(name, context, Serdes.withBuiltinTypes("", byte[].class, byte[].class)) : null;
+
+        // register and possibly restore the state from the logs
+        long startNs = time.nanoseconds();
+        inner.init(context);
+        try {
+            context.register(this, loggingEnabled, new StateRestoreCallback() {
+                @Override
+                public void restore(byte[] key, byte[] value) {
+                    inner.putInternal(key, value);
+                }
+            });
+        } finally {
+            this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return inner.persistent();
+    }
+
+    @Override
+    public WindowStoreIterator<V> fetch(K key, long timestamp) {
+        return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timestamp), this.rangeTime);
+    }
+
+    @Override
+    public void put(K key, V value) {
+        putAndReturnInternalKey(key, value);
+    }
+
+    @Override
+    public byte[] putAndReturnInternalKey(K key, V value) {
+        long startNs = time.nanoseconds();
+        try {
+            byte[] binKey = this.inner.putAndReturnInternalKey(key, value);
+
+            if (loggingEnabled) {
+                changeLogger.add(binKey);
+                changeLogger.maybeLogChange(this.getter);
+            }
+
+            return binKey;
+        } finally {
+            this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
+    public void putInternal(byte[] binaryKey, byte[] binaryValue) {
+        inner.putInternal(binaryKey, binaryValue);
+    }
+
+    @Override
+    public byte[] getInternal(byte[] binaryKey) {
+        long startNs = time.nanoseconds();
+        try {
+            return this.inner.getInternal(binaryKey);
+        } finally {
+            this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+
+    @Override
+    public void flush() {
+        long startNs = time.nanoseconds();
+        try {
+            this.inner.flush();
+
+            if (loggingEnabled)
+                changeLogger.logChange(this.getter);
+        } finally {
+            this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
+        }
+    }
+
+    private class MeteredWindowStoreIterator<E> implements WindowStoreIterator<E> {
+
+        private final WindowStoreIterator<E> iter;
+        private final Sensor sensor;
+        private final long startNs;
+
+        public MeteredWindowStoreIterator(WindowStoreIterator<E> iter, Sensor sensor) {
+            this.iter = iter;
+            this.sensor = sensor;
+            this.startNs = time.nanoseconds();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public E next() {
+            return iter.next();
+        }
+
+        @Override
+        public void remove() {
+            iter.remove();
+        }
+
+        @Override
+        public void close() {
+            try {
+                iter.close();
+            } finally {
+                metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
+            }
+        }
+
+    }
+
+    WindowStore<K, V> inner() {
+        return inner;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
index 029d72f..a32faf4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
@@ -48,7 +48,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private static final int MAX_WRITE_BUFFERS = 3;
     private static final String DB_FILE_DIR = "rocksdb";
 
-    private final String topic;
+    private final String name;
 
     private final Options options;
     private final WriteOptions wOptions;
@@ -56,11 +56,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     private Serdes<K, V> serdes;
     private ProcessorContext context;
-    private String dirName;
+    protected File dbDir;
     private RocksDB db;
 
     public RocksDBStore(String name, Serdes<K, V> serdes) {
-        this.topic = name;
+        this.name = name;
         this.serdes = serdes;
 
         // initialize the rocksdb options
@@ -88,8 +88,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         serdes.init(context);
 
         this.context = context;
-        this.dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
-        this.db = openDB(new File(this.dirName, this.topic), this.options, TTL_SECONDS);
+        this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name);
+        this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
     }
 
     private RocksDB openDB(File dir, Options options, int ttl) {
@@ -98,19 +98,19 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                 dir.getParentFile().mkdirs();
                 return RocksDB.open(options, dir.toString());
             } else {
-                throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
+                throw new KafkaException("Change log is not supported for store " + this.name + " since it is TTL based.");
                 // TODO: support TTL with change log?
                 // return TtlDB.open(options, dir.toString(), ttl, false);
             }
         } catch (RocksDBException e) {
             // TODO: this needs to be handled more accurately
-            throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
+            throw new KafkaException("Error opening store " + this.name + " at location " + dir.toString(), e);
         }
     }
 
     @Override
     public String name() {
-        return this.topic;
+        return this.name;
     }
 
     @Override
@@ -124,7 +124,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
         } catch (RocksDBException e) {
             // TODO: this needs to be handled more accurately
-            throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
+            throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.name, e);
         }
     }
 
@@ -138,7 +138,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             }
         } catch (RocksDBException e) {
             // TODO: this needs to be handled more accurately
-            throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
+            throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.name, e);
         }
     }
 
@@ -173,7 +173,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             db.flush(fOptions);
         } catch (RocksDBException e) {
             // TODO: this needs to be handled more accurately
-            throw new KafkaException("Error while executing flush from store " + this.topic, e);
+            throw new KafkaException("Error while executing flush from store " + this.name, e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
new file mode 100644
index 0000000..5189318
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.SimpleTimeZone;
+
+public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
+
+    public static final long MIN_SEGMENT_INTERVAL = 60 * 1000; // one minute
+
+    private static class Segment extends RocksDBStore<byte[], byte[]> {
+        public final long id;
+
+        Segment(String name, long id) {
+            super(name, WindowStoreUtil.INNER_SERDES);
+            this.id = id;
+        }
+
+        public void destroy() {
+            Utils.delete(dbDir);
+        }
+    }
+
+    private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {
+        private final Serdes<?, V> serdes;
+        private final KeyValueIterator<byte[], byte[]>[] iterators;
+        private int index = 0;
+
+        RocksDBWindowStoreIterator(Serdes<?, V> serdes) {
+            this(serdes, WindowStoreUtil.NO_ITERATORS);
+        }
+
+        RocksDBWindowStoreIterator(Serdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) {
+            this.serdes = serdes;
+            this.iterators = iterators;
+        }
+
+        @Override
+        public boolean hasNext() {
+            while (index < iterators.length) {
+                if (iterators[index].hasNext())
+                    return true;
+
+                index++;
+            }
+            return false;
+        }
+
+        @Override
+        public V next() {
+            if (index >= iterators.length)
+                throw new NoSuchElementException();
+
+            return serdes.valueFrom(iterators[index].next().value());
+        }
+
+        @Override
+        public void remove() {
+            if (index < iterators.length)
+                iterators[index].remove();
+        }
+
+        public void close() {
+            for (KeyValueIterator<byte[], byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    private final String name;
+    private final long windowBefore;
+    private final long windowAfter;
+    private final long segmentInterval;
+    private final Segment[] segments;
+    private final Serdes<K, V> serdes;
+    private final SimpleDateFormat formatter;
+
+    private ProcessorContext context;
+    private long currentSegmentId = -1L;
+    private int seqnum = 0;
+
+    public RocksDBWindowStore(String name, long windowBefore, long windowAfter, long retentionPeriod, int numSegments, Serdes<K, V> serdes) {
+        this.name = name;
+        this.windowBefore = windowBefore;
+        this.windowAfter = windowAfter;
+
+        // The retention period must be at least two times as long as the total window size
+        if ((this.windowBefore + this.windowAfter + 1) * 2 > retentionPeriod)
+            retentionPeriod = (this.windowBefore + this.windowAfter + 1) * 2;
+
+        // The segment interval must be at least MIN_SEGMENT_INTERVAL
+        this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
+
+        this.segments = new Segment[numSegments];
+        this.serdes = serdes;
+
+        // Create a date formatter. Formatted timestamps are used as segment name suffixes
+        this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
+        this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT"));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public void init(ProcessorContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public void flush() {
+        for (Segment segment : segments) {
+            if (segment != null)
+                segment.flush();
+        }
+    }
+
+    @Override
+    public void close() {
+        for (Segment segment : segments) {
+            if (segment != null)
+                segment.close();
+        }
+    }
+
+    @Override
+    public void put(K key, V value) {
+        putAndReturnInternalKey(key, value);
+    }
+
+    @Override
+    public byte[] putAndReturnInternalKey(K key, V value) {
+        long timestamp = context.timestamp();
+        long segmentId = segmentId(timestamp);
+
+        if (segmentId > currentSegmentId) {
+            // A new segment will be created. Clean up old segments first.
+            currentSegmentId = segmentId;
+            cleanup();
+        }
+
+        // If the record is within the retention period, put it in the store.
+        if (segmentId > currentSegmentId - segments.length) {
+            seqnum = (seqnum + 1) & 0x7FFFFFFF;
+            byte[] binaryKey = WindowStoreUtil.toBinaryKey(key, timestamp, seqnum, serdes);
+            getSegment(segmentId).put(binaryKey, serdes.rawValue(value));
+            return binaryKey;
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public void putInternal(byte[] binaryKey, byte[] binaryValue) {
+        long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
+
+        if (segmentId > currentSegmentId) {
+            // A new segment will be created. Clean up old segments first.
+            currentSegmentId = segmentId;
+            cleanup();
+        }
+
+        // If the record is within the retention period, put it in the store.
+        if (segmentId > currentSegmentId - segments.length)
+            getSegment(segmentId).put(binaryKey, binaryValue);
+    }
+
+    @Override
+    public byte[] getInternal(byte[] binaryKey) {
+        long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
+
+        Segment segment = segments[(int) (segmentId % segments.length)];
+
+        if (segment != null && segment.id == segmentId) {
+            return segment.get(binaryKey);
+        } else {
+            return null;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public WindowStoreIterator<V> fetch(K key, long timestamp) {
+        long timeFrom = Math.max(0L, timestamp - windowBefore);
+        long timeTo = Math.max(0L, timestamp + windowAfter);
+
+        long segFrom = segmentId(timeFrom);
+        long segTo = segmentId(Math.max(0L, timeTo));
+
+        byte[] binaryFrom = WindowStoreUtil.toBinaryKey(key, timeFrom, 0, serdes);
+        byte[] binaryUntil = WindowStoreUtil.toBinaryKey(key, timeTo + 1L, 0, serdes);
+
+        ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>();
+
+        for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
+            Segment segment = segments[(int) (segmentId % segments.length)];
+
+            if (segment != null && segment.id == segmentId)
+                iterators.add(segment.range(binaryFrom, binaryUntil));
+        }
+
+        if (iterators.size() > 0) {
+            return new RocksDBWindowStoreIterator<>(serdes, iterators.toArray(new KeyValueIterator[iterators.size()]));
+        } else {
+            return new RocksDBWindowStoreIterator<>(serdes);
+        }
+    }
+
+    private Segment getSegment(long segmentId) {
+        int index = (int) (segmentId % segments.length);
+
+        if (segments[index] == null) {
+            segments[index] = new Segment(name + "-" + directorySuffix(segmentId), segmentId);
+            segments[index].init(context);
+        }
+
+        return segments[index];
+    }
+
+    private void cleanup() {
+        for (int i = 0; i < segments.length; i++) {
+            if (segments[i] != null && segments[i].id <= currentSegmentId - segments.length) {
+                segments[i].close();
+                segments[i].destroy();
+                segments[i] = null;
+            }
+        }
+    }
+
+    public long segmentId(long timestamp) {
+        return timestamp / segmentInterval;
+    }
+
+    public String directorySuffix(long segmentId) {
+        return formatter.format(new Date(segmentId * segmentInterval));
+    }
+
+    // this method is used by a test
+    public Set<Long> segmentIds() {
+        HashSet<Long> segmentIds = new HashSet<>();
+
+        for (Segment segment : segments) {
+            if (segment != null)
+                segmentIds.add(segment.id);
+        }
+
+        return segmentIds;
+    }
+
+}
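
For a concrete feel of the segment arithmetic: with the test's numSegments = 3 and segmentInterval = 60000 ms, a record at timestamp 125000 falls in segment id 125000 / 60000 = 2 and is stored at array index 2 % 3 = 2; once a later record advances currentSegmentId to 5, cleanup() destroys every segment with id <= 5 - 3 = 2, freeing that same slot for segment 5.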

http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
new file mode 100644
index 0000000..41c725d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+/**
+ * A {@link StateStoreSupplier} that supplies a {@link WindowStore} backed by local RocksDB instances.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ *
+ * @see Stores#create(String)
+ */
+public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
+
+    private final String name;
+    private final long windowBefore;
+    private final long windowAfter;
+    private final long retentionPeriod;
+    private final int numSegments;
+    private final Serdes serdes;
+    private final Time time;
+
+    protected RocksDBWindowStoreSupplier(String name, long windowBefore, long windowAfter, long retentionPeriod, int numSegments, Serdes<K, V> serdes, Time time) {
+        this.name = name;
+        this.windowBefore = windowBefore;
+        this.windowAfter = windowAfter;
+        this.retentionPeriod = retentionPeriod;
+        this.numSegments = numSegments;
+        this.serdes = serdes;
+        this.time = time;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public StateStore get() {
+        return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, windowBefore, windowAfter, retentionPeriod, numSegments, serdes), "rocksdb-window", time);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java
new file mode 100644
index 0000000..ee6624e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class StoreChangeLogger<K, V> {
+
+    public interface ValueGetter<K, V> {
+        V get(K key);
+    }
+
+    protected final Serdes<K, V> serialization;
+
+    private final Set<K> dirty;
+    private final Set<K> removed;
+    private final int maxDirty;
+    private final int maxRemoved;
+
+    private final String topic;
+    private int partition;
+    private ProcessorContext context;
+
+    // always wrap the logged store with the metered store
+    public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
+        this.topic = topic;
+        this.serialization = serialization;
+        this.context = context;
+        this.partition = context.id().partition;
+
+        this.dirty = new HashSet<>();
+        this.removed = new HashSet<>();
+        this.maxDirty = 100; // TODO: this needs to be configurable
+        this.maxRemoved = 100; // TODO: this needs to be configurable
+    }
+
+    public void add(K key) {
+        this.dirty.add(key);
+        this.removed.remove(key);
+    }
+
+    public void delete(K key) {
+        this.dirty.remove(key);
+        this.removed.add(key);
+    }
+
+    public void maybeLogChange(ValueGetter<K, V> getter) {
+        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
+            logChange(getter);
+    }
+
+    public void logChange(ValueGetter<K, V> getter) {
+        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
+        if (collector != null) {
+            Serializer<K> keySerializer = serialization.keySerializer();
+            Serializer<V> valueSerializer = serialization.valueSerializer();
+
+            for (K k : this.removed) {
+                collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
+            }
+            for (K k : this.dirty) {
+                V v = getter.get(k);
+                collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
+            }
+            this.removed.clear();
+            this.dirty.clear();
+        }
+    }
+
+}
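
In use (see MeteredKeyValueStore above): each put calls changeLogger.add(key) and each removal calls changeLogger.delete(key); maybeLogChange then flushes the batch to the changelog topic once more than maxDirty (100) dirty keys or maxRemoved (100) removed keys have accumulated, sending null values for removals, and flush() forces the same path via logChange.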

http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
new file mode 100644
index 0000000..344aa91
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.processor.StateStore;
+
+public interface WindowStore<K, V> extends StateStore {
+
+    void put(K key, V value);
+
+    byte[] putAndReturnInternalKey(K key, V value);
+
+    WindowStoreIterator<V> fetch(K key, long timestamp);
+
+    void putInternal(byte[] binaryKey, byte[] binaryValue);
+
+    byte[] getInternal(byte[] binaryKey);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
new file mode 100644
index 0000000..e57a00f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import java.util.Iterator;
+
+public interface WindowStoreIterator<E> extends Iterator<E> {
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtil.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtil.java
new file mode 100644
index 0000000..b11a206
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtil.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import java.nio.ByteBuffer;
+
+public class WindowStoreUtil<K, V> {
+
+    public static final int TIMESTAMP_SIZE = 8;
+    public static final int SEQNUM_SIZE = 4;
+    public static final Serdes<byte[], byte[]> INNER_SERDES = Serdes.withBuiltinTypes("", byte[].class, byte[].class);
+    @SuppressWarnings("unchecked")
+    public static final KeyValueIterator<byte[], byte[]>[] NO_ITERATORS = (KeyValueIterator<byte[], byte[]>[]) new KeyValueIterator[0];
+
+    public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, Serdes<K, ?> serdes) {
+        byte[] serializedKey = serdes.rawKey(key);
+
+        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
+        buf.put(serializedKey);
+        buf.putLong(timestamp);
+        buf.putInt(seqnum);
+
+        return buf.array();
+    }
+
+    public static <K> K keyFromBinaryKey(byte[] binaryKey, Serdes<K, ?> serdes) {
+        byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
+
+        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
+
+        return serdes.keyFrom(bytes);
+    }
+
+    public static long timestampFromBinaryKey(byte[] binaryKey) {
+        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
+    }
+
+}
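
The binary key layout used throughout is the serialized key followed by an 8-byte timestamp and a 4-byte sequence number, so the timestamp sits at a fixed offset from the end. A quick round-trip using plain ByteBuffer, assuming a raw byte[] key with no Serdes involved (illustrative values only):

    import java.nio.ByteBuffer;

    byte[] rawKey = new byte[] {1, 2, 3};
    ByteBuffer buf = ByteBuffer.allocate(rawKey.length + 8 + 4);
    buf.put(rawKey).putLong(1451952437000L).putInt(42); // key | timestamp | seqnum
    byte[] binaryKey = buf.array();

    // Same offset math as timestampFromBinaryKey: the long starts 12 bytes from the end.
    long ts = ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 8 - 4);
    // ts == 1451952437000L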

http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 9e24741..6f74da8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -221,7 +221,6 @@ public class KeyValueStoreTestDriver<K, V> {
     private final Map<K, V> flushedEntries = new HashMap<>();
     private final Set<K> flushedRemovals = new HashSet<>();
     private final List<Entry<K, V>> restorableEntries = new LinkedList<>();
-    private final StreamingConfig config;
     private final MockProcessorContext context;
     private final Map<String, StateStore> storeMap = new HashMap<>();
     private final StreamingMetrics metrics = new StreamingMetrics() {
@@ -235,7 +234,7 @@ public class KeyValueStoreTestDriver<K, V> {
         }
     };
     private final RecordCollector recordCollector;
-    private File stateDir = new File("build/data").getAbsoluteFile();
+    private File stateDir = null;
 
     protected KeyValueStoreTestDriver(Serdes<K, V> serdes) {
         this.serdes = serdes;
@@ -247,6 +246,9 @@ public class KeyValueStoreTestDriver<K, V> {
                 recordFlushed(record.key(), record.value());
             }
         };
+        this.stateDir = StateUtils.tempDir();
+        this.stateDir.mkdirs();
+
         Properties props = new Properties();
         props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
@@ -254,9 +256,8 @@ public class KeyValueStoreTestDriver<K, V> {
         props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass());
         props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass());
         props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass());
-        this.config = new StreamingConfig(props);
 
-        this.context = new MockProcessorContext(null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
+        this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
                 serdes.valueDeserializer(), recordCollector) {
             @Override
             public TaskId id() {
@@ -286,24 +287,11 @@ public class KeyValueStoreTestDriver<K, V> {
 
             @Override
             public File stateDir() {
-                if (stateDir == null) {
-                    stateDir = StateUtils.tempDir();
-                }
-                stateDir.mkdirs();
                 return stateDir;
             }
         };
     }
 
-    /**
-     * Set the directory that should be used by the store for local disk storage.
-     *
-     * @param dir the directory; may be null if no local storage is allowed
-     */
-    public void useStateDir(File dir) {
-        this.stateDir = dir;
-    }
-
     @SuppressWarnings("unchecked")
     protected <K1, V1> void recordFlushed(K1 key, V1 value) {
         K k = (K) key;

http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
new file mode 100644
index 0000000..6bfddfe
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.test.MockProcessorContext;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RocksDBWindowStoreTest {
+
+    private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
+    private final ByteArrayDeserializer byteArrayDeserializer = new ByteArrayDeserializer();
+    private final int numSegments = 3;
+    private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL;
+    private final long retentionPeriod = segmentSize * (numSegments - 1);
+    private final long windowSize = 3;
+    private final Serdes<Integer, String> serdes = Serdes.withBuiltinTypes("", Integer.class, String.class);
+
+
+    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, long windowBefore, long windowAfter, Serdes<K, V> serdes) {
+        StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", windowBefore, windowAfter, retentionPeriod, numSegments, serdes, null);
+        WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
+        store.init(context);
+        return store;
+
+    }
+
+    @Test
+    public void testPutAndFetch() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), record.key()),
+                                    valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            try {
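+                // start a few ticks before a segment boundary so the puts straddle two segments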
+                long startTime = segmentSize - 4L;
+
+                context.setTime(startTime + 0L);
+                store.put(0, "zero");
+                context.setTime(startTime + 1L);
+                store.put(1, "one");
+                context.setTime(startTime + 2L);
+                store.put(2, "two");
+                context.setTime(startTime + 3L);
+                // (3, "three") is not put
+                context.setTime(startTime + 4L);
+                store.put(4, "four");
+                context.setTime(startTime + 5L);
+                store.put(5, "five");
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L)));
+
+                context.setTime(startTime + 3L);
+                store.put(2, "two+1");
+                context.setTime(startTime + 4L);
+                store.put(2, "two+2");
+                context.setTime(startTime + 5L);
+                store.put(2, "two+3");
+                context.setTime(startTime + 6L);
+                store.put(2, "two+4");
+                context.setTime(startTime + 7L);
+                store.put(2, "two+5");
+                context.setTime(startTime + 8L);
+                store.put(2, "two+6");
+
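+                // before = after = windowSize, so fetch(key, t) returns entries with timestamps in [t - 3L, t + 3L]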
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L)));
+                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L)));
+                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L)));
+                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L)));
+                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L)));
+                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L)));
+                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L)));
+                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L)));
+
+                // Flush the store and verify all current entries were properly flushed ...
+                store.flush();
+
+                Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+                assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+                assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+                assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+                assertNull(entriesByKey.get(3));
+                assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+                assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+                assertNull(entriesByKey.get(6));
+
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testPutAndFetchBefore() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), record.key()),
+                                    valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, windowSize, 0, serdes);
+            try {
+                long startTime = segmentSize - 4L;
+
+                context.setTime(startTime + 0L);
+                store.put(0, "zero");
+                context.setTime(startTime + 1L);
+                store.put(1, "one");
+                context.setTime(startTime + 2L);
+                store.put(2, "two");
+                context.setTime(startTime + 3L);
+                // (3, "three") is not put
+                context.setTime(startTime + 4L);
+                store.put(4, "four");
+                context.setTime(startTime + 5L);
+                store.put(5, "five");
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L)));
+
+                context.setTime(startTime + 3L);
+                store.put(2, "two+1");
+                context.setTime(startTime + 4L);
+                store.put(2, "two+2");
+                context.setTime(startTime + 5L);
+                store.put(2, "two+3");
+                context.setTime(startTime + 6L);
+                store.put(2, "two+4");
+                context.setTime(startTime + 7L);
+                store.put(2, "two+5");
+                context.setTime(startTime + 8L);
+                store.put(2, "two+6");
+
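+                // windowAfter = 0, so fetch(key, t) only returns entries with timestamps in [t - 3L, t]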
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L)));
+                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L)));
+                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L)));
+                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L)));
+                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L)));
+                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L)));
+                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L)));
+                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L)));
+
+                // Flush the store and verify all current entries were properly flushed ...
+                store.flush();
+
+                Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+                assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+                assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+                assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+                assertNull(entriesByKey.get(3));
+                assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+                assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+                assertNull(entriesByKey.get(6));
+
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testPutAndFetchAfter() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), record.key()),
+                                    valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, 0, windowSize, serdes);
+            try {
+                long startTime = segmentSize - 4L;
+
+                context.setTime(startTime + 0L);
+                store.put(0, "zero");
+                context.setTime(startTime + 1L);
+                store.put(1, "one");
+                context.setTime(startTime + 2L);
+                store.put(2, "two");
+                context.setTime(startTime + 3L);
+                // (3, "three") is not put
+                context.setTime(startTime + 4L);
+                store.put(4, "four");
+                context.setTime(startTime + 5L);
+                store.put(5, "five");
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L)));
+
+                context.setTime(startTime + 3L);
+                store.put(2, "two+1");
+                context.setTime(startTime + 4L);
+                store.put(2, "two+2");
+                context.setTime(startTime + 5L);
+                store.put(2, "two+3");
+                context.setTime(startTime + 6L);
+                store.put(2, "two+4");
+                context.setTime(startTime + 7L);
+                store.put(2, "two+5");
+                context.setTime(startTime + 8L);
+                store.put(2, "two+6");
+
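+                // windowBefore = 0, so fetch(key, t) only returns entries with timestamps in [t, t + 3L]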
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L)));
+                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L)));
+                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L)));
+                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L)));
+                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L)));
+                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L)));
+                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L)));
+                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L)));
+
+                // Flush the store and verify all current entries were properly flushed ...
+                store.flush();
+
+                Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+                assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+                assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+                assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+                assertNull(entriesByKey.get(3));
+                assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+                assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+                assertNull(entriesByKey.get(6));
+
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testPutSameKeyTimestamp() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), record.key()),
+                                    valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            try {
+                long startTime = segmentSize - 4L;
+
+                context.setTime(startTime);
+                store.put(0, "zero");
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime)));
+
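+                // put the same key at the same timestamp repeatedly; all values should be retained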
+                context.setTime(startTime);
+                store.put(0, "zero");
+                context.setTime(startTime);
+                store.put(0, "zero+");
+                context.setTime(startTime);
+                store.put(0, "zero++");
+
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L)));
+
+                // Flush the store and verify all current entries were properly flushed ...
+                store.flush();
+
+                Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+                assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
+
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testRolling() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), record.key()),
+                                    valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            RocksDBWindowStore<Integer, String> inner =
+                    (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
+            try {
+                long startTime = segmentSize * 2;
+                long incr = segmentSize / 2;
+
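+                // advance by half a segment per put: new segments are created as time passes,
+                // and the oldest segments are dropped once more than numSegments exist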
+                context.setTime(startTime);
+                store.put(0, "zero");
+                assertEquals(Utils.mkSet(2L), inner.segmentIds());
+
+                context.setTime(startTime + incr);
+                store.put(1, "one");
+                assertEquals(Utils.mkSet(2L), inner.segmentIds());
+
+                context.setTime(startTime + incr * 2);
+                store.put(2, "two");
+                assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
+
+                context.setTime(startTime + incr * 3);
+                // (3, "three") is not put
+                assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
+
+                context.setTime(startTime + incr * 4);
+                store.put(4, "four");
+                assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
+
+                context.setTime(startTime + incr * 5);
+                store.put(5, "five");
+                assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5)));
+
+                context.setTime(startTime + incr * 6);
+                store.put(6, "six");
+                assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
+
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6)));
+
+                context.setTime(startTime + incr * 7);
+                store.put(7, "seven");
+                assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
+
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6)));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7)));
+
+                context.setTime(startTime + incr * 8);
+                store.put(8, "eight");
+                assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
+
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6)));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7)));
+                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8)));
+
+                // check segment directories
+                store.flush();
+                assertEquals(
+                        Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)),
+                        segmentDirs(baseDir)
+                );
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testRestore() throws IOException {
+        final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+        long startTime = segmentSize * 2;
+        long incr = segmentSize / 2;
+
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), record.key()),
+                                    valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            try {
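+                // populate a store spanning several segments; every write is captured in changeLog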
+                context.setTime(startTime);
+                store.put(0, "zero");
+                context.setTime(startTime + incr);
+                store.put(1, "one");
+                context.setTime(startTime + incr * 2);
+                store.put(2, "two");
+                context.setTime(startTime + incr * 3);
+                store.put(3, "three");
+                context.setTime(startTime + incr * 4);
+                store.put(4, "four");
+                context.setTime(startTime + incr * 5);
+                store.put(5, "five");
+                context.setTime(startTime + incr * 6);
+                store.put(6, "six");
+                context.setTime(startTime + incr * 7);
+                store.put(7, "seven");
+                context.setTime(startTime + incr * 8);
+                store.put(8, "eight");
+                store.flush();
+
+            } finally {
+                store.close();
+            }
+
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+
+        File baseDir2 = Files.createTempDirectory("test").toFile();
+        try {
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), record.key()),
+                                    valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir2,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            RocksDBWindowStore<Integer, String> inner =
+                    (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
+
+            try {
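+                // replay the captured changelog into a fresh store; only entries
+                // within the retention period should be restored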
+                context.restore("window", changeLog);
+
+                assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
+
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6)));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7)));
+                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8)));
+
+                // check segment directories
+                store.flush();
+                assertEquals(
+                        Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)),
+                        segmentDirs(baseDir2)
+                );
+            } finally {
+                store.close();
+            }
+
+
+        } finally {
+            Utils.delete(baseDir2);
+        }
+    }
+
+    private <E> List<E> toList(WindowStoreIterator<E> iterator) {
+        ArrayList<E> list = new ArrayList<>();
+        while (iterator.hasNext()) {
+            list.add(iterator.next());
+        }
+        return list;
+    }
+
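+    // collects the suffixes of the segment directories ("window-<suffix>") under the store's RocksDB directory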
+    private Set<String> segmentDirs(File baseDir) {
+        File rocksDbDir = new File(baseDir, "rocksdb");
+        String[] subdirs = rocksDbDir.list();
+
+        HashSet<String> set = new HashSet<>();
+
+        for (String subdir : subdirs) {
+            if (subdir.startsWith("window-"))
+                set.add(subdir.substring(7));
+        }
+        return set;
+    }
+
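+    // decodes changelog entries into "value@(timestamp - startTime)" strings, grouped by key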
+    private Map<Integer, Set<String>> entriesByKey(List<Entry<byte[], byte[]>> changeLog, long startTime) {
+        HashMap<Integer, Set<String>> entriesByKey = new HashMap<>();
+
+        for (Entry<byte[], byte[]> entry : changeLog) {
+            long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key());
+            Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key(), serdes);
+            String value = entry.value() == null ? null : serdes.valueFrom(entry.value());
+
+            Set<String> entries = entriesByKey.get(key);
+            if (entries == null) {
+                entries = new HashSet<>();
+                entriesByKey.put(key, entries);
+            }
+            entries.add(value + "@" + (timestamp - startTime));
+        }
+
+        return entriesByKey;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
index c7ea748..f342dcd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.state;
 
-import org.apache.kafka.test.TestUtils;
-
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.FileVisitResult;
@@ -39,17 +37,21 @@ public class StateUtils {
      * @return the new directory that will exist; never null
      */
     public static File tempDir() {
-        final File dir = new File(TestUtils.IO_TMP_DIR, "kafka-" + INSTANCE_COUNTER.incrementAndGet());
-        dir.mkdirs();
-        dir.deleteOnExit();
+        try {
+            // Files.createTempDirectory already creates the directory on disk
+            final File dir = Files.createTempDirectory("test").toFile();
+            dir.deleteOnExit();
 
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                deleteDirectory(dir);
-            }
-        });
-        return dir;
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    deleteDirectory(dir);
+                }
+            });
+            return dir;
+        } catch (IOException ex) {
+            throw new RuntimeException("failed to create a temp dir", ex);
+        }
     }
 
     private static void deleteDirectory(File dir) {
@@ -74,4 +76,4 @@ public class StateUtils {
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index a6c2759..5275545 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -53,8 +53,8 @@ public class KStreamTestDriver {
                              Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
                              Serializer<?> valSerializer, Deserializer<?> valDeserializer) {
         this.topology = builder.build(null);
-        this.context = new MockProcessorContext(this, keySerializer, keyDeserializer, valSerializer, valDeserializer, new MockRecordCollector());
         this.stateDir = stateDir;
+        this.context = new MockProcessorContext(this, stateDir, keySerializer, keyDeserializer, valSerializer, valDeserializer, new MockRecordCollector());
 
         for (StateStoreSupplier stateStoreSupplier : topology.stateStoreSuppliers()) {
             StateStore store = stateStoreSupplier.get();