Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/14 22:54:35 UTC

[2/2] kafka git commit: KAFKA-2593: Key value stores can use specified serializers and deserializers

KAFKA-2593: Key value stores can use specified serializers and deserializers

Add support for the key-value stores to use specified serializers and deserializers ("serdes"). Prior to this change, the stores were limited to the default serdes specified in the topology's configuration and exposed to the processors via the ProcessorContext.

Now InMemoryKeyValueStore and RocksDBKeyValueStore are used in the same way: both are parameterized on the key and value types, and both are created through the new Stores factory. Its fluent API either takes explicit key and value serdes, takes key and value class types so the serdes can be inferred (only for the built-in serdes for string, integer, long, and byte array types), or uses the default serdes on the ProcessorContext.
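
For example, with this change a processor can create its stores inside Processor#init using the new Stores factory shown in the diff below. A sketch (the store and variable names here are illustrative, not part of the commit):

    // Assumes "context" is the ProcessorContext passed to Processor#init(...), and that
    // org.apache.kafka.streams.state.Stores and KeyValueStore plus the built-in
    // serializers/deserializers from org.apache.kafka.common.serialization are imported.

    // Built-in serdes, inferred from the fluent key/value type methods:
    KeyValueStore<String, Integer> counts =
        Stores.create("local-state", context)
              .withStringKeys()       // built-in string serde for keys
              .withIntegerValues()    // built-in integer serde for values
              .inMemory()
              .build();

    // Explicit serdes, backed by a local off-heap RocksDB database:
    KeyValueStore<String, Long> totals =
        Stores.create("local-db-state", context)
              .withKeys(new StringSerializer(), new StringDeserializer())
              .withValues(new LongSerializer(), new LongDeserializer())
              .localDatabase()
              .build();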

Author: Randall Hauch <rh...@gmail.com>

Reviewers: Guozhang Wang

Closes #255 from rhauch/kafka-2593


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6e571225
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6e571225
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6e571225

Branch: refs/heads/trunk
Commit: 6e571225d51f5d71ffcfc4d22d108e89ae2f46ec
Parents: f13d115
Author: Randall Hauch <rh...@gmail.com>
Authored: Wed Oct 14 13:59:10 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 14 13:59:10 2015 -0700

----------------------------------------------------------------------
 .../kafka/streams/examples/ProcessorJob.java    |  12 +-
 .../streams/kstream/SlidingWindowSupplier.java  |   3 +-
 .../internals/ProcessorContextImpl.java         |   3 +-
 .../processor/internals/RecordCollector.java    |  11 +
 .../streams/processor/internals/SinkNode.java   |   2 +-
 .../streams/state/InMemoryKeyValueStore.java    |  21 +-
 .../streams/state/MeteredKeyValueStore.java     |  20 +-
 .../streams/state/RocksDBKeyValueStore.java     |  94 ++--
 .../org/apache/kafka/streams/state/Serdes.java  | 164 +++++++
 .../org/apache/kafka/streams/state/Stores.java  | 257 +++++++++++
 .../internals/ProcessorTopologyTest.java        |  56 +--
 .../state/AbstractKeyValueStoreTest.java        | 191 ++++++++
 .../state/InMemoryKeyValueStoreTest.java        |  38 ++
 .../streams/state/KeyValueStoreTestDriver.java  | 441 +++++++++++++++++++
 .../streams/state/RocksDBKeyValueStoreTest.java |  38 ++
 .../apache/kafka/streams/state/StateUtils.java  |  77 ++++
 .../apache/kafka/test/MockProcessorContext.java |  54 ++-
 17 files changed, 1359 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
index 92e6284..0317b9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -17,20 +17,20 @@
 
 package org.apache.kafka.streams.examples;
 
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreaming;
+import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.state.Entry;
-import org.apache.kafka.streams.state.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
 
 import java.util.Properties;
 
@@ -48,7 +48,7 @@ public class ProcessorJob {
                 public void init(ProcessorContext context) {
                     this.context = context;
                     this.context.schedule(1000);
-                    this.kvStore = new InMemoryKeyValueStore<>("local-state", context);
+                    this.kvStore = Stores.create("local-state", context).withStringKeys().withIntegerValues().inMemory().build();
                 }
 
                 @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
index 0110c87..bf6b4dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.internals.FilteredIterator;
 import org.apache.kafka.streams.kstream.internals.WindowSupport;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.Stamped;
@@ -186,7 +185,7 @@ public class SlidingWindowSupplier<K, V> implements WindowSupplier<K, V> {
             IntegerSerializer intSerializer = new IntegerSerializer();
             ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
 
-            RecordCollector collector = ((ProcessorContextImpl) context).recordCollector();
+            RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
 
             for (Map.Entry<K, ValueList<V>> entry : map.entrySet()) {
                 ValueList<V> values = entry.getValue();

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 60ac1df..5cb53a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -37,7 +37,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-public class ProcessorContextImpl implements ProcessorContext {
+public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier {
 
     private static final Logger log = LoggerFactory.getLogger(ProcessorContextImpl.class);
 
@@ -75,6 +75,7 @@ public class ProcessorContextImpl implements ProcessorContext {
         this.initialized = false;
     }
 
+    @Override
     public RecordCollector recordCollector() {
         return this.collector;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index ad2f647..f0dbf35 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -31,6 +31,17 @@ import java.util.Map;
 
 public class RecordCollector {
 
+    /**
+     * A supplier of a {@link RecordCollector} instance.
+     */
+    public static interface Supplier {
+        /**
+         * Get the record collector.
+         * @return the record collector
+         */
+        public RecordCollector recordCollector();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(RecordCollector.class);
 
     private final Producer<byte[], byte[]> producer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index e2d881c..9f01727 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -53,7 +53,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
     @Override
     public void process(K key, V value) {
         // send to all the registered topics
-        RecordCollector collector = ((ProcessorContextImpl) context).recordCollector();
+        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
         collector.send(new ProducerRecord<>(topic, key, value), keySerializer, valSerializer);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
index 59a8496..1eb526f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
@@ -17,9 +17,9 @@
 
 package org.apache.kafka.streams.state;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
 
 import java.util.Iterator;
 import java.util.List;
@@ -28,33 +28,28 @@ import java.util.NavigableMap;
 import java.util.TreeMap;
 
 /**
- * An in-memory key-value store based on a TreeMap
+ * An in-memory key-value store based on a TreeMap.
  *
  * @param <K> The key type
  * @param <V> The value type
+ * 
+ * @see Stores#create(String, ProcessorContext)
  */
 public class InMemoryKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
 
-    public InMemoryKeyValueStore(String name, ProcessorContext context) {
-        this(name, context, new SystemTime());
-    }
-
-    public InMemoryKeyValueStore(String name, ProcessorContext context, Time time) {
-        super(name, new MemoryStore<K, V>(name, context), context, "in-memory-state", time);
+    protected InMemoryKeyValueStore(String name, ProcessorContext context, Serdes<K, V> serdes, Time time) {
+        super(name, new MemoryStore<K, V>(name), context, serdes, "in-memory-state", time != null ? time : new SystemTime());
     }
 
     private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
 
         private final String name;
         private final NavigableMap<K, V> map;
-        private final ProcessorContext context;
 
-        @SuppressWarnings("unchecked")
-        public MemoryStore(String name, ProcessorContext context) {
+        public MemoryStore(String name) {
             super();
             this.name = name;
             this.map = new TreeMap<>();
-            this.context = context;
         }
 
         @Override
@@ -137,4 +132,4 @@ public class InMemoryKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
 
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index 68333d5..90eee05 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 
 import java.util.HashSet;
@@ -35,6 +34,7 @@ import java.util.Set;
 public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     protected final KeyValueStore<K, V> inner;
+    protected final Serdes<K, V> serialization;
 
     private final Time time;
     private final Sensor putTime;
@@ -54,8 +54,10 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     private final ProcessorContext context;
 
     // always wrap the logged store with the metered store
-    public MeteredKeyValueStore(final String name, final KeyValueStore<K, V> inner, ProcessorContext context, String metricGrp, Time time) {
+    public MeteredKeyValueStore(final String name, final KeyValueStore<K, V> inner, ProcessorContext context,
+                                Serdes<K, V> serialization, String metricGrp, Time time) {
         this.inner = inner;
+        this.serialization = serialization;
 
         this.time = time;
         this.metrics = context.metrics();
@@ -79,8 +81,8 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         // register and possibly restore the state from the logs
         long startNs = time.nanoseconds();
         try {
-            final Deserializer<K> keyDeserializer = (Deserializer<K>) context.keyDeserializer();
-            final Deserializer<V> valDeserializer = (Deserializer<V>) context.valueDeserializer();
+            final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
+            final Deserializer<V> valDeserializer = serialization.valueDeserializer();
 
             context.register(this, new StateRestoreCallback() {
                 @Override
@@ -188,11 +190,11 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     }
 
     private void logChange() {
-        RecordCollector collector = ((ProcessorContextImpl) context).recordCollector();
-        Serializer<K> keySerializer = (Serializer<K>) context.keySerializer();
-        Serializer<V> valueSerializer = (Serializer<V>) context.valueSerializer();
-
+        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
         if (collector != null) {
+            Serializer<K> keySerializer = serialization.keySerializer();
+            Serializer<V> valueSerializer = serialization.valueSerializer();
+
             for (K k : this.dirty) {
                 V v = this.inner.get(k);
                 collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
@@ -239,4 +241,4 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
index 373bba0..32897ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
@@ -17,11 +17,10 @@
 
 package org.apache.kafka.streams.state;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.SystemTime;
-
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.CompressionType;
@@ -37,17 +36,21 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
 
-public class RocksDBKeyValueStore extends MeteredKeyValueStore<byte[], byte[]> {
-
-    public RocksDBKeyValueStore(String name, ProcessorContext context) {
-        this(name, context, new SystemTime());
-    }
+/**
+ * A {@link KeyValueStore} that stores all entries in a local RocksDB database.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ * 
+ * @see Stores#create(String, ProcessorContext)
+ */
+public class RocksDBKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
 
-    public RocksDBKeyValueStore(String name, ProcessorContext context, Time time) {
-        super(name, new RocksDBStore(name, context), context, "rocksdb-state", time);
+    protected RocksDBKeyValueStore(String name, ProcessorContext context, Serdes<K, V> serdes, Time time) {
+        super(name, new RocksDBStore<K, V>(name, context, serdes), context, serdes, "rocksdb-state", time != null ? time : new SystemTime());
     }
 
-    private static class RocksDBStore implements KeyValueStore<byte[], byte[]> {
+    private static class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
         private static final int TTL_NOT_USED = -1;
 
@@ -61,6 +64,8 @@ public class RocksDBKeyValueStore extends MeteredKeyValueStore<byte[], byte[]> {
         private static final int MAX_WRITE_BUFFERS = 3;
         private static final String DB_FILE_DIR = "rocksdb";
 
+        private final Serdes<K, V> serdes;
+
         private final String topic;
         private final int partition;
         private final ProcessorContext context;
@@ -74,11 +79,11 @@ public class RocksDBKeyValueStore extends MeteredKeyValueStore<byte[], byte[]> {
 
         private RocksDB db;
 
-        @SuppressWarnings("unchecked")
-        public RocksDBStore(String name, ProcessorContext context) {
+        public RocksDBStore(String name, ProcessorContext context, Serdes<K, V> serdes) {
             this.topic = name;
             this.partition = context.id();
             this.context = context;
+            this.serdes = serdes;
 
             // initialize the rocksdb options
             BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
@@ -109,6 +114,7 @@ public class RocksDBKeyValueStore extends MeteredKeyValueStore<byte[], byte[]> {
         private RocksDB openDB(File dir, Options options, int ttl) {
             try {
                 if (ttl == TTL_NOT_USED) {
+                    dir.getParentFile().mkdirs();
                     return RocksDB.open(options, dir.toString());
                 } else {
                     throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
@@ -132,9 +138,9 @@ public class RocksDBKeyValueStore extends MeteredKeyValueStore<byte[], byte[]> {
         }
 
         @Override
-        public byte[] get(byte[] key) {
+        public V get(K key) {
             try {
-                return this.db.get(key);
+                return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
             } catch (RocksDBException e) {
                 // TODO: this needs to be handled more accurately
                 throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
@@ -142,12 +148,12 @@ public class RocksDBKeyValueStore extends MeteredKeyValueStore<byte[], byte[]> {
         }
 
         @Override
-        public void put(byte[] key, byte[] value) {
+        public void put(K key, V value) {
             try {
                 if (value == null) {
-                    db.remove(wOptions, key);
+                    db.remove(wOptions, serdes.rawKey(key));
                 } else {
-                    db.put(wOptions, key, value);
+                    db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
                 }
             } catch (RocksDBException e) {
                 // TODO: this needs to be handled more accurately
@@ -156,28 +162,28 @@ public class RocksDBKeyValueStore extends MeteredKeyValueStore<byte[], byte[]> {
         }
 
         @Override
-        public void putAll(List<Entry<byte[], byte[]>> entries) {
-            for (Entry<byte[], byte[]> entry : entries)
+        public void putAll(List<Entry<K, V>> entries) {
+            for (Entry<K, V> entry : entries)
                 put(entry.key(), entry.value());
         }
-
+        
         @Override
-        public byte[] delete(byte[] key) {
-            byte[] value = get(key);
+        public V delete(K key) {
+            V value = get(key);
             put(key, null);
             return value;
         }
 
         @Override
-        public KeyValueIterator<byte[], byte[]> range(byte[] from, byte[] to) {
-            return new RocksDBRangeIterator(db.newIterator(), from, to);
+        public KeyValueIterator<K, V> range(K from, K to) {
+            return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
         }
 
         @Override
-        public KeyValueIterator<byte[], byte[]> all() {
+        public KeyValueIterator<K, V> all() {
             RocksIterator innerIter = db.newIterator();
             innerIter.seekToFirst();
-            return new RocksDbIterator(innerIter);
+            return new RocksDbIterator<K, V>(innerIter, serdes);
         }
 
         @Override
@@ -196,19 +202,21 @@ public class RocksDBKeyValueStore extends MeteredKeyValueStore<byte[], byte[]> {
             db.close();
         }
 
-        private static class RocksDbIterator implements KeyValueIterator<byte[], byte[]> {
+        private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
             private final RocksIterator iter;
+            private final Serdes<K, V> serdes;
 
-            public RocksDbIterator(RocksIterator iter) {
+            public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
                 this.iter = iter;
+                this.serdes = serdes;
             }
 
-            protected byte[] peekKey() {
-                return this.getEntry().key();
+            protected byte[] peekRawKey() {
+                return iter.key();
             }
 
-            protected Entry<byte[], byte[]> getEntry() {
-                return new Entry<>(iter.key(), iter.value());
+            protected Entry<K, V> getEntry() {
+                return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
             }
 
             @Override
@@ -217,13 +225,12 @@ public class RocksDBKeyValueStore extends MeteredKeyValueStore<byte[], byte[]> {
             }
 
             @Override
-            public Entry<byte[], byte[]> next() {
+            public Entry<K, V> next() {
                 if (!hasNext())
                     throw new NoSuchElementException();
 
-                Entry<byte[], byte[]> entry = this.getEntry();
+                Entry<K, V> entry = this.getEntry();
                 iter.next();
-
                 return entry;
             }
 
@@ -253,24 +260,25 @@ public class RocksDBKeyValueStore extends MeteredKeyValueStore<byte[], byte[]> {
             }
         }
 
-        private static class RocksDBRangeIterator extends RocksDbIterator {
+        private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
             // RocksDB's JNI interface does not expose getters/setters that allow the
             // comparator to be pluggable, and the default is lexicographic, so it's
             // safe to just force lexicographic comparator here for now.
             private final Comparator<byte[]> comparator = new LexicographicComparator();
-            byte[] to;
+            byte[] rawToKey;
 
-            public RocksDBRangeIterator(RocksIterator iter, byte[] from, byte[] to) {
-                super(iter);
-                iter.seek(from);
-                this.to = to;
+            public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
+                    K from, K to) {
+                super(iter, serdes);
+                iter.seek(serdes.rawKey(from));
+                this.rawToKey = serdes.rawKey(to);
             }
 
             @Override
             public boolean hasNext() {
-                return super.hasNext() && comparator.compare(super.peekKey(), this.to) < 0;
+                return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
             }
         }
 
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
new file mode 100644
index 0000000..540d763
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+final class Serdes<K, V> {
+
+    public static <K, V> Serdes<K, V> withBuiltinTypes(String topic, Class<K> keyClass, Class<V> valueClass) {
+        Serializer<K> keySerializer = serializer(keyClass);
+        Deserializer<K> keyDeserializer = deserializer(keyClass);
+        Serializer<V> valueSerializer = serializer(valueClass);
+        Deserializer<V> valueDeserializer = deserializer(valueClass);
+        return new Serdes<>(topic, keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
+    }
+
+    @SuppressWarnings("unchecked")
+    static <T> Serializer<T> serializer(Class<T> type) {
+        if (String.class.isAssignableFrom(type)) return (Serializer<T>) new StringSerializer();
+        if (Integer.class.isAssignableFrom(type)) return (Serializer<T>) new IntegerSerializer();
+        if (Long.class.isAssignableFrom(type)) return (Serializer<T>) new LongSerializer();
+        if (byte[].class.isAssignableFrom(type)) return (Serializer<T>) new ByteArraySerializer();
+        throw new IllegalArgumentException("Unknown class for built-in serializer");
+    }
+
+    @SuppressWarnings("unchecked")
+    static <T> Deserializer<T> deserializer(Class<T> type) {
+        if (String.class.isAssignableFrom(type)) return (Deserializer<T>) new StringDeserializer();
+        if (Integer.class.isAssignableFrom(type)) return (Deserializer<T>) new IntegerDeserializer();
+        if (Long.class.isAssignableFrom(type)) return (Deserializer<T>) new LongDeserializer();
+        if (byte[].class.isAssignableFrom(type)) return (Deserializer<T>) new ByteArrayDeserializer();
+        throw new IllegalArgumentException("Unknown class for built-in deserializer");
+    }
+
+    private final String topic;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+
+    /**
+     * Create a context for serialization using the specified serializers and deserializers.
+     * 
+     * @param topic the name of the topic
+     * @param keySerializer the serializer for keys; may not be null
+     * @param keyDeserializer the deserializer for keys; may not be null
+     * @param valueSerializer the serializer for values; may not be null
+     * @param valueDeserializer the deserializer for values; may not be null
+     */
+    public Serdes(String topic,
+            Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
+            Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) {
+        this.topic = topic;
+        this.keySerializer = keySerializer;
+        this.keyDeserializer = keyDeserializer;
+        this.valueSerializer = valueSerializer;
+        this.valueDeserializer = valueDeserializer;
+    }
+
+    /**
+     * Create a context for serialization using the specified serializers and deserializers, or if any of them are null the
+     * corresponding {@link ProcessorContext}'s default serializer or deserializer, which
+     * <em>must</em> match the key and value types used as parameters for this object.
+     * 
+     * @param topic the name of the topic
+     * @param keySerializer the serializer for keys; may be null if the {@link ProcessorContext#keySerializer() default
+     *            key serializer} should be used
+     * @param keyDeserializer the deserializer for keys; may be null if the {@link ProcessorContext#keyDeserializer() default
+     *            key deserializer} should be used
+     * @param valueSerializer the serializer for values; may be null if the {@link ProcessorContext#valueSerializer() default
+     *            value serializer} should be used
+     * @param valueDeserializer the deserializer for values; may be null if the {@link ProcessorContext#valueDeserializer()
+     *            default value deserializer} should be used
+     * @param context the processing context
+     */
+    @SuppressWarnings("unchecked")
+    public Serdes(String topic,
+            Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
+            Serializer<V> valueSerializer, Deserializer<V> valueDeserializer,
+            ProcessorContext context) {
+        this.topic = topic;
+        this.keySerializer = keySerializer != null ? keySerializer : (Serializer<K>) context.keySerializer();
+        this.keyDeserializer = keyDeserializer != null ? keyDeserializer : (Deserializer<K>) context.keyDeserializer();
+        this.valueSerializer = valueSerializer != null ? valueSerializer : (Serializer<V>) context.valueSerializer();
+        this.valueDeserializer = valueDeserializer != null ? valueDeserializer : (Deserializer<V>) context.valueDeserializer();
+    }
+
+    /**
+     * Create a context for serialization using the {@link ProcessorContext}'s default serializers and deserializers, which
+     * <em>must</em> match the key and value types used as parameters for this object.
+     * 
+     * @param topic the name of the topic
+     * @param context the processing context
+     */
+    @SuppressWarnings("unchecked")
+    public Serdes(String topic,
+            ProcessorContext context) {
+        this.topic = topic;
+        this.keySerializer = (Serializer<K>) context.keySerializer();
+        this.keyDeserializer = (Deserializer<K>) context.keyDeserializer();
+        this.valueSerializer = (Serializer<V>) context.valueSerializer();
+        this.valueDeserializer = (Deserializer<V>) context.valueDeserializer();
+    }
+
+    public Deserializer<K> keyDeserializer() {
+        return keyDeserializer;
+    }
+
+    public Serializer<K> keySerializer() {
+        return keySerializer;
+    }
+
+    public Deserializer<V> valueDeserializer() {
+        return valueDeserializer;
+    }
+
+    public Serializer<V> valueSerializer() {
+        return valueSerializer;
+    }
+
+    public String topic() {
+        return topic;
+    }
+
+    public K keyFrom(byte[] rawKey) {
+        return keyDeserializer.deserialize(topic, rawKey);
+    }
+
+    public V valueFrom(byte[] rawValue) {
+        return valueDeserializer.deserialize(topic, rawValue);
+    }
+
+    public byte[] rawKey(K key) {
+        return keySerializer.serialize(topic, key);
+    }
+
+    public byte[] rawValue(V value) {
+        return valueSerializer.serialize(topic, value);
+    }
+}
\ No newline at end of file
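
For orientation: the new Serdes class above is package-private, so it is used only internally by the stores in org.apache.kafka.streams.state. It simply round-trips keys and values through the configured serializers. A minimal sketch of its behavior, assuming the built-in string and long serdes (the topic name is illustrative):

    Serdes<String, Long> serdes = Serdes.withBuiltinTypes("my-topic", String.class, Long.class);
    byte[] rawKey   = serdes.rawKey("alpha");     // serialized with StringSerializer
    String key      = serdes.keyFrom(rawKey);     // back to "alpha" via StringDeserializer
    byte[] rawValue = serdes.rawValue(42L);       // serialized with LongSerializer
    Long value      = serdes.valueFrom(rawValue); // back to 42L via LongDeserializer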

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
new file mode 100644
index 0000000..171ed44
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+/**
+ * Factory for creating key-value stores.
+ */
+public class Stores {
+
+    /**
+     * Begin to create a new {@link org.apache.kafka.streams.processor.StateStore} instance.
+     * 
+     * @param name the name of the store
+     * @param context the processor context
+     * @return the factory that can be used to specify other options or configurations for the store; never null
+     */
+    public static StoreFactory create(final String name, final ProcessorContext context) {
+        return new StoreFactory() {
+            @Override
+            public <K> ValueFactory<K> withKeys(final Serializer<K> keySerializer, final Deserializer<K> keyDeserializer) {
+                return new ValueFactory<K>() {
+                    @Override
+                    public <V> KeyValueFactory<K, V> withValues(final Serializer<V> valueSerializer,
+                                                                final Deserializer<V> valueDeserializer) {
+                        final Serdes<K, V> serdes = new Serdes<>(name, keySerializer, keyDeserializer, valueSerializer, valueDeserializer,
+                                context);
+                        return new KeyValueFactory<K, V>() {
+                            @Override
+                            public InMemoryKeyValueFactory<K, V> inMemory() {
+                                return new InMemoryKeyValueFactory<K, V>() {
+                                    @Override
+                                    public KeyValueStore<K, V> build() {
+                                        return new InMemoryKeyValueStore<>(name, context, serdes, null);
+                                    }
+                                };
+                            }
+
+                            @Override
+                            public LocalDatabaseKeyValueFactory<K, V> localDatabase() {
+                                return new LocalDatabaseKeyValueFactory<K, V>() {
+                                    @Override
+                                    public KeyValueStore<K, V> build() {
+                                        return new RocksDBKeyValueStore<>(name, context, serdes, null);
+                                    }
+                                };
+                            }
+                        };
+                    }
+                };
+            }
+        };
+    }
+
+    public static abstract class StoreFactory {
+        /**
+         * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link String}s.
+         * 
+         * @return the interface used to specify the type of values; never null
+         */
+        public ValueFactory<String> withStringKeys() {
+            return withKeys(new StringSerializer(), new StringDeserializer());
+        }
+
+        /**
+         * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Integer}s.
+         * 
+         * @return the interface used to specify the type of values; never null
+         */
+        public ValueFactory<Integer> withIntegerKeys() {
+            return withKeys(new IntegerSerializer(), new IntegerDeserializer());
+        }
+
+        /**
+         * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Long}s.
+         * 
+         * @return the interface used to specify the type of values; never null
+         */
+        public ValueFactory<Long> withLongKeys() {
+            return withKeys(new LongSerializer(), new LongDeserializer());
+        }
+
+        /**
+         * Begin to create a {@link KeyValueStore} by specifying the keys will be byte arrays.
+         * 
+         * @return the interface used to specify the type of values; never null
+         */
+        public ValueFactory<byte[]> withByteArrayKeys() {
+            return withKeys(new ByteArraySerializer(), new ByteArrayDeserializer());
+        }
+
+        /**
+         * Begin to create a {@link KeyValueStore} by specifying the keys will be either {@link String}, {@link Integer},
+         * {@link Long}, or {@code byte[]}.
+         * 
+         * @param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serializers and
+         *            deserializers (e.g., {@code String.class}, {@code Integer.class}, {@code Long.class}, or
+         *            {@code byte[].class})
+         * @return the interface used to specify the type of values; never null
+         */
+        public <K> ValueFactory<K> withKeys(Class<K> keyClass) {
+            return withKeys(Serdes.serializer(keyClass), Serdes.deserializer(keyClass));
+        }
+
+        /**
+         * Begin to create a {@link KeyValueStore} by specifying the serializer and deserializer for the keys.
+         * 
+         * @param keySerializer the serializer for keys; may not be null
+         * @param keyDeserializer the deserializer for keys; may not be null
+         * @return the interface used to specify the type of values; never null
+         */
+        public abstract <K> ValueFactory<K> withKeys(Serializer<K> keySerializer, Deserializer<K> keyDeserializer);
+    }
+
+    /**
+     * The interface used to specify the type of values for key-value stores.
+     * 
+     * @param <K> the type of keys
+     */
+    public static abstract class ValueFactory<K> {
+        /**
+         * Use {@link String} values.
+         * 
+         * @return the interface used to specify the remaining key-value store options; never null
+         */
+        public KeyValueFactory<K, String> withStringValues() {
+            return withValues(new StringSerializer(), new StringDeserializer());
+        }
+
+        /**
+         * Use {@link Integer} values.
+         * 
+         * @return the interface used to specify the remaining key-value store options; never null
+         */
+        public KeyValueFactory<K, Integer> withIntegerValues() {
+            return withValues(new IntegerSerializer(), new IntegerDeserializer());
+        }
+
+        /**
+         * Use {@link Long} values.
+         * 
+         * @return the interface used to specify the remaining key-value store options; never null
+         */
+        public KeyValueFactory<K, Long> withLongValues() {
+            return withValues(new LongSerializer(), new LongDeserializer());
+        }
+
+        /**
+         * Use byte arrays for values.
+         * 
+         * @return the interface used to specify the remaining key-value store options; never null
+         */
+        public KeyValueFactory<K, byte[]> withByteArrayValues() {
+            return withValues(new ByteArraySerializer(), new ByteArrayDeserializer());
+        }
+
+        /**
+         * Use values of the specified type, which must be either {@link String}, {@link Integer}, {@link Long}, or
+         * {@code byte[]}.
+         * 
+         * @param valueClass the class for the values, which must be one of the types for which Kafka has built-in serializers and
+         *            deserializers (e.g., {@code String.class}, {@code Integer.class}, {@code Long.class}, or
+         *            {@code byte[].class})
+         * @return the interface used to specify the remaining key-value store options; never null
+         */
+        public <V> KeyValueFactory<K, V> withValues(Class<V> valueClass) {
+            return withValues(Serdes.serializer(valueClass), Serdes.deserializer(valueClass));
+        }
+
+        /**
+         * Use the specified serializer and deserializer for the values.
+         * 
+         * @param valueSerializer the serializer for value; may not be null
+         * @param valueDeserializer the deserializer for values; may not be null
+         * @return the interface used to specify the remaining key-value store options; never null
+         */
+        public abstract <V> KeyValueFactory<K, V> withValues(Serializer<V> valueSerializer, Deserializer<V> valueDeserializer);
+
+    }
+
+    /**
+     * The interface used to specify the different kinds of key-value stores.
+     *
+     * @param <K> the type of keys
+     * @param <V> the type of values
+     */
+    public static interface KeyValueFactory<K, V> {
+        /**
+         * Keep all key-value entries in-memory, although for durability all entries are recorded in a Kafka topic that can be
+         * read to restore the entries if they are lost.
+         * 
+         * @return the factory to create in-memory key-value stores; never null
+         */
+        InMemoryKeyValueFactory<K, V> inMemory();
+
+        /**
+         * Keep all key-value entries off-heap in a local database, although for durability all entries are recorded in a Kafka
+         * topic that can be read to restore the entries if they are lost.
+         * 
+         * @return the factory to create local database key-value stores; never null
+         */
+        LocalDatabaseKeyValueFactory<K, V> localDatabase();
+    }
+
+    /**
+     * The interface used to create in-memory key-value stores.
+     *
+     * @param <K> the type of keys
+     * @param <V> the type of values
+     */
+    public static interface InMemoryKeyValueFactory<K, V> {
+        /**
+         * Return the new key-value store.
+         * @return the key-value store; never null
+         */
+        KeyValueStore<K, V> build();
+    }
+
+    /**
+     * The interface used to create off-heap key-value stores that use a local database.
+     *
+     * @param <K> the type of keys
+     * @param <V> the type of values
+     */
+    public static interface LocalDatabaseKeyValueFactory<K, V> {
+        /**
+         * Return the new key-value store.
+         * @return the key-value store; never null
+         */
+        KeyValueStore<K, V> build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 50a23ec..5f8ca46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -34,9 +34,10 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateUtils;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.ProcessorTopologyTestDriver;
 import org.junit.After;
@@ -44,19 +45,12 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
 import java.util.Properties;
 
 public class ProcessorTopologyTest {
 
     private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
     private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
-    private static final File STATE_DIR = new File("build/data").getAbsoluteFile();
 
     protected static final String INPUT_TOPIC = "input-topic";
     protected static final String OUTPUT_TOPIC_1 = "output-topic-1";
@@ -69,10 +63,11 @@ public class ProcessorTopologyTest {
 
     @Before
     public void setup() {
-        STATE_DIR.mkdirs();
+        // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
+        File localState = StateUtils.tempDir();
         Properties props = new Properties();
         props.setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamingConfig.STATE_DIR_CONFIG, STATE_DIR.getAbsolutePath());
+        props.setProperty(StreamingConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
         props.setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
         props.setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         props.setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
@@ -87,36 +82,16 @@ public class ProcessorTopologyTest {
             driver.close();
         }
         driver = null;
-        if (STATE_DIR.exists()) {
-            try {
-                Files.walkFileTree(STATE_DIR.toPath(), new SimpleFileVisitor<Path>() {
-                    @Override
-                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
-                        Files.delete(file);
-                        return FileVisitResult.CONTINUE;
-                    }
-    
-                    @Override
-                    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
-                        Files.delete(dir);
-                        return FileVisitResult.CONTINUE;
-                    }
-    
-                });
-            } catch (IOException e) {
-                // do nothing
-            }
-        }
     }
-
+    
     @Test
     public void testTopologyMetadata() {
         final TopologyBuilder builder = new TopologyBuilder();
 
         builder.addSource("source-1", "topic-1");
         builder.addSource("source-2", "topic-2", "topic-3");
-        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
-        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-1", "source-2");
+        builder.addProcessor("processor-1", new MockProcessorSupplier<>(), "source-1");
+        builder.addProcessor("processor-2", new MockProcessorSupplier<>(), "source-1", "source-2");
         builder.addSink("sink-1", "topic-3", "processor-1");
         builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");
 
@@ -289,7 +264,7 @@ public class ProcessorTopologyTest {
         @Override
         public void init(ProcessorContext context) {
             super.init(context);
-            store = new InMemoryKeyValueStore<>(storeName, context);
+            store = Stores.create(storeName, context).withStringKeys().withStringValues().inMemory().build();
         }
 
         @Override
@@ -306,12 +281,17 @@ public class ProcessorTopologyTest {
             }
             context().forward(Long.toString(streamTime), count);
         }
+        
+        @Override
+        public void close() {
+            store.close();
+        }
     }
 
-    protected ProcessorSupplier define(final Processor processor) {
-        return new ProcessorSupplier() {
+    protected <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> processor) {
+        return new ProcessorSupplier<K, V>() {
             @Override
-            public Processor get() {
+            public Processor<K, V> get() {
                 return processor;
             }
         };
@@ -323,4 +303,4 @@ public class ProcessorTopologyTest {
             return timestamp;
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
new file mode 100644
index 0000000..d8f06ea
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.junit.Test;
+
+public abstract class AbstractKeyValueStoreTest {
+
+    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context,
+                                                                      Class<K> keyClass, Class<V> valueClass,
+                                                                      boolean useContextSerdes);
+    
+    @Test
+    public void testPutGetRange() {
+        // Create the test driver ...
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+        try {
+
+            // Verify that the store reads and writes correctly ...
+            store.put(0, "zero");
+            store.put(1, "one");
+            store.put(2, "two");
+            store.put(4, "four");
+            store.put(5, "five");
+            assertEquals(5, driver.sizeOf(store));
+            assertEquals("zero", store.get(0));
+            assertEquals("one", store.get(1));
+            assertEquals("two", store.get(2));
+            assertNull(store.get(3));
+            assertEquals("four", store.get(4));
+            assertEquals("five", store.get(5));
+            store.delete(5);
+
+            // Flush the store and verify all current entries were properly flushed ...
+            store.flush();
+            assertEquals("zero", driver.flushedEntryStored(0));
+            assertEquals("one", driver.flushedEntryStored(1));
+            assertEquals("two", driver.flushedEntryStored(2));
+            assertEquals("four", driver.flushedEntryStored(4));
+            assertEquals(null, driver.flushedEntryStored(5));
+
+            assertEquals(false, driver.flushedEntryRemoved(0));
+            assertEquals(false, driver.flushedEntryRemoved(1));
+            assertEquals(false, driver.flushedEntryRemoved(2));
+            assertEquals(false, driver.flushedEntryRemoved(4));
+            assertEquals(true, driver.flushedEntryRemoved(5));
+
+            // Check range iteration ...
+            try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
+                while (iter.hasNext()) {
+                    Entry<Integer, String> entry = iter.next();
+                    if (entry.key().equals(2))
+                        assertEquals("two", entry.value());
+                    else if (entry.key().equals(4))
+                        assertEquals("four", entry.value());
+                    else
+                        fail("Unexpected entry: " + entry);
+                }
+            }
+
+            // Check range iteration ...
+            try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
+                while (iter.hasNext()) {
+                    Entry<Integer, String> entry = iter.next();
+                    if (entry.key().equals(2))
+                        assertEquals("two", entry.value());
+                    else if (entry.key().equals(4))
+                        assertEquals("four", entry.value());
+                    else
+                        fail("Unexpected entry: " + entry);
+                }
+            }
+        } finally {
+            store.close();
+        }
+    }
+
+    @Test
+    public void testPutGetRangeWithDefaultSerdes() {
+        // Create the test driver ...
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+        try {
+
+            // Verify that the store reads and writes correctly ...
+            store.put(0, "zero");
+            store.put(1, "one");
+            store.put(2, "two");
+            store.put(4, "four");
+            store.put(5, "five");
+            assertEquals(5, driver.sizeOf(store));
+            assertEquals("zero", store.get(0));
+            assertEquals("one", store.get(1));
+            assertEquals("two", store.get(2));
+            assertNull(store.get(3));
+            assertEquals("four", store.get(4));
+            assertEquals("five", store.get(5));
+            store.delete(5);
+
+            // Flush the store and verify all current entries were properly flushed ...
+            store.flush();
+            assertEquals("zero", driver.flushedEntryStored(0));
+            assertEquals("one", driver.flushedEntryStored(1));
+            assertEquals("two", driver.flushedEntryStored(2));
+            assertEquals("four", driver.flushedEntryStored(4));
+            assertEquals(null, driver.flushedEntryStored(5));
+
+            assertEquals(false, driver.flushedEntryRemoved(0));
+            assertEquals(false, driver.flushedEntryRemoved(1));
+            assertEquals(false, driver.flushedEntryRemoved(2));
+            assertEquals(false, driver.flushedEntryRemoved(4));
+            assertEquals(true, driver.flushedEntryRemoved(5));
+        } finally {
+            store.close();
+        }
+    }
+
+    @Test
+    public void testRestore() {
+        // Create the test driver ...
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+
+        // Add any entries that will be restored to any store
+        // that uses the driver's context ...
+        driver.addEntryToRestoreLog(0, "zero");
+        driver.addEntryToRestoreLog(1, "one");
+        driver.addEntryToRestoreLog(2, "two");
+        driver.addEntryToRestoreLog(4, "four");
+
+        // Create the store, which should register with the context and automatically
+        // receive the restore entries ...
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+        try {
+            // Verify that the store's contents were properly restored ...
+            assertEquals(0, driver.checkForRestoredEntries(store));
+
+            // and there are no other entries ...
+            assertEquals(4, driver.sizeOf(store));
+        } finally {
+            store.close();
+        }
+    }
+
+    @Test
+    public void testRestoreWithDefaultSerdes() {
+        // Create the test driver ...
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+
+        // Add any entries that will be restored to any store
+        // that uses the driver's context ...
+        driver.addEntryToRestoreLog(0, "zero");
+        driver.addEntryToRestoreLog(1, "one");
+        driver.addEntryToRestoreLog(2, "two");
+        driver.addEntryToRestoreLog(4, "four");
+
+        // Create the store, which should register with the context and automatically
+        // receive the restore entries ...
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+        try {
+            // Verify that the store's contents were properly restored ...
+            assertEquals(0, driver.checkForRestoredEntries(store));
+
+            // and there are no other entries ...
+            assertEquals(4, driver.sizeOf(store));
+        } finally {
+            store.close();
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
new file mode 100644
index 0000000..bee9967
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context, Class<K> keyClass, Class<V> valueClass,
+                                                             boolean useContextSerdes) {
+        if (useContextSerdes) {
+            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
+            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
+            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
+            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
+            return Stores.create("my-store", context).withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
+        }
+        return Stores.create("my-store", context).withKeys(keyClass).withValues(valueClass).inMemory().build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
new file mode 100644
index 0000000..8fdbfff
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -0,0 +1,441 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.test.MockProcessorContext;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A component that provides a {@link #context() ProcessorContext} that can be supplied to a {@link KeyValueStore} so that
+ * all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes.
+ * This class simplifies testing of various {@link KeyValueStore} instances, especially those that use
+ * {@link MeteredKeyValueStore} to monitor and write their entries to the Kafka topic.
+ * <p>
+ * <h2>Basic usage</h2>
+ * This component can be used to help test a {@link KeyValueStore}'s ability to read and write entries.
+ * 
+ * <pre>
+ * // Create the test driver ...
+ * KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create();
+ * KeyValueStore&lt;Integer, String> store = Stores.create("my-store", driver.context())
+ *                                              .withIntegerKeys().withStringValues()
+ *                                              .inMemory().build();
+ * 
+ * // Verify that the store reads and writes correctly ...
+ * store.put(0, "zero");
+ * store.put(1, "one");
+ * store.put(2, "two");
+ * store.put(4, "four");
+ * store.put(5, "five");
+ * assertEquals(5, driver.sizeOf(store));
+ * assertEquals("zero", store.get(0));
+ * assertEquals("one", store.get(1));
+ * assertEquals("two", store.get(2));
+ * assertEquals("four", store.get(4));
+ * assertEquals("five", store.get(5));
+ * assertNull(store.get(3));
+ * store.delete(5);
+ * 
+ * // Flush the store and verify all current entries were properly flushed ...
+ * store.flush();
+ * assertEquals("zero", driver.flushedEntryStored(0));
+ * assertEquals("one", driver.flushedEntryStored(1));
+ * assertEquals("two", driver.flushedEntryStored(2));
+ * assertEquals("four", driver.flushedEntryStored(4));
+ * assertEquals(null, driver.flushedEntryStored(5));
+ * 
+ * assertEquals(false, driver.flushedEntryRemoved(0));
+ * assertEquals(false, driver.flushedEntryRemoved(1));
+ * assertEquals(false, driver.flushedEntryRemoved(2));
+ * assertEquals(false, driver.flushedEntryRemoved(4));
+ * assertEquals(true, driver.flushedEntryRemoved(5));
+ * </pre>
+ * 
+ * <p>
+ * <h2>Restoring a store</h2>
+ * This component can be used to test whether a {@link KeyValueStore} implementation properly
+ * {@link ProcessorContext#register(StateStore, StateRestoreCallback) registers itself} with the {@link ProcessorContext}, so that
+ * the persisted contents of a store are properly restored from the flushed entries when the store instance is started.
+ * <p>
+ * To do this, create an instance of this driver component, {@link #addEntryToRestoreLog(Object, Object) add entries} that will be
+ * passed to the store upon creation (simulating the entries that were previously flushed to the topic), and then create the store
+ * using this driver's {@link #context() ProcessorContext}:
+ * 
+ * <pre>
+ * // Create the test driver ...
+ * KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+ * 
+ * // Add any entries that will be restored to any store that uses the driver's context ...
+ * driver.addRestoreEntry(0, "zero");
+ * driver.addRestoreEntry(1, "one");
+ * driver.addRestoreEntry(2, "two");
+ * driver.addRestoreEntry(4, "four");
+ * 
+ * // Create the store, which should register with the context and automatically
+ * // receive the restore entries ...
+ * KeyValueStore&lt;Integer, String> store = Stores.create("my-store", driver.context())
+ *                                              .withIntegerKeys().withStringValues()
+ *                                              .inMemory().build();
+ * 
+ * // Verify that the store's contents were properly restored ...
+ * assertEquals(0, driver.checkForRestoredEntries(store));
+ * 
+ * // and there are no other entries ...
+ * assertEquals(4, driver.sizeOf(store));
+ * </pre>
+ * 
+ * @param <K> the type of keys placed in the store
+ * @param <V> the type of values placed in the store
+ */
+public class KeyValueStoreTestDriver<K, V> {
+
+    private static <T> Serializer<T> unusableSerializer() {
+        return new Serializer<T>() {
+            @Override
+            public void configure(Map<String, ?> configs, boolean isKey) {
+            }
+
+            @Override
+            public byte[] serialize(String topic, T data) {
+                throw new UnsupportedOperationException("This serializer should not be used");
+            }
+
+            @Override
+            public void close() {
+            }
+        };
+    }
+
+    private static <T> Deserializer<T> unusableDeserializer() {
+        return new Deserializer<T>() {
+            @Override
+            public void configure(Map<String, ?> configs, boolean isKey) {
+            }
+
+            @Override
+            public T deserialize(String topic, byte[] data) {
+                throw new UnsupportedOperationException("This deserializer should not be used");
+            }
+
+            @Override
+            public void close() {
+            }
+        };
+    }
+
+    /**
+     * Create a driver object that will have a {@link #context()} that records messages
+     * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides <em>unusable</em> default key and
+     * value serializers and deserializers. This can be used when the actual serializers and deserializers are supplied to the
+     * store during creation, which should eliminate the need for a store to depend on the ProcessorContext's default key and
+     * value serializers and deserializers.
+     * 
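+     * For example, a store wired up with explicit serdes never touches the context's defaults (a sketch; the
+     * store name is arbitrary, and the serializers are the built-in ones from
+     * {@code org.apache.kafka.common.serialization}):
+     * 
+     * <pre>
+     * KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create();
+     * KeyValueStore&lt;Integer, String> store = Stores.create("my-store", driver.context())
+     *                                              .withKeys(new IntegerSerializer(), new IntegerDeserializer())
+     *                                              .withValues(new StringSerializer(), new StringDeserializer())
+     *                                              .inMemory().build();
+     * </pre>
+     * 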
+     * @return the test driver; never null
+     */
+    public static <K, V> KeyValueStoreTestDriver<K, V> create() {
+        Serializer<K> keySerializer = unusableSerializer();
+        Deserializer<K> keyDeserializer = unusableDeserializer();
+        Serializer<V> valueSerializer = unusableSerializer();
+        Deserializer<V> valueDeserializer = unusableDeserializer();
+        Serdes<K, V> serdes = new Serdes<K, V>("unexpected", keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
+        return new KeyValueStoreTestDriver<K, V>(serdes);
+    }
+
+    /**
+     * Create a driver object that will have a {@link #context()} that records messages
+     * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides default serializers and
+     * deserializers for the given built-in key and value types (e.g., {@code String.class}, {@code Integer.class},
+     * {@code Long.class}, and {@code byte[].class}). This can be used when the store is created to rely upon the
+     * ProcessorContext's default key and value serializers and deserializers.
+     * 
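+     * For example (a sketch; here the store infers its serdes from the supplied built-in types):
+     * 
+     * <pre>
+     * KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+     * KeyValueStore&lt;Integer, String> store = Stores.create("my-store", driver.context())
+     *                                              .withKeys(Integer.class).withValues(String.class)
+     *                                              .inMemory().build();
+     * </pre>
+     * 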
+     * @param keyClass the class for the keys; must be one of {@code String.class}, {@code Integer.class},
+     *            {@code Long.class}, or {@code byte[].class}
+     * @param valueClass the class for the values; must be one of {@code String.class}, {@code Integer.class},
+     *            {@code Long.class}, or {@code byte[].class}
+     * @return the test driver; never null
+     */
+    public static <K, V> KeyValueStoreTestDriver<K, V> create(Class<K> keyClass, Class<V> valueClass) {
+        Serdes<K, V> serdes = Serdes.withBuiltinTypes("unexpected", keyClass, valueClass);
+        return new KeyValueStoreTestDriver<K, V>(serdes);
+    }
+
+    /**
+     * Create a driver object that will have a {@link #context()} that records messages
+     * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides the specified serializers and
+     * deserializers. This can be used when the store is created to rely upon the ProcessorContext's default key and value
+     * serializers and deserializers.
+     * 
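+     * For example (a sketch using the built-in serializers and deserializers from
+     * {@code org.apache.kafka.common.serialization}):
+     * 
+     * <pre>
+     * KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create(
+     *         new IntegerSerializer(), new IntegerDeserializer(),
+     *         new StringSerializer(), new StringDeserializer());
+     * </pre>
+     * 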
+     * @param keySerializer the key serializer for the {@link ProcessorContext}; may not be null
+     * @param keyDeserializer the key deserializer for the {@link ProcessorContext}; may not be null
+     * @param valueSerializer the value serializer for the {@link ProcessorContext}; may not be null
+     * @param valueDeserializer the value deserializer for the {@link ProcessorContext}; may not be null
+     * @return the test driver; never null
+     */
+    public static <K, V> KeyValueStoreTestDriver<K, V> create(Serializer<K> keySerializer,
+                                                              Deserializer<K> keyDeserializer,
+                                                              Serializer<V> valueSerializer,
+                                                              Deserializer<V> valueDeserializer) {
+        Serdes<K, V> serdes = new Serdes<K, V>("unexpected", keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
+        return new KeyValueStoreTestDriver<K, V>(serdes);
+    }
+
+    private final Serdes<K, V> serdes;
+    private final Map<K, V> flushedEntries = new HashMap<>();
+    private final Set<K> flushedRemovals = new HashSet<>();
+    private final List<Entry<K, V>> restorableEntries = new LinkedList<>();
+    private final MockProcessorContext context;
+    private final Map<String, StateStore> storeMap = new HashMap<>();
+    private final StreamingMetrics metrics = new StreamingMetrics() {
+        @Override
+        public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
+            return null;
+        }
+
+        @Override
+        public void recordLatency(Sensor sensor, long startNs, long endNs) {
+        }
+    };
+    private final RecordCollector recordCollector;
+    private File stateDir = new File("build/data").getAbsoluteFile();
+
+    protected KeyValueStoreTestDriver(Serdes<K, V> serdes) {
+        this.serdes = serdes;
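+        // Use a mock producer, and override the record collector's send(...) so that everything the store
+        // writes to its topic during flush is captured locally rather than produced to Kafka ...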
+        ByteArraySerializer rawSerializer = new ByteArraySerializer();
+        Producer<byte[], byte[]> producer = new MockProducer<byte[], byte[]>(true, rawSerializer, rawSerializer);
+        this.recordCollector = new RecordCollector(producer) {
+            @Override
+            public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                recordFlushed(record.key(), record.value());
+            }
+        };
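+        // Build a mock context that registers stores, replays the restore log upon registration,
+        // and exposes the driver's serdes as the context's default serializers and deserializers ...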
+        this.context = new MockProcessorContext(null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
+                serdes.valueDeserializer(), recordCollector) {
+            @Override
+            public int id() {
+                return 1;
+            }
+
+            @Override
+            public <K1, V1> void forward(K1 key, V1 value, int childIndex) {
+                forward(key, value);
+            }
+
+            @Override
+            public void register(StateStore store, StateRestoreCallback func) {
+                storeMap.put(store.name(), store);
+                restoreEntries(func);
+            }
+
+            @Override
+            public StateStore getStateStore(String name) {
+                return storeMap.get(name);
+            }
+
+            @Override
+            public StreamingMetrics metrics() {
+                return metrics;
+            }
+
+            @Override
+            public File stateDir() {
+                if (stateDir == null) {
+                    stateDir = StateUtils.tempDir();
+                }
+                stateDir.mkdirs();
+                return stateDir;
+            }
+        };
+    }
+
+    /**
+     * Set the directory that should be used by the store for local disk storage.
+     * 
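+     * For example, a test of a persistent store can point the driver at a fresh temporary directory:
+     * 
+     * <pre>
+     * driver.useStateDir(StateUtils.tempDir());
+     * </pre>
+     * 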
+     * @param dir the directory; may be null if no local storage is allowed
+     */
+    public void useStateDir(File dir) {
+        this.stateDir = dir;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <K1, V1> void recordFlushed(K1 key, V1 value) {
+        K k = (K) key;
+        if (value == null) {
+            // This is a removal ...
+            flushedRemovals.add(k);
+            flushedEntries.remove(k);
+        } else {
+            // This is a normal add
+            flushedEntries.put(k, (V) value);
+            flushedRemovals.remove(k);
+        }
+    }
+
+    private void restoreEntries(StateRestoreCallback func) {
+        for (Entry<K, V> entry : restorableEntries) {
+            if (entry != null) {
+                byte[] rawKey = serdes.rawKey(entry.key());
+                byte[] rawValue = serdes.rawValue(entry.value());
+                func.restore(rawKey, rawValue);
+            }
+        }
+    }
+
+    /**
+     * This method adds an entry to the "restore log" for the {@link KeyValueStore}, and is used <em>only</em> when testing the
+     * restore functionality of a {@link KeyValueStore} implementation.
+     * <p>
+     * To create such a test, create the test driver, call this method one or more times, and then create the
+     * {@link KeyValueStore}. Your tests can then check whether the store contains the entries from the log.
+     * 
+     * <pre>
+     * // Set up the driver and pre-populate the log ...
+     * KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create();
+     * driver.addRestoreEntry(1,"value1");
+     * driver.addRestoreEntry(2,"value2");
+     * driver.addRestoreEntry(3,"value3");
+     * 
+     * // Create the store using the driver's context ...
+     * ProcessorContext context = driver.context();
+     * KeyValueStore&lt;Integer, String> store = ...
+     * 
+     * // Verify that the store's contents were properly restored from the log ...
+     * assertEquals(0, driver.checkForRestoredEntries(store));
+     * 
+     * // and there are no other entries ...
+     * assertEquals(3, driver.sizeOf(store));
+     * </pre>
+     * 
+     * @param key the key for the entry
+     * @param value the value for the entry
+     * @see #checkForRestoredEntries(KeyValueStore)
+     */
+    public void addEntryToRestoreLog(K key, V value) {
+        restorableEntries.add(new Entry<K, V>(key, value));
+    }
+
+    /**
+     * Get the context that should be supplied to a {@link KeyValueStore}'s constructor. This context records any messages
+     * written by the store to the Kafka topic, making them available via the {@link #flushedEntryStored(Object)} and
+     * {@link #flushedEntryRemoved(Object)} methods.
+     * <p>
+     * If the {@link KeyValueStore}'s contents are to be restored upon startup, be sure to {@link #addEntryToRestoreLog(Object, Object)
+     * add the restore entries} before creating the store with the {@link ProcessorContext} returned by this method.
+     * 
+     * @return the processing context; never null
+     * @see #addEntryToRestoreLog(Object, Object)
+     */
+    public ProcessorContext context() {
+        return context;
+    }
+
+    /**
+     * Get the entries that are restored to a KeyValueStore when it is constructed with this driver's {@link #context()
+     * ProcessorContext}.
+     * 
+     * @return the restore entries; never null but possibly empty
+     */
+    public Iterable<Entry<K, V>> restoredEntries() {
+        return restorableEntries;
+    }
+
+    /**
+     * Utility method that will count the number of {@link #addEntryToRestoreLog(Object, Object) restore entries} missing from the
+     * supplied store.
+     * 
+     * @param store the store that is expected to contain all of the {@link #restoredEntries() restore entries}
+     * @return the number of restore entries missing from the store, or 0 if all restore entries were found
+     * @see #addEntryToRestoreLog(Object, Object)
+     */
+    public int checkForRestoredEntries(KeyValueStore<K, V> store) {
+        int missing = 0;
+        for (Entry<K, V> entry : restorableEntries) {
+            if (entry != null) {
+                V value = store.get(entry.key());
+                if (!Objects.equals(value, entry.value())) {
+                    ++missing;
+                }
+            }
+        }
+        return missing;
+    }
+
+    /**
+     * Utility method to compute the number of entries within the store.
+     * 
+     * @param store the key value store using this {@link #context()}.
+     * @return the number of entries
+     */
+    public int sizeOf(KeyValueStore<K, V> store) {
+        int size = 0;
+        for (KeyValueIterator<K, V> iterator = store.all(); iterator.hasNext();) {
+            iterator.next();
+            ++size;
+        }
+        return size;
+    }
+
+    /**
+     * Retrieve the value that the store {@link KeyValueStore#flush() flushed} with the given key.
+     * 
+     * @param key the key
+     * @return the value that was flushed with the key, or {@code null} if no such key was flushed or if the entry with this
+     *         key was {@link #flushedEntryRemoved(Object) removed} upon flush
+     */
+    public V flushedEntryStored(K key) {
+        return flushedEntries.get(key);
+    }
+
+    /**
+     * Determine whether the store {@link KeyValueStore#flush() flushed} the removal of the given key.
+     * 
+     * @param key the key
+     * @return {@code true} if the entry with the given key was removed when flushed, or {@code false} if the entry was not
+     *         removed when last flushed
+     */
+    public boolean flushedEntryRemoved(K key) {
+        return flushedRemovals.contains(key);
+    }
+
+    /**
+     * Remove all {@link #flushedEntryStored(Object) flushed entries}, {@link #flushedEntryRemoved(Object) flushed removals},
+     * and {@link #restoredEntries() restore entries}.
+     */
+    public void clear() {
+        restorableEntries.clear();
+        flushedEntries.clear();
+        flushedRemovals.clear();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
new file mode 100644
index 0000000..9ac1740
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context, Class<K> keyClass, Class<V> valueClass,
+                                                             boolean useContextSerdes) {
+        if (useContextSerdes) {
+            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
+            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
+            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
+            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
+            return Stores.create("my-store", context).withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
+        }
+        return Stores.create("my-store", context).withKeys(keyClass).withValues(valueClass).localDatabase().build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
new file mode 100644
index 0000000..c7ea748
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.test.TestUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A utility for tests to create and manage unique and isolated directories on the file system for local state.
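+ * <p>
+ * For example (a sketch; the directory and everything in it is deleted automatically when the JVM shuts down):
+ * 
+ * <pre>
+ * File localStateDir = StateUtils.tempDir();
+ * // hand the directory to whatever needs local state, e.g. the test driver's useStateDir(...) ...
+ * </pre>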
+ */
+public class StateUtils {
+
+    private static final AtomicLong INSTANCE_COUNTER = new AtomicLong();
+
+    /**
+     * Create a new temporary directory that will be cleaned up automatically upon shutdown.
+     * @return the newly created directory; never null
+     */
+    public static File tempDir() {
+        final File dir = new File(TestUtils.IO_TMP_DIR, "kafka-" + INSTANCE_COUNTER.incrementAndGet());
+        dir.mkdirs();
+        dir.deleteOnExit();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                deleteDirectory(dir);
+            }
+        });
+        return dir;
+    }
+
+    private static void deleteDirectory(File dir) {
+        if (dir != null && dir.exists()) {
+            try {
+                Files.walkFileTree(dir.toPath(), new SimpleFileVisitor<Path>() {
+                    @Override
+                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                        Files.delete(file);
+                        return FileVisitResult.CONTINUE;
+                    }
+
+                    @Override
+                    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+                        Files.delete(dir);
+                        return FileVisitResult.CONTINUE;
+                    }
+
+                });
+            } catch (IOException e) {
+                // best-effort cleanup at shutdown; ignore failures
+            }
+        }
+    }
+}
\ No newline at end of file