Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/02 22:19:15 UTC

[2/2] kafka git commit: KAFKA-2706: make state stores first class citizens in the processor topology

KAFKA-2706: make state stores first class citizens in the processor topology

* Added StateStoreSupplier
* StateStore
  * Added init(ProcessorContext context) method
* TopologyBuilder
  * Added addStateStore(StateStoreSupplier supplier, String... processorNames)
  * Added connectProcessorAndStateStores(String processorName, String... stateStoreNames)
    * This covers the case where processors have not yet been created when a store is added to the topology (used by KStream).
* KStream
  * Added stateStoreNames to process(), transform(), and transformValues().
* Refactored existing state stores to implement StateStoreSupplier
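
Taken together, the new usage pattern looks roughly like this (a minimal sketch distilled
from the ProcessorJob changes below; imports are elided, `config` is a StreamingConfig, and
MyProcessorSupplier stands in for any ProcessorSupplier):

    TopologyBuilder builder = new TopologyBuilder();

    builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
    builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");

    // Registering the store with a processor name connects the two right away;
    // it is equivalent to a separate connectProcessorAndStateStores("PROCESS", "local-state") call.
    builder.addStateStore(
        Stores.create("local-state", config).withStringKeys().withIntegerValues().inMemory().build(),
        "PROCESS");

The processor no longer creates the store itself; it looks it up by name in init():

    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("local-state");
    }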


Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #387 from ymatsuda/state_store_supplier


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/75827226
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/75827226
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/75827226

Branch: refs/heads/trunk
Commit: 758272267c811bf559336ea45571bc420a62a478
Parents: 6383593
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Nov 2 13:24:48 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Nov 2 13:24:48 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/StreamingConfig.java   |  18 ++
 .../kafka/streams/examples/ProcessorJob.java    |   5 +-
 .../apache/kafka/streams/kstream/KStream.java   |   9 +-
 .../streams/kstream/internals/KStreamImpl.java  |   9 +-
 .../kafka/streams/processor/StateStore.java     |   5 +
 .../streams/processor/StateStoreSupplier.java   |  25 ++
 .../streams/processor/TopologyBuilder.java      | 109 +++++--
 .../internals/ProcessorContextImpl.java         |  30 +-
 .../processor/internals/ProcessorNode.java      |   8 +-
 .../processor/internals/ProcessorTopology.java  |  11 +-
 .../streams/processor/internals/StreamTask.java |   8 +
 .../streams/state/InMemoryKeyValueStore.java    | 135 ---------
 .../state/InMemoryKeyValueStoreSupplier.java    | 155 ++++++++++
 .../streams/state/InMemoryLRUCacheStore.java    | 180 -----------
 .../state/InMemoryLRUCacheStoreSupplier.java    | 195 ++++++++++++
 .../streams/state/MeteredKeyValueStore.java     |  68 +++--
 .../streams/state/RocksDBKeyValueStore.java     | 284 ------------------
 .../state/RocksDBKeyValueStoreSupplier.java     | 298 +++++++++++++++++++
 .../org/apache/kafka/streams/state/Serdes.java  |  45 ++-
 .../org/apache/kafka/streams/state/Stores.java  |  66 ++--
 .../streams/processor/TopologyBuilderTest.java  |  79 +++++
 .../internals/ProcessorStateManagerTest.java    |  56 +---
 .../internals/ProcessorTopologyTest.java        |  13 +-
 .../internals/PunctuationQueueTest.java         |   2 +-
 .../processor/internals/StreamTaskTest.java     |  18 +-
 .../state/AbstractKeyValueStoreTest.java        |  16 +-
 .../state/InMemoryKeyValueStoreTest.java        |  22 +-
 .../state/InMemoryLRUCacheStoreTest.java        |  15 +-
 .../streams/state/KeyValueStoreTestDriver.java  |  22 ++
 .../streams/state/RocksDBKeyValueStoreTest.java |  24 +-
 .../kafka/test/MockStateStoreSupplier.java      |  97 ++++++
 .../kafka/test/ProcessorTopologyTestDriver.java |   3 +-
 32 files changed, 1224 insertions(+), 806 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
index a0aef48..88bd844 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
@@ -245,6 +247,22 @@ public class StreamingConfig extends AbstractConfig {
         return props;
     }
 
+    public Serializer keySerializer() {
+        return getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+    }
+
+    public Serializer valueSerializer() {
+        return getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+    }
+
+    public Deserializer keyDeserializer() {
+        return getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+    }
+
+    public Deserializer valueDeserializer() {
+        return getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+    }
+
     public static void main(String[] args) {
         System.out.println(CONFIG.toHtmlTable());
     }

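These four accessors centralize the serde instantiation that ProcessorContextImpl previously
did inline via getConfiguredInstance (see its diff below). A minimal sketch of their use,
assuming `props` carries the four serde class configs:

    StreamingConfig config = new StreamingConfig(props);

    Serializer<?> keySerializer = config.keySerializer();           // instantiates KEY_SERIALIZER_CLASS_CONFIG
    Deserializer<?> valueDeserializer = config.valueDeserializer(); // instantiates VALUE_DESERIALIZER_CLASS_CONFIG
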
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
index 0317b9d..3274aae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -45,10 +45,11 @@ public class ProcessorJob {
                 private KeyValueStore<String, Integer> kvStore;
 
                 @Override
+                @SuppressWarnings("unchecked")
                 public void init(ProcessorContext context) {
                     this.context = context;
                     this.context.schedule(1000);
-                    this.kvStore = Stores.create("local-state", context).withStringKeys().withIntegerValues().inMemory().build();
+                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("local-state");
                 }
 
                 @Override
@@ -103,6 +104,8 @@ public class ProcessorJob {
         builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
 
         builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
+        builder.addStateStore(Stores.create("local-state", config).withStringKeys().withIntegerValues().inMemory().build());
+        builder.connectProcessorAndStateStores("PROCESS", "local-state");
 
         builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 915cf1c..8f0794c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -154,24 +154,27 @@ public interface KStream<K, V> {
      * Applies a stateful transformation to all elements in this stream.
      *
      * @param transformerSupplier the class of TransformerDef
+     * @param stateStoreNames the names of the state stores used by the processor
      * @return KStream
      */
-    <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier);
+    <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames);
 
     /**
      * Applies a stateful transformation to all values in this stream.
      *
      * @param valueTransformerSupplier the class of TransformerDef
+     * @param stateStoreNames the names of the state stores used by the processor
      * @return KStream
      */
-    <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier);
+    <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames);
 
     /**
      * Processes all elements in this stream by applying a processor.
      *
      * @param processorSupplier the supplier of the Processor to use
+     * @param stateStoreNames the names of the state stores used by the processor
      * @return the new stream containing the processed output
      */
-    void process(ProcessorSupplier<K, V> processorSupplier);
+    void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames);
 
 }

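Each name passed in the new varargs parameter must refer to a store already registered via
TopologyBuilder.addStateStore; the implementation wires them up with
connectProcessorAndStateStores (see KStreamImpl below). A hedged sketch, assuming `stream` is
a KStream<String, String> and MyTransformerSupplier is a hypothetical
TransformerSupplier<String, String, KeyValue<String, Integer>>:

    // "counts" must already be added to the underlying TopologyBuilder.
    KStream<String, Integer> transformed = stream.transform(new MyTransformerSupplier(), "counts");

    stream.process(new MyProcessorSupplier(), "counts");
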
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 1a2297c..1ea9b1e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -201,27 +201,30 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier) {
+    public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames) {
         String name = TRANSFORM_NAME + INDEX.getAndIncrement();
 
         topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
+        topology.connectProcessorAndStateStores(name, stateStoreNames);
 
         return new KStreamImpl<>(topology, name, null);
     }
 
     @Override
-    public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier) {
+    public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier, String... stateStoreNames) {
         String name = TRANSFORMVALUES_NAME + INDEX.getAndIncrement();
 
         topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
+        topology.connectProcessorAndStateStores(name, stateStoreNames);
 
         return new KStreamImpl<>(topology, name, sourceNodes);
     }
 
     @Override
-    public void process(final ProcessorSupplier<K, V> processorSupplier) {
+    public void process(final ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames) {
         String name = PROCESSOR_NAME + INDEX.getAndIncrement();
 
         topology.addProcessor(name, processorSupplier, this.name);
+        topology.connectProcessorAndStateStores(name, stateStoreNames);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index 38afe9b..9c085a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -36,6 +36,11 @@ public interface StateStore {
     String name();
 
     /**
+     * Initializes this state store
+     */
+    void init(ProcessorContext context);
+
+    /**
      * Flush any cached data
      */
     void flush();

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
new file mode 100644
index 0000000..11545c5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+public interface StateStoreSupplier {
+
+    String name();
+
+    StateStore get();
+}

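A supplier pairs a store name with a factory, so a topology can be described before any store
instance exists; each StreamTask later calls get() to build its own instance and init() to wire
it to the task's context. A minimal hand-rolled sketch (hypothetical class; it assumes StateStore
also declares close() and persistent(), consistent with the store implementations elsewhere in
this commit):

    public class MyStoreSupplier implements StateStoreSupplier {
        private final String name;

        public MyStoreSupplier(String name) {
            this.name = name;
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public StateStore get() {
            // A fresh instance per call: each task builds and initializes its own copy.
            return new StateStore() {
                @Override
                public String name() {
                    return name;
                }

                @Override
                public void init(ProcessorContext context) {
                    // open resources, restore from the changelog, etc. (no-op here)
                }

                @Override
                public void flush() {
                    // nothing buffered in this trivial store
                }

                @Override
                public void close() {
                    // nothing to release
                }

                @Override
                public boolean persistent() {
                    return false;
                }
            };
        }
    }
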
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 077489c..5b6d4ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -33,6 +33,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -48,10 +50,9 @@ import java.util.Set;
  */
 public class TopologyBuilder {
 
-    // list of node factories in a topological order
-    private final ArrayList<NodeFactory> nodeFactories = new ArrayList<>();
+    // node factories in a topological order
+    private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
 
-    private final Set<String> nodeNames = new HashSet<>();
     private final Set<String> sourceTopicNames = new HashSet<>();
 
     private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
@@ -59,6 +60,9 @@ public class TopologyBuilder {
     private final HashMap<String, String[]> nodeToTopics = new HashMap<>();
     private Map<Integer, Set<String>> nodeGroups = null;
 
+    private Map<String, StateStoreSupplier> stateStores = new HashMap<>();
+    private Map<String, Set<String>> stateStoreUsers = new HashMap<>();
+
     private interface NodeFactory {
         ProcessorNode build();
     }
@@ -67,6 +71,7 @@ public class TopologyBuilder {
         public final String[] parents;
         private final String name;
         private final ProcessorSupplier supplier;
+        private final Set<String> stateStoreNames = new HashSet<>();
 
         public ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier supplier) {
             this.name = name;
@@ -74,9 +79,13 @@ public class TopologyBuilder {
             this.supplier = supplier;
         }
 
+        public void addStateStore(String stateStoreName) {
+            stateStoreNames.add(stateStoreName);
+        }
+
         @Override
         public ProcessorNode build() {
-            return new ProcessorNode(name, supplier.get());
+            return new ProcessorNode(name, supplier.get(), stateStoreNames);
         }
     }
 
@@ -155,7 +164,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      */
     public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
-        if (nodeNames.contains(name))
+        if (nodeFactories.containsKey(name))
             throw new TopologyException("Processor " + name + " is already added.");
 
         for (String topic : topics) {
@@ -165,8 +174,7 @@ public class TopologyBuilder {
             sourceTopicNames.add(topic);
         }
 
-        nodeNames.add(name);
-        nodeFactories.add(new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
+        nodeFactories.put(name, new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
         nodeToTopics.put(name, topics.clone());
         nodeGrouper.add(name);
 
@@ -204,7 +212,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      */
     public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
-        if (nodeNames.contains(name))
+        if (nodeFactories.containsKey(name))
             throw new TopologyException("Processor " + name + " is already added.");
 
         if (parentNames != null) {
@@ -212,14 +220,13 @@ public class TopologyBuilder {
                 if (parent.equals(name)) {
                     throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
                 }
-                if (!nodeNames.contains(parent)) {
+                if (!nodeFactories.containsKey(parent)) {
                     throw new TopologyException("Parent processor " + parent + " is not added yet.");
                 }
             }
         }
 
-        nodeNames.add(name);
-        nodeFactories.add(new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer));
+        nodeFactories.put(name, new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer));
         return this;
     }
 
@@ -233,7 +240,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      */
     public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
-        if (nodeNames.contains(name))
+        if (nodeFactories.containsKey(name))
             throw new TopologyException("Processor " + name + " is already added.");
 
         if (parentNames != null) {
@@ -241,20 +248,80 @@ public class TopologyBuilder {
                 if (parent.equals(name)) {
                     throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
                 }
-                if (!nodeNames.contains(parent)) {
+                if (!nodeFactories.containsKey(parent)) {
                     throw new TopologyException("Parent processor " + parent + " is not added yet.");
                 }
             }
         }
 
-        nodeNames.add(name);
-        nodeFactories.add(new ProcessorNodeFactory(name, parentNames, supplier));
+        nodeFactories.put(name, new ProcessorNodeFactory(name, parentNames, supplier));
         nodeGrouper.add(name);
         nodeGrouper.unite(name, parentNames);
         return this;
     }
 
     /**
+     * Adds a state store
+     *
+     * @param supplier the supplier used to obtain this state store {@link StateStore} instance
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
+        if (stateStores.containsKey(supplier.name())) {
+            throw new TopologyException("StateStore " + supplier.name() + " is already added.");
+        }
+        stateStores.put(supplier.name(), supplier);
+        stateStoreUsers.put(supplier.name(), new HashSet<String>());
+
+        if (processorNames != null) {
+            for (String processorName : processorNames) {
+                connectProcessorAndStateStore(processorName, supplier.name());
+            }
+        }
+
+        return this;
+    }
+
+    /**
+     * Connects the processor and the state stores
+     *
+     * @param processorName the name of the processor
+     * @param stateStoreNames the names of state stores that the processor uses
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public final TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) {
+        if (stateStoreNames != null) {
+            for (String stateStoreName : stateStoreNames) {
+                connectProcessorAndStateStore(processorName, stateStoreName);
+            }
+        }
+
+        return this;
+    }
+
+    private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
+        if (!stateStores.containsKey(stateStoreName))
+            throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
+        if (!nodeFactories.containsKey(processorName))
+            throw new TopologyException("Processor " + processorName + " is not added yet.");
+
+        Set<String> users = stateStoreUsers.get(stateStoreName);
+        Iterator<String> iter = users.iterator();
+        if (iter.hasNext()) {
+            String user = iter.next();
+            nodeGrouper.unite(user, processorName);
+        }
+        users.add(processorName);
+
+        NodeFactory factory = nodeFactories.get(processorName);
+        if (factory instanceof ProcessorNodeFactory) {
+            ((ProcessorNodeFactory) factory).addStateStore(stateStoreName);
+        } else {
+            throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
+        }
+    }
+
+    /**
      * Returns the map of topic groups keyed by the group id.
      * A topic group is a group of topics in the same task.
      *
@@ -301,7 +368,7 @@ public class TopologyBuilder {
         }
 
         // Go through non-source nodes
-        for (String nodeName : Utils.sorted(nodeNames)) {
+        for (String nodeName : Utils.sorted(nodeFactories.keySet())) {
             if (!nodeToTopics.containsKey(nodeName)) {
                 String root = nodeGrouper.root(nodeName);
                 Set<String> nodeGroup = rootToNodeGroup.get(root);
@@ -357,10 +424,11 @@ public class TopologyBuilder {
         List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
         Map<String, ProcessorNode> processorMap = new HashMap<>();
         Map<String, SourceNode> topicSourceMap = new HashMap<>();
+        Map<String, StateStoreSupplier> stateStoreMap = new HashMap<>();
 
         try {
             // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
-            for (NodeFactory factory : nodeFactories) {
+            for (NodeFactory factory : nodeFactories.values()) {
                 ProcessorNode node = factory.build();
                 processorNodes.add(node);
                 processorMap.put(node.name(), node);
@@ -369,6 +437,11 @@ public class TopologyBuilder {
                     for (String parent : ((ProcessorNodeFactory) factory).parents) {
                         processorMap.get(parent).addChild(node);
                     }
+                    for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
+                        if (!stateStoreMap.containsKey(stateStoreName)) {
+                            stateStoreMap.put(stateStoreName, stateStores.get(stateStoreName));
+                        }
+                    }
                 } else if (factory instanceof SourceNodeFactory) {
                     for (String topic : ((SourceNodeFactory) factory).topics) {
                         topicSourceMap.put(topic, (SourceNode) node);
@@ -385,7 +458,7 @@ public class TopologyBuilder {
             throw new KafkaException("ProcessorNode construction failed: this should not happen.");
         }
 
-        return new ProcessorTopology(processorNodes, topicSourceMap);
+        return new ProcessorTopology(processorNodes, topicSourceMap, new ArrayList<>(stateStoreMap.values()));
     }
 
     /**

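Note the nodeGrouper.unite call in connectProcessorAndStateStore: processors that share a store
are pulled into the same node group and therefore the same task, so they can safely share one
local store instance. A sketch of the effect, reusing the hypothetical MyProcessorSupplier and
MyStoreSupplier from the sketches above:

    TopologyBuilder builder = new TopologyBuilder();

    builder.addSource("SOURCE-A", new StringDeserializer(), new StringDeserializer(), "topic-a");
    builder.addSource("SOURCE-B", new StringDeserializer(), new StringDeserializer(), "topic-b");
    builder.addProcessor("PROC-A", new MyProcessorSupplier(), "SOURCE-A");
    builder.addProcessor("PROC-B", new MyProcessorSupplier(), "SOURCE-B");

    // Connecting both processors to the same store unites their otherwise
    // independent node groups, so PROC-A and PROC-B end up in one task.
    builder.addStateStore(new MyStoreSupplier("shared-store"), "PROC-A", "PROC-B");
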
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 3c1e059..1321cc5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -62,19 +62,14 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
         this.collector = collector;
         this.stateMgr = stateMgr;
 
-        this.keySerializer = config.getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
-        this.valSerializer = config.getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
-        this.keyDeserializer = config.getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-        this.valDeserializer = config.getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+        this.keySerializer = config.keySerializer();
+        this.valSerializer = config.valueSerializer();
+        this.keyDeserializer = config.keyDeserializer();
+        this.valDeserializer = config.valueDeserializer();
 
         this.initialized = false;
     }
 
-    @Override
-    public RecordCollector recordCollector() {
-        return this.collector;
-    }
-
     public void initialized() {
         this.initialized = true;
     }
@@ -83,6 +78,15 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
         return id;
     }
 
+    public ProcessorStateManager getStateMgr() {
+        return stateMgr;
+    }
+
+    @Override
+    public RecordCollector recordCollector() {
+        return this.collector;
+    }
+
     @Override
     public Serializer<?> keySerializer() {
         return this.keySerializer;
@@ -123,6 +127,14 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
 
     @Override
     public StateStore getStateStore(String name) {
+        ProcessorNode node = task.node();
+
+        if (node == null)
+            throw new KafkaException("accessing from an unknown node");
+
+        if (!node.stateStores.contains(name))
+            throw new KafkaException("Processor " + node.name() + " has no access to StateStore " + name);
+
         return stateMgr.getStore(name);
     }
 

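getStateStore is now access-checked: it only succeeds for stores connected to the node that is
currently executing. A sketch of both outcomes inside a processor's init(), assuming the
processor was connected to "local-state" but not to "other-store":

    // Succeeds: this processor was connected to "local-state" on the builder.
    StateStore mine = context.getStateStore("local-state");

    // Throws KafkaException: this processor was never connected to "other-store".
    StateStore notMine = context.getStateStore("other-store");
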
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 9127c3f..6db83a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 public class ProcessorNode<K, V> {
 
@@ -30,14 +31,17 @@ public class ProcessorNode<K, V> {
     private final String name;
     private final Processor<K, V> processor;
 
+    public final Set<String> stateStores;
+
     public ProcessorNode(String name) {
-        this(name, null);
+        this(name, null, null);
     }
 
-    public ProcessorNode(String name, Processor<K, V> processor) {
+    public ProcessorNode(String name, Processor<K, V> processor, Set<String> stateStores) {
         this.name = name;
         this.processor = processor;
         this.children = new ArrayList<>();
+        this.stateStores = stateStores;
     }
 
     public final String name() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 3efae65..a70aa70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -27,11 +29,14 @@ public class ProcessorTopology {
 
     private final List<ProcessorNode> processorNodes;
     private final Map<String, SourceNode> sourceByTopics;
+    private final List<StateStoreSupplier> stateStoreSuppliers;
 
     public ProcessorTopology(List<ProcessorNode> processorNodes,
-                             Map<String, SourceNode> sourceByTopics) {
+                             Map<String, SourceNode> sourceByTopics,
+                             List<StateStoreSupplier> stateStoreSuppliers) {
         this.processorNodes = Collections.unmodifiableList(processorNodes);
         this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics);
+        this.stateStoreSuppliers = Collections.unmodifiableList(stateStoreSuppliers);
     }
 
     public Set<String> sourceTopics() {
@@ -50,4 +55,8 @@ public class ProcessorTopology {
         return processorNodes;
     }
 
+    public List<StateStoreSupplier> stateStoreSuppliers() {
+        return stateStoreSuppliers;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f01e00b..a9c14e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -26,6 +26,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.slf4j.Logger;
@@ -126,6 +128,12 @@ public class StreamTask implements Punctuator {
         // initialize the topology with its own context
         this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics);
 
+        // initialize the state stores
+        for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) {
+            StateStore store = stateStoreSupplier.get();
+            store.init(this.processorContext);
+        }
+
         // initialize the task by initializing all its processor nodes in the topology
         for (ProcessorNode node : this.topology.processors()) {
             this.currNode = node;

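The ordering here matters: every store is built from its supplier and initialized before any
processor node runs its init(), which is what lets a processor fetch its store via
context.getStateStore() inside init() (as ProcessorJob does above). A condensed paraphrase of
the constructor sequence:

    for (StateStoreSupplier stateStoreSupplier : topology.stateStoreSuppliers())
        stateStoreSupplier.get().init(processorContext);   // stores first

    for (ProcessorNode node : topology.processors())
        node.init(processorContext);                       // then processor nodes
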
http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
deleted file mode 100644
index 1eb526f..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-/**
- * An in-memory key-value store based on a TreeMap.
- *
- * @param <K> The key type
- * @param <V> The value type
- * 
- * @see Stores#create(String, ProcessorContext)
- */
-public class InMemoryKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
-
-    protected InMemoryKeyValueStore(String name, ProcessorContext context, Serdes<K, V> serdes, Time time) {
-        super(name, new MemoryStore<K, V>(name), context, serdes, "in-memory-state", time != null ? time : new SystemTime());
-    }
-
-    private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
-
-        private final String name;
-        private final NavigableMap<K, V> map;
-
-        public MemoryStore(String name) {
-            super();
-            this.name = name;
-            this.map = new TreeMap<>();
-        }
-
-        @Override
-        public String name() {
-            return this.name;
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-
-        @Override
-        public V get(K key) {
-            return this.map.get(key);
-        }
-
-        @Override
-        public void put(K key, V value) {
-            this.map.put(key, value);
-        }
-
-        @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
-        }
-
-        @Override
-        public V delete(K key) {
-            return this.map.remove(key);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(K from, K to) {
-            return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
-        }
-
-        @Override
-        public void flush() {
-            // do-nothing since it is in-memory
-        }
-
-        @Override
-        public void close() {
-            // do-nothing
-        }
-
-        private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
-            private final Iterator<Map.Entry<K, V>> iter;
-
-            public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
-                this.iter = iter;
-            }
-
-            @Override
-            public boolean hasNext() {
-                return iter.hasNext();
-            }
-
-            @Override
-            public Entry<K, V> next() {
-                Map.Entry<K, V> entry = iter.next();
-                return new Entry<>(entry.getKey(), entry.getValue());
-            }
-
-            @Override
-            public void remove() {
-                iter.remove();
-            }
-
-            @Override
-            public void close() {
-            }
-
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
new file mode 100644
index 0000000..d1f845c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * An in-memory key-value store based on a TreeMap.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
+ */
+public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
+
+    private final String name;
+    private final Serdes<K, V> serdes;
+    private final Time time;
+
+    protected InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
+        this.name = name;
+        this.serdes = serdes;
+        this.time = time;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public StateStore get() {
+        return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time);
+    }
+
+    private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
+
+        private final String name;
+        private final NavigableMap<K, V> map;
+
+        public MemoryStore(String name) {
+            super();
+            this.name = name;
+            this.map = new TreeMap<>();
+        }
+
+        @Override
+        public String name() {
+            return this.name;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            // do-nothing since it is in-memory
+        }
+
+        @Override
+        public boolean persistent() {
+            return false;
+        }
+
+        @Override
+        public V get(K key) {
+            return this.map.get(key);
+        }
+
+        @Override
+        public void put(K key, V value) {
+            this.map.put(key, value);
+        }
+
+        @Override
+        public void putAll(List<Entry<K, V>> entries) {
+            for (Entry<K, V> entry : entries)
+                put(entry.key(), entry.value());
+        }
+
+        @Override
+        public V delete(K key) {
+            return this.map.remove(key);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> range(K from, K to) {
+            return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
+        }
+
+        @Override
+        public KeyValueIterator<K, V> all() {
+            return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
+        }
+
+        @Override
+        public void flush() {
+            // do-nothing since it is in-memory
+        }
+
+        @Override
+        public void close() {
+            // do-nothing
+        }
+
+        private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
+            private final Iterator<Map.Entry<K, V>> iter;
+
+            public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
+                this.iter = iter;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return iter.hasNext();
+            }
+
+            @Override
+            public Entry<K, V> next() {
+                Map.Entry<K, V> entry = iter.next();
+                return new Entry<>(entry.getKey(), entry.getValue());
+            }
+
+            @Override
+            public void remove() {
+                iter.remove();
+            }
+
+            @Override
+            public void close() {
+            }
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
deleted file mode 100644
index 1b96c59..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
-/**
- * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- */
-public class InMemoryLRUCacheStore<K, V> extends MeteredKeyValueStore<K, V> {
-
-    protected static <K, V> InMemoryLRUCacheStore<K, V> create(String name, int capacity, ProcessorContext context,
-                                                               Serdes<K, V> serdes, Time time) {
-        if (time == null) time = new SystemTime();
-        MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
-        final InMemoryLRUCacheStore<K, V> store = new InMemoryLRUCacheStore<>(name, context, cache, serdes, time);
-        cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
-            @Override
-            public void apply(K key, V value) {
-                store.removed(key);
-            }
-        });
-        return store;
-
-    }
-
-    private InMemoryLRUCacheStore(String name, ProcessorContext context, MemoryLRUCache<K, V> cache, Serdes<K, V> serdes, Time time) {
-        super(name, cache, context, serdes, "kafka-streams", time);
-    }
-
-    private static interface EldestEntryRemovalListener<K, V> {
-        public void apply(K key, V value);
-    }
-
-    protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
-
-        private final String name;
-        private final Map<K, V> map;
-        private final NavigableSet<K> keys;
-        private EldestEntryRemovalListener<K, V> listener;
-
-        public MemoryLRUCache(String name, final int maxCacheSize) {
-            this.name = name;
-            this.keys = new TreeSet<>();
-            // leave room for one extra entry to handle adding an entry before the oldest can be removed
-            this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
-                private static final long serialVersionUID = 1L;
-
-                @Override
-                protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
-                    if (size() > maxCacheSize) {
-                        K key = eldest.getKey();
-                        keys.remove(key);
-                        if (listener != null) listener.apply(key, eldest.getValue());
-                        return true;
-                    }
-                    return false;
-                }
-            };
-        }
-
-        protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
-            this.listener = listener;
-        }
-
-        @Override
-        public String name() {
-            return this.name;
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-
-        @Override
-        public V get(K key) {
-            return this.map.get(key);
-        }
-
-        @Override
-        public void put(K key, V value) {
-            this.map.put(key, value);
-            this.keys.add(key);
-        }
-
-        @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
-        }
-
-        @Override
-        public V delete(K key) {
-            V value = this.map.remove(key);
-            this.keys.remove(key);
-            return value;
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(K from, K to) {
-            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
-        }
-
-        @Override
-        public void flush() {
-            // do-nothing since it is in-memory
-        }
-
-        @Override
-        public void close() {
-            // do-nothing
-        }
-
-        private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
-            private final Iterator<K> keys;
-            private final Map<K, V> entries;
-            private K lastKey;
-
-            public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
-                this.keys = keys;
-                this.entries = entries;
-            }
-
-            @Override
-            public boolean hasNext() {
-                return keys.hasNext();
-            }
-
-            @Override
-            public Entry<K, V> next() {
-                lastKey = keys.next();
-                return new Entry<>(lastKey, entries.get(lastKey));
-            }
-
-            @Override
-            public void remove() {
-                keys.remove();
-                entries.remove(lastKey);
-            }
-
-            @Override
-            public void close() {
-                // do nothing
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
new file mode 100644
index 0000000..a346534
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ */
+public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
+
+    private final String name;
+    private final int capacity;
+    private final Serdes<K, V> serdes;
+    private final Time time;
+
+    protected InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) {
+        this.name = name;
+        this.capacity = capacity;
+        this.serdes = serdes;
+        this.time = time;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public StateStore get() {
+        MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
+        final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time);
+        cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
+            @Override
+            public void apply(K key, V value) {
+                store.removed(key);
+            }
+        });
+        return store;
+    }
+
+    private static interface EldestEntryRemovalListener<K, V> {
+        public void apply(K key, V value);
+    }
+
+    protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
+
+        private final String name;
+        private final Map<K, V> map;
+        private final NavigableSet<K> keys;
+        private EldestEntryRemovalListener<K, V> listener;
+
+        public MemoryLRUCache(String name, final int maxCacheSize) {
+            this.name = name;
+            this.keys = new TreeSet<>();
+            // leave room for one extra entry to handle adding an entry before the oldest can be removed
+            this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+                    if (size() > maxCacheSize) {
+                        K key = eldest.getKey();
+                        keys.remove(key);
+                        if (listener != null) listener.apply(key, eldest.getValue());
+                        return true;
+                    }
+                    return false;
+                }
+            };
+        }
+
+        protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public String name() {
+            return this.name;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            // do-nothing since it is in-memory
+        }
+
+        @Override
+        public boolean persistent() {
+            return false;
+        }
+
+        @Override
+        public V get(K key) {
+            return this.map.get(key);
+        }
+
+        @Override
+        public void put(K key, V value) {
+            this.map.put(key, value);
+            this.keys.add(key);
+        }
+
+        @Override
+        public void putAll(List<Entry<K, V>> entries) {
+            for (Entry<K, V> entry : entries)
+                put(entry.key(), entry.value());
+        }
+
+        @Override
+        public V delete(K key) {
+            V value = this.map.remove(key);
+            this.keys.remove(key);
+            return value;
+        }
+
+        @Override
+        public KeyValueIterator<K, V> range(K from, K to) {
+            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> all() {
+            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
+        }
+
+        @Override
+        public void flush() {
+            // do-nothing since it is in-memory
+        }
+
+        @Override
+        public void close() {
+            // do-nothing
+        }
+
+        private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
+            private final Iterator<K> keys;
+            private final Map<K, V> entries;
+            private K lastKey;
+
+            public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
+                this.keys = keys;
+                this.entries = entries;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return keys.hasNext();
+            }
+
+            @Override
+            public Entry<K, V> next() {
+                lastKey = keys.next();
+                return new Entry<>(lastKey, entries.get(lastKey));
+            }
+
+            @Override
+            public void remove() {
+                keys.remove();
+                entries.remove(lastKey);
+            }
+
+            @Override
+            public void close() {
+                // do nothing
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index a7f4c12..c1ccbd4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -35,33 +36,51 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     protected final KeyValueStore<K, V> inner;
     protected final Serdes<K, V> serialization;
-
-    private final Time time;
-    private final Sensor putTime;
-    private final Sensor getTime;
-    private final Sensor deleteTime;
-    private final Sensor putAllTime;
-    private final Sensor allTime;
-    private final Sensor rangeTime;
-    private final Sensor flushTime;
-    private final Sensor restoreTime;
-    private final StreamingMetrics metrics;
+    protected final String metricGrp;
+    protected final Time time;
 
     private final String topic;
-    private final int partition;
+
+    private Sensor putTime;
+    private Sensor getTime;
+    private Sensor deleteTime;
+    private Sensor putAllTime;
+    private Sensor allTime;
+    private Sensor rangeTime;
+    private Sensor flushTime;
+    private Sensor restoreTime;
+    private StreamingMetrics metrics;
+
     private final Set<K> dirty;
     private final Set<K> removed;
     private final int maxDirty;
     private final int maxRemoved;
-    private final ProcessorContext context;
+
+    private int partition;
+    private ProcessorContext context;
 
     // always wrap the logged store with the metered store
-    public MeteredKeyValueStore(final String name, final KeyValueStore<K, V> inner, ProcessorContext context,
-                                Serdes<K, V> serialization, String metricGrp, Time time) {
+    public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricGrp, Time time) {
         this.inner = inner;
         this.serialization = serialization;
+        this.metricGrp = metricGrp;
+        this.time = time != null ? time : new SystemTime();
+        this.topic = inner.name();
+
+        this.dirty = new HashSet<K>();
+        this.removed = new HashSet<K>();
+        this.maxDirty = 100; // TODO: this needs to be configurable
+        this.maxRemoved = 100; // TODO: this needs to be configurable
+    }
+
+    @Override
+    public String name() {
+        return inner.name();
+    }
 
-        this.time = time;
+    @Override
+    public void init(ProcessorContext context) {
+        String name = name();
         this.metrics = context.metrics();
         this.putTime = this.metrics.addLatencySensor(metricGrp, name, "put", "store-name", name);
         this.getTime = this.metrics.addLatencySensor(metricGrp, name, "get", "store-name", name);
@@ -72,18 +91,12 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         this.flushTime = this.metrics.addLatencySensor(metricGrp, name, "flush", "store-name", name);
         this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
 
-        this.topic = name;
-        this.partition = context.id().partition;
-
         this.context = context;
-
-        this.dirty = new HashSet<K>();
-        this.removed = new HashSet<K>();
-        this.maxDirty = 100; // TODO: this needs to be configurable
-        this.maxRemoved = 100; // TODO: this needs to be configurable
+        this.partition = context.id().partition;
 
         // register and possibly restore the state from the logs
         long startNs = time.nanoseconds();
+        inner.init(context);
         try {
             final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
             final Deserializer<V> valDeserializer = serialization.valueDeserializer();
@@ -92,7 +105,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
                 @Override
                 public void restore(byte[] key, byte[] value) {
                     inner.put(keyDeserializer.deserialize(topic, key),
-                        valDeserializer.deserialize(topic, value));
+                            valDeserializer.deserialize(topic, value));
                 }
             });
         } finally {
@@ -101,11 +114,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public String name() {
-        return inner.name();
-    }
-
-    @Override
     public boolean persistent() {
         return inner.persistent();
     }
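
The hunk above moves all runtime wiring out of the constructor and into
init(ProcessorContext). A minimal sketch of a store following that
construct-then-init lifecycle, assuming StateStore's contract is the five
methods seen overridden above (the NoopStore name is illustrative; only the
StateStore and ProcessorContext types come from this change):

    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.StateStore;

    public class NoopStore implements StateStore {
        private final String name;
        private ProcessorContext context;

        public NoopStore(String name) {
            this.name = name; // construction stays cheap: no metrics, no I/O
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public void init(ProcessorContext context) {
            // the owning task calls this once the store is attached, so
            // metrics sensors, the state directory and changelog restoration
            // can all be derived from the context here instead
            this.context = context;
        }

        @Override
        public void flush() {
            // nothing buffered
        }

        @Override
        public void close() {
            // nothing to release
        }

        @Override
        public boolean persistent() {
            return false;
        }
    }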

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
deleted file mode 100644
index 1de345e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionType;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteOptions;
-
-import java.io.File;
-import java.util.Comparator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * A {@link KeyValueStore} that stores all entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see Stores#create(String, ProcessorContext)
- */
-public class RocksDBKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
-
-    protected RocksDBKeyValueStore(String name, ProcessorContext context, Serdes<K, V> serdes, Time time) {
-        super(name, new RocksDBStore<K, V>(name, context, serdes), context, serdes, "rocksdb-state", time != null ? time : new SystemTime());
-    }
-
-    private static class RocksDBStore<K, V> implements KeyValueStore<K, V> {
-
-        private static final int TTL_NOT_USED = -1;
-
-        // TODO: these values should be configurable
-        private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
-        private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
-        private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
-        private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
-        private static final long BLOCK_SIZE = 4096L;
-        private static final int TTL_SECONDS = TTL_NOT_USED;
-        private static final int MAX_WRITE_BUFFERS = 3;
-        private static final String DB_FILE_DIR = "rocksdb";
-
-        private final Serdes<K, V> serdes;
-
-        private final String topic;
-        private final int partition;
-        private final ProcessorContext context;
-
-        private final Options options;
-        private final WriteOptions wOptions;
-        private final FlushOptions fOptions;
-
-        private final String dbName;
-        private final String dirName;
-
-        private RocksDB db;
-
-        public RocksDBStore(String name, ProcessorContext context, Serdes<K, V> serdes) {
-            this.topic = name;
-            this.partition = context.id().partition;
-            this.context = context;
-            this.serdes = serdes;
-
-            // initialize the rocksdb options
-            BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
-            tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
-            tableConfig.setBlockSize(BLOCK_SIZE);
-
-            options = new Options();
-            options.setTableFormatConfig(tableConfig);
-            options.setWriteBufferSize(WRITE_BUFFER_SIZE);
-            options.setCompressionType(COMPRESSION_TYPE);
-            options.setCompactionStyle(COMPACTION_STYLE);
-            options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
-            options.setCreateIfMissing(true);
-            options.setErrorIfExists(false);
-
-            wOptions = new WriteOptions();
-            wOptions.setDisableWAL(true);
-
-            fOptions = new FlushOptions();
-            fOptions.setWaitForFlush(true);
-
-            dbName = this.topic + "." + this.partition;
-            dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
-
-            db = openDB(new File(dirName, dbName), this.options, TTL_SECONDS);
-        }
-
-        private RocksDB openDB(File dir, Options options, int ttl) {
-            try {
-                if (ttl == TTL_NOT_USED) {
-                    dir.getParentFile().mkdirs();
-                    return RocksDB.open(options, dir.toString());
-                } else {
-                    throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
-                    // TODO: support TTL with change log?
-                    // return TtlDB.open(options, dir.toString(), ttl, false);
-                }
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
-            }
-        }
-
-        @Override
-        public String name() {
-            return this.topic;
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-
-        @Override
-        public V get(K key) {
-            try {
-                return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
-            }
-        }
-
-        @Override
-        public void put(K key, V value) {
-            try {
-                if (value == null) {
-                    db.remove(wOptions, serdes.rawKey(key));
-                } else {
-                    db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
-                }
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
-            }
-        }
-
-        @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
-        }
-
-        @Override
-        public V delete(K key) {
-            V value = get(key);
-            put(key, null);
-            return value;
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(K from, K to) {
-            return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            RocksIterator innerIter = db.newIterator();
-            innerIter.seekToFirst();
-            return new RocksDbIterator<K, V>(innerIter, serdes);
-        }
-
-        @Override
-        public void flush() {
-            try {
-                db.flush(fOptions);
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error while executing flush from store " + this.topic, e);
-            }
-        }
-
-        @Override
-        public void close() {
-            flush();
-            db.close();
-        }
-
-        private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
-            private final RocksIterator iter;
-            private final Serdes<K, V> serdes;
-
-            public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
-                this.iter = iter;
-                this.serdes = serdes;
-            }
-
-            protected byte[] peekRawKey() {
-                return iter.key();
-            }
-
-            protected Entry<K, V> getEntry() {
-                return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
-            }
-
-            @Override
-            public boolean hasNext() {
-                return iter.isValid();
-            }
-
-            @Override
-            public Entry<K, V> next() {
-                if (!hasNext())
-                    throw new NoSuchElementException();
-
-                Entry<K, V> entry = this.getEntry();
-                iter.next();
-                return entry;
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException("RocksDB iterator does not support remove");
-            }
-
-            @Override
-            public void close() {
-            }
-
-        }
-
-        private static class LexicographicComparator implements Comparator<byte[]> {
-
-            @Override
-            public int compare(byte[] left, byte[] right) {
-                for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
-                    int leftByte = left[i] & 0xff;
-                    int rightByte = right[j] & 0xff;
-                    if (leftByte != rightByte) {
-                        return leftByte - rightByte;
-                    }
-                }
-                return left.length - right.length;
-            }
-        }
-
-        private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
-            // RocksDB's JNI interface does not expose getters/setters that allow the
-            // comparator to be pluggable, and the default is lexicographic, so it's
-            // safe to just force lexicographic comparator here for now.
-            private final Comparator<byte[]> comparator = new LexicographicComparator();
-            byte[] rawToKey;
-
-            public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
-                    K from, K to) {
-                super(iter, serdes);
-                iter.seek(serdes.rawKey(from));
-                this.rawToKey = serdes.rawKey(to);
-            }
-
-            @Override
-            public boolean hasNext() {
-                return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
-            }
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
new file mode 100644
index 0000000..fe8f00a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * A {@link KeyValueStore} that stores all entries in a local RocksDB database.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ *
+ * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
+ */
+public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
+
+    private final String name;
+    private final Serdes<K, V> serdes;
+    private final Time time;
+
+    protected RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
+        this.name = name;
+        this.serdes = serdes;
+        this.time = time;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public StateStore get() {
+        return new MeteredKeyValueStore<K, V>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
+    }
+
+    private static class RocksDBStore<K, V> implements KeyValueStore<K, V> {
+
+        private static final int TTL_NOT_USED = -1;
+
+        // TODO: these values should be configurable
+        private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
+        private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
+        private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
+        private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
+        private static final long BLOCK_SIZE = 4096L;
+        private static final int TTL_SECONDS = TTL_NOT_USED;
+        private static final int MAX_WRITE_BUFFERS = 3;
+        private static final String DB_FILE_DIR = "rocksdb";
+
+        private final Serdes<K, V> serdes;
+        private final String topic;
+
+        private final Options options;
+        private final WriteOptions wOptions;
+        private final FlushOptions fOptions;
+
+        private ProcessorContext context;
+        private int partition;
+        private String dbName;
+        private String dirName;
+        private RocksDB db;
+
+        public RocksDBStore(String name, Serdes<K, V> serdes) {
+            this.topic = name;
+            this.serdes = serdes;
+
+            // initialize the rocksdb options
+            BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+            tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
+            tableConfig.setBlockSize(BLOCK_SIZE);
+
+            options = new Options();
+            options.setTableFormatConfig(tableConfig);
+            options.setWriteBufferSize(WRITE_BUFFER_SIZE);
+            options.setCompressionType(COMPRESSION_TYPE);
+            options.setCompactionStyle(COMPACTION_STYLE);
+            options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
+            options.setCreateIfMissing(true);
+            options.setErrorIfExists(false);
+
+            wOptions = new WriteOptions();
+            wOptions.setDisableWAL(true);
+
+            fOptions = new FlushOptions();
+            fOptions.setWaitForFlush(true);
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            this.context = context;
+            this.partition = context.id().partition;
+            this.dbName = this.topic + "." + this.partition;
+            this.dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
+            this.db = openDB(new File(dirName, dbName), this.options, TTL_SECONDS);
+        }
+
+        private RocksDB openDB(File dir, Options options, int ttl) {
+            try {
+                if (ttl == TTL_NOT_USED) {
+                    dir.getParentFile().mkdirs();
+                    return RocksDB.open(options, dir.toString());
+                } else {
+                    throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
+                    // TODO: support TTL with change log?
+                    // return TtlDB.open(options, dir.toString(), ttl, false);
+                }
+            } catch (RocksDBException e) {
+                // TODO: this needs to be handled more accurately
+                throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
+            }
+        }
+
+        @Override
+        public String name() {
+            return this.topic;
+        }
+
+        @Override
+        public boolean persistent() {
+            return false;
+        }
+
+        @Override
+        public V get(K key) {
+            try {
+                return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
+            } catch (RocksDBException e) {
+                // TODO: this needs to be handled more accurately
+                throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
+            }
+        }
+
+        @Override
+        public void put(K key, V value) {
+            try {
+                if (value == null) {
+                    db.remove(wOptions, serdes.rawKey(key));
+                } else {
+                    db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
+                }
+            } catch (RocksDBException e) {
+                // TODO: this needs to be handled more accurately
+                throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
+            }
+        }
+
+        @Override
+        public void putAll(List<Entry<K, V>> entries) {
+            for (Entry<K, V> entry : entries)
+                put(entry.key(), entry.value());
+        }
+
+        @Override
+        public V delete(K key) {
+            V value = get(key);
+            put(key, null);
+            return value;
+        }
+
+        @Override
+        public KeyValueIterator<K, V> range(K from, K to) {
+            return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> all() {
+            RocksIterator innerIter = db.newIterator();
+            innerIter.seekToFirst();
+            return new RocksDbIterator<K, V>(innerIter, serdes);
+        }
+
+        @Override
+        public void flush() {
+            try {
+                db.flush(fOptions);
+            } catch (RocksDBException e) {
+                // TODO: this needs to be handled more accurately
+                throw new KafkaException("Error while executing flush from store " + this.topic, e);
+            }
+        }
+
+        @Override
+        public void close() {
+            flush();
+            db.close();
+        }
+
+        private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
+            private final RocksIterator iter;
+            private final Serdes<K, V> serdes;
+
+            public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
+                this.iter = iter;
+                this.serdes = serdes;
+            }
+
+            protected byte[] peekRawKey() {
+                return iter.key();
+            }
+
+            protected Entry<K, V> getEntry() {
+                return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
+            }
+
+            @Override
+            public boolean hasNext() {
+                return iter.isValid();
+            }
+
+            @Override
+            public Entry<K, V> next() {
+                if (!hasNext())
+                    throw new NoSuchElementException();
+
+                Entry<K, V> entry = this.getEntry();
+                iter.next();
+                return entry;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("RocksDB iterator does not support remove");
+            }
+
+            @Override
+            public void close() {
+            }
+
+        }
+
+        private static class LexicographicComparator implements Comparator<byte[]> {
+
+            @Override
+            public int compare(byte[] left, byte[] right) {
+                for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
+                    int leftByte = left[i] & 0xff;
+                    int rightByte = right[j] & 0xff;
+                    if (leftByte != rightByte) {
+                        return leftByte - rightByte;
+                    }
+                }
+                return left.length - right.length;
+            }
+        }
+
+        private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
+            // RocksDB's JNI interface does not expose getters/setters that allow the
+            // comparator to be pluggable, and the default is lexicographic, so it's
+            // safe to just force lexicographic comparator here for now.
+            private final Comparator<byte[]> comparator = new LexicographicComparator();
+            byte[] rawToKey;
+
+            public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
+                    K from, K to) {
+                super(iter, serdes);
+                iter.seek(serdes.rawKey(from));
+                this.rawToKey = serdes.rawKey(to);
+            }
+
+            @Override
+            public boolean hasNext() {
+                return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
+            }
+        }
+
+    }
+}
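
A supplier like the one above separates naming from construction: the
topology is built against name(), while get() is only invoked when a task
actually needs a store instance. A minimal custom supplier following the
same shape (NoopStoreSupplier is illustrative, reusing the NoopStore sketch
shown after the MeteredKeyValueStore hunk):

    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.processor.StateStoreSupplier;

    public class NoopStoreSupplier implements StateStoreSupplier {
        private final String name;

        public NoopStoreSupplier(String name) {
            this.name = name;
        }

        @Override
        public String name() {
            return name; // the topology refers to the store by this name
        }

        @Override
        public StateStore get() {
            // expected to be invoked per task, so each partition can get
            // its own store instance
            return new NoopStore(name);
        }
    }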

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
index 540d763..31bd439 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.StreamingConfig;
 
 final class Serdes<K, V> {
 
@@ -64,7 +64,7 @@ final class Serdes<K, V> {
 
     /**
      * Create a context for serialization using the specified serializers and deserializers.
-     * 
+     *
      * @param topic the name of the topic
      * @param keySerializer the serializer for keys; may not be null
      * @param keyDeserializer the deserializer for keys; may not be null
@@ -83,47 +83,44 @@ final class Serdes<K, V> {
 
     /**
      * Create a context for serialization using the specified serializers and deserializers, or if any of them are null the
-     * corresponding {@link ProcessorContext}'s default serializer or deserializer, which
+     * corresponding {@link StreamingConfig}'s serializer or deserializer, which
      * <em>must</em> match the key and value types used as parameters for this object.
-     * 
+     *
      * @param topic the name of the topic
-     * @param keySerializer the serializer for keys; may be null if the {@link ProcessorContext#keySerializer() default
+     * @param keySerializer the serializer for keys; may be null if the {@link StreamingConfig#keySerializer() default
      *            key serializer} should be used
-     * @param keyDeserializer the deserializer for keys; may be null if the {@link ProcessorContext#keyDeserializer() default
+     * @param keyDeserializer the deserializer for keys; may be null if the {@link StreamingConfig#keyDeserializer() default
      *            key deserializer} should be used
-     * @param valueSerializer the serializer for values; may be null if the {@link ProcessorContext#valueSerializer() default
+     * @param valueSerializer the serializer for values; may be null if the {@link StreamingConfig#valueSerializer() default
      *            value serializer} should be used
-     * @param valueDeserializer the deserializer for values; may be null if the {@link ProcessorContext#valueDeserializer()
+     * @param valueDeserializer the deserializer for values; may be null if the {@link StreamingConfig#valueDeserializer()
      *            default value deserializer} should be used
-     * @param context the processing context
+     * @param config the streaming config
      */
     @SuppressWarnings("unchecked")
     public Serdes(String topic,
             Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
             Serializer<V> valueSerializer, Deserializer<V> valueDeserializer,
-            ProcessorContext context) {
+            StreamingConfig config) {
         this.topic = topic;
-        this.keySerializer = keySerializer != null ? keySerializer : (Serializer<K>) context.keySerializer();
-        this.keyDeserializer = keyDeserializer != null ? keyDeserializer : (Deserializer<K>) context.keyDeserializer();
-        this.valueSerializer = valueSerializer != null ? valueSerializer : (Serializer<V>) context.valueSerializer();
-        this.valueDeserializer = valueDeserializer != null ? valueDeserializer : (Deserializer<V>) context.valueDeserializer();
+
+        this.keySerializer = keySerializer != null ? keySerializer : config.keySerializer();
+        this.keyDeserializer = keyDeserializer != null ? keyDeserializer : config.keyDeserializer();
+        this.valueSerializer = valueSerializer != null ? valueSerializer : config.valueSerializer();
+        this.valueDeserializer = valueDeserializer != null ? valueDeserializer : config.valueDeserializer();
     }
 
     /**
-     * Create a context for serialization using the {@link ProcessorContext}'s default serializers and deserializers, which
+     * Create a context for serialization using the {@link StreamingConfig}'s serializers and deserializers, which
      * <em>must</em> match the key and value types used as parameters for this object.
-     * 
+     *
      * @param topic the name of the topic
-     * @param context the processing context
+     * @param config the streaming config
      */
     @SuppressWarnings("unchecked")
     public Serdes(String topic,
-            ProcessorContext context) {
-        this.topic = topic;
-        this.keySerializer = (Serializer<K>) context.keySerializer();
-        this.keyDeserializer = (Deserializer<K>) context.keyDeserializer();
-        this.valueSerializer = (Serializer<V>) context.valueSerializer();
-        this.valueDeserializer = (Deserializer<V>) context.valueDeserializer();
+                  StreamingConfig config) {
+        this(topic, null, null, null, null, config);
     }
 
     public Deserializer<K> keyDeserializer() {
@@ -161,4 +158,4 @@ final class Serdes<K, V> {
     public byte[] rawValue(V value) {
         return valueSerializer.serialize(topic, value);
     }
-}
\ No newline at end of file
+}
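
The constructor change above replaces four copied ternaries with a single
delegation: the defaults-only constructor now calls the full constructor
with nulls, and the null checks fall back to the StreamingConfig's
serializers and deserializers. A minimal sketch of that fallback-plus-
delegation idiom outside of Kafka (the Codec and Writer names are
illustrative):

    final class Codec<T> {

        interface Writer<V> {
            byte[] write(V value);
        }

        private final Writer<T> writer;

        // explicit writer wins; null falls back to the configured default
        Codec(Writer<T> writer, Writer<T> configuredDefault) {
            this.writer = writer != null ? writer : configuredDefault;
        }

        // defaults-only convenience constructor, mirroring
        // Serdes(topic, config) -> Serdes(topic, null, null, null, null, config)
        Codec(Writer<T> configuredDefault) {
            this(null, configuredDefault);
        }

        byte[] encode(T value) {
            return writer.write(value);
        }
    }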