You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/02 22:19:14 UTC

[1/2] kafka git commit: KAFKA-2706: make state stores first class citizens in the processor topology

Repository: kafka
Updated Branches:
  refs/heads/trunk 6383593fe -> 758272267


http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 8ec73fd..c5f040f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -26,7 +26,8 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 /**
  * Factory for creating key-value stores.
@@ -34,13 +35,12 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 public class Stores {
 
     /**
-     * Begin to create a new {@link org.apache.kafka.streams.processor.StateStore} instance.
-     * 
+     * Begin to create a new {@link org.apache.kafka.streams.processor.StateStoreSupplier} instance.
+     *
      * @param name the name of the store
-     * @param context the processor context
      * @return the factory that can be used to specify other options or configurations for the store; never null
      */
-    public static StoreFactory create(final String name, final ProcessorContext context) {
+    public static StoreFactory create(final String name, final StreamingConfig config) {
         return new StoreFactory() {
             @Override
             public <K> ValueFactory<K> withKeys(final Serializer<K> keySerializer, final Deserializer<K> keyDeserializer) {
@@ -48,8 +48,8 @@ public class Stores {
                     @Override
                     public <V> KeyValueFactory<K, V> withValues(final Serializer<V> valueSerializer,
                                                                 final Deserializer<V> valueDeserializer) {
-                        final Serdes<K, V> serdes = new Serdes<>(name, keySerializer, keyDeserializer, valueSerializer, valueDeserializer,
-                                context);
+                        final Serdes<K, V> serdes =
+                                new Serdes<>(name, keySerializer, keyDeserializer, valueSerializer, valueDeserializer, config);
                         return new KeyValueFactory<K, V>() {
                             @Override
                             public InMemoryKeyValueFactory<K, V> inMemory() {
@@ -64,11 +64,11 @@ public class Stores {
                                     }
 
                                     @Override
-                                    public KeyValueStore<K, V> build() {
+                                    public StateStoreSupplier build() {
                                         if (capacity < Integer.MAX_VALUE) {
-                                            return InMemoryLRUCacheStore.create(name, capacity, context, serdes, null);
+                                            return new InMemoryLRUCacheStoreSupplier<>(name, capacity, serdes, null);
                                         }
-                                        return new InMemoryKeyValueStore<>(name, context, serdes, null);
+                                        return new InMemoryKeyValueStoreSupplier<>(name, serdes, null);
                                     }
                                 };
                             }
@@ -77,8 +77,8 @@ public class Stores {
                             public LocalDatabaseKeyValueFactory<K, V> localDatabase() {
                                 return new LocalDatabaseKeyValueFactory<K, V>() {
                                     @Override
-                                    public KeyValueStore<K, V> build() {
-                                        return new RocksDBKeyValueStore<>(name, context, serdes, null);
+                                    public StateStoreSupplier build() {
+                                        return new RocksDBKeyValueStoreSupplier<>(name, serdes, null);
                                     }
                                 };
                             }
@@ -92,7 +92,7 @@ public class Stores {
     public static abstract class StoreFactory {
         /**
          * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link String}s.
-         * 
+         *
          * @return the interface used to specify the type of values; never null
          */
         public ValueFactory<String> withStringKeys() {
@@ -101,7 +101,7 @@ public class Stores {
 
         /**
          * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Integer}s.
-         * 
+         *
          * @return the interface used to specify the type of values; never null
          */
         public ValueFactory<Integer> withIntegerKeys() {
@@ -110,7 +110,7 @@ public class Stores {
 
         /**
          * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Long}s.
-         * 
+         *
          * @return the interface used to specify the type of values; never null
          */
         public ValueFactory<Long> withLongKeys() {
@@ -119,7 +119,7 @@ public class Stores {
 
         /**
          * Begin to create a {@link KeyValueStore} by specifying the keys will be byte arrays.
-         * 
+         *
          * @return the interface used to specify the type of values; never null
          */
         public ValueFactory<byte[]> withByteArrayKeys() {
@@ -129,7 +129,7 @@ public class Stores {
         /**
          * Begin to create a {@link KeyValueStore} by specifying the keys will be either {@link String}, {@link Integer},
          * {@link Long}, or {@code byte[]}.
-         * 
+         *
          * @param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serializers and
          *            deserializers (e.g., {@code String.class}, {@code Integer.class}, {@code Long.class}, or
          *            {@code byte[].class})
@@ -141,7 +141,7 @@ public class Stores {
 
         /**
          * Begin to create a {@link KeyValueStore} by specifying the serializer and deserializer for the keys.
-         * 
+         *
          * @param keySerializer the serializer for keys; may not be null
          * @param keyDeserializer the deserializer for keys; may not be null
          * @return the interface used to specify the type of values; never null
@@ -151,13 +151,13 @@ public class Stores {
 
     /**
      * The factory for creating off-heap key-value stores.
-     * 
+     *
      * @param <K> the type of keys
      */
     public static abstract class ValueFactory<K> {
         /**
          * Use {@link String} values.
-         * 
+         *
          * @return the interface used to specify the remaining key-value store options; never null
          */
         public KeyValueFactory<K, String> withStringValues() {
@@ -166,7 +166,7 @@ public class Stores {
 
         /**
          * Use {@link Integer} values.
-         * 
+         *
          * @return the interface used to specify the remaining key-value store options; never null
          */
         public KeyValueFactory<K, Integer> withIntegerValues() {
@@ -175,7 +175,7 @@ public class Stores {
 
         /**
          * Use {@link Long} values.
-         * 
+         *
          * @return the interface used to specify the remaining key-value store options; never null
          */
         public KeyValueFactory<K, Long> withLongValues() {
@@ -184,7 +184,7 @@ public class Stores {
 
         /**
          * Use byte arrays for values.
-         * 
+         *
          * @return the interface used to specify the remaining key-value store options; never null
          */
         public KeyValueFactory<K, byte[]> withByteArrayValues() {
@@ -194,7 +194,7 @@ public class Stores {
         /**
          * Use values of the specified type, which must be either {@link String}, {@link Integer}, {@link Long}, or {@code byte[]}
          * .
-         * 
+         *
          * @param valueClass the class for the values, which must be one of the types for which Kafka has built-in serializers and
          *            deserializers (e.g., {@code String.class}, {@code Integer.class}, {@code Long.class}, or
          *            {@code byte[].class})
@@ -206,7 +206,7 @@ public class Stores {
 
         /**
          * Use the specified serializer and deserializer for the values.
-         * 
+         *
          * @param valueSerializer the serializer for value; may not be null
          * @param valueDeserializer the deserializer for values; may not be null
          * @return the interface used to specify the remaining key-value store options; never null
@@ -224,7 +224,7 @@ public class Stores {
         /**
          * Keep all key-value entries in-memory, although for durability all entries are recorded in a Kafka topic that can be
          * read to restore the entries if they are lost.
-         * 
+         *
          * @return the factory to create in-memory key-value stores; never null
          */
         InMemoryKeyValueFactory<K, V> inMemory();
@@ -232,7 +232,7 @@ public class Stores {
         /**
          * Keep all key-value entries off-heap in a local database, although for durability all entries are recorded in a Kafka
          * topic that can be read to restore the entries if they are lost.
-         * 
+         *
          * @return the factory to create in-memory key-value stores; never null
          */
         LocalDatabaseKeyValueFactory<K, V> localDatabase();
@@ -248,7 +248,7 @@ public class Stores {
         /**
          * Limits the in-memory key-value store to hold a maximum number of entries. The default is {@link Integer#MAX_VALUE}, which is
          * equivalent to not placing a limit on the number of entries.
-         * 
+         *
          * @param capacity the maximum capacity of the in-memory cache; should be one less than a power of 2
          * @return this factory
          * @throws IllegalArgumentException if the capacity is not positive
@@ -256,10 +256,10 @@ public class Stores {
         InMemoryKeyValueFactory<K, V> maxEntries(int capacity);
 
         /**
-         * Return the new key-value store.
-         * @return the key-value store; never null
+         * Return the instance of StateStoreSupplier of new key-value store.
+         * @return the state store supplier; never null
          */
-        KeyValueStore<K, V> build();
+        StateStoreSupplier build();
     }
 
     /**
@@ -270,9 +270,9 @@ public class Stores {
      */
     public static interface LocalDatabaseKeyValueFactory<K, V> {
         /**
-         * Return the new key-value store.
+         * Return the instance of StateStoreSupplier of new key-value store.
          * @return the key-value store; never null
          */
-        KeyValueStore<K, V> build();
+        StateStoreSupplier build();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index b77c253..de1328e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -106,6 +107,55 @@ public class TopologyBuilderTest {
         assertEquals(3, builder.sourceTopics().size());
     }
 
+    @Test(expected = TopologyException.class)
+    public void testAddStateStoreWithNonExistingProcessor() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor");
+    }
+
+    @Test(expected = TopologyException.class)
+    public void testAddStateStoreWithSource() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("source-1", "topic-1");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
+    }
+
+    @Test(expected = TopologyException.class)
+    public void testAddStateStoreWithSink() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSink("sink-1", "topic-1");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
+    }
+
+    @Test(expected = TopologyException.class)
+    public void testAddStateStoreWithDuplicates() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addStateStore(new MockStateStoreSupplier("store", false));
+        builder.addStateStore(new MockStateStoreSupplier("store", false));
+    }
+
+    @Test
+    public void testAddStateStore() {
+        final TopologyBuilder builder = new TopologyBuilder();
+        List<StateStoreSupplier> suppliers;
+
+        StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
+        builder.addStateStore(supplier);
+        suppliers = builder.build().stateStoreSuppliers();
+        assertEquals(0, suppliers.size());
+
+        builder.addSource("source-1", "topic-1");
+        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
+        builder.connectProcessorAndStateStores("processor-1", "store-1");
+        suppliers = builder.build().stateStoreSuppliers();
+        assertEquals(1, suppliers.size());
+        assertEquals(supplier.name(), suppliers.get(0).name());
+    }
+
     @Test
     public void testTopicGroups() {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -138,6 +188,35 @@ public class TopologyBuilderTest {
         assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
     }
 
+    @Test
+    public void testTopicGroupsByStateStore() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("source-1", "topic-1", "topic-1x");
+        builder.addSource("source-2", "topic-2");
+        builder.addSource("source-3", "topic-3");
+        builder.addSource("source-4", "topic-4");
+        builder.addSource("source-5", "topic-5");
+
+        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
+        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
+        builder.addStateStore(new MockStateStoreSupplier("strore-1", false), "processor-1", "processor-2");
+
+        builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
+        builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
+        builder.addStateStore(new MockStateStoreSupplier("strore-2", false), "processor-3", "processor-4");
+
+        Map<Integer, Set<String>> topicGroups = builder.topicGroups();
+
+        Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
+        expectedTopicGroups.put(0, set("topic-1", "topic-1x", "topic-2"));
+        expectedTopicGroups.put(1, set("topic-3", "topic-4"));
+        expectedTopicGroups.put(2, set("topic-5"));
+
+        assertEquals(3, topicGroups.size());
+        assertEquals(expectedTopicGroups, topicGroups);
+    }
+
     private <T> Set<T> set(T... items) {
         Set<T> set = new HashSet<>();
         for (T item : items) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index eb33dc3..c447f99 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -24,14 +24,11 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.OffsetCheckpoint;
+import org.apache.kafka.test.MockStateStoreSupplier;
 import org.junit.Test;
 
 import java.io.File;
@@ -53,45 +50,6 @@ import static org.junit.Assert.assertFalse;
 
 public class ProcessorStateManagerTest {
 
-    private static class MockStateStore implements StateStore {
-        private final String name;
-        private final boolean persistent;
-
-        public boolean flushed = false;
-        public boolean closed = false;
-        public final ArrayList<Integer> keys = new ArrayList<>();
-
-        public MockStateStore(String name, boolean persistent) {
-            this.name = name;
-            this.persistent = persistent;
-        }
-        @Override
-        public String name() {
-            return name;
-        }
-        @Override
-        public void flush() {
-            flushed = true;
-        }
-        @Override
-        public void close() {
-            closed = true;
-        }
-        @Override
-        public boolean persistent() {
-            return persistent;
-        }
-
-        public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
-            private final Deserializer<Integer> deserializer = new IntegerDeserializer();
-
-            @Override
-            public void restore(byte[] key, byte[] value) {
-                keys.add(deserializer.deserialize("", key));
-            }
-        };
-    }
-
     private class MockRestoreConsumer  extends MockConsumer<byte[], byte[]> {
         private final Serializer<Integer> serializer = new IntegerSerializer();
 
@@ -255,7 +213,7 @@ public class ProcessorStateManagerTest {
     public void testNoTopic() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            MockStateStore mockStateStore = new MockStateStore("mockStore", false);
+            MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
 
             ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer());
             try {
@@ -283,7 +241,7 @@ public class ProcessorStateManagerTest {
             ));
             restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L));
 
-            MockStateStore persistentStore = new MockStateStore("persistentStore", true); // persistent store
+            MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
 
             ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer);
             try {
@@ -331,7 +289,7 @@ public class ProcessorStateManagerTest {
             ));
             restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L));
 
-            MockStateStore nonPersistentStore = new MockStateStore("nonPersistentStore", false); // non persistent store
+            MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false); // non persistent store
 
             ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer);
             try {
@@ -371,7 +329,7 @@ public class ProcessorStateManagerTest {
                     new PartitionInfo("mockStore", 1, Node.noNode(), new Node[0], new Node[0])
             ));
 
-            MockStateStore mockStateStore = new MockStateStore("mockStore", false);
+            MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
 
             ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
             try {
@@ -411,8 +369,8 @@ public class ProcessorStateManagerTest {
             ackedOffsets.put(new TopicPartition("nonPersistentStore", 1), 456L);
             ackedOffsets.put(new TopicPartition("otherTopic", 1), 789L);
 
-            MockStateStore persistentStore = new MockStateStore("persistentStore", true);
-            MockStateStore nonPersistentStore = new MockStateStore("nonPersistentStore", false);
+            MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true);
+            MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false);
 
             ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
             try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 5f8ca46..803d4b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -83,7 +83,7 @@ public class ProcessorTopologyTest {
         }
         driver = null;
     }
-    
+
     @Test
     public void testTopologyMetadata() {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -203,6 +203,10 @@ public class ProcessorTopologyTest {
     protected TopologyBuilder createStatefulTopology(String storeName) {
         return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
                                     .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
+                                    .addStateStore(
+                                            Stores.create(storeName, config).withStringKeys().withStringValues().inMemory().build(),
+                                            "processor"
+                                    )
                                     .addSink("counts", OUTPUT_TOPIC_1, "processor");
     }
 
@@ -262,9 +266,10 @@ public class ProcessorTopologyTest {
         }
 
         @Override
+        @SuppressWarnings("unchecked")
         public void init(ProcessorContext context) {
             super.init(context);
-            store = Stores.create(storeName, context).withStringKeys().withStringValues().inMemory().build();
+            store = (KeyValueStore<String, String>) context.getStateStore(storeName);
         }
 
         @Override
@@ -281,7 +286,7 @@ public class ProcessorTopologyTest {
             }
             context().forward(Long.toString(streamTime), count);
         }
-        
+
         @Override
         public void close() {
             store.close();
@@ -303,4 +308,4 @@ public class ProcessorTopologyTest {
             return timestamp;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index b1403bd..2c7aaeb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -30,7 +30,7 @@ public class PunctuationQueueTest {
     @Test
     public void testPunctuationInterval() {
         TestProcessor processor = new TestProcessor();
-        ProcessorNode<String, String> node = new ProcessorNode<>("test", processor);
+        ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
         PunctuationQueue queue = new PunctuationQueue();
 
         PunctuationSchedule sched = new PunctuationSchedule(node, 100L);

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 0b828f7..d80e98c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.test.MockSourceNode;
 import org.junit.Test;
@@ -37,6 +38,7 @@ import org.junit.Before;
 import java.io.File;
 import java.nio.file.Files;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Properties;
@@ -57,13 +59,15 @@ public class StreamTaskTest {
     private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
     private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
     private final ProcessorTopology topology = new ProcessorTopology(
-        Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2),
-        new HashMap<String, SourceNode>() {
-            {
-                put("topic1", source1);
-                put("topic2", source2);
-            }
-        });
+            Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2),
+            new HashMap<String, SourceNode>() {
+                {
+                    put("topic1", source1);
+                    put("topic2", source2);
+                }
+            },
+            Collections.<StateStoreSupplier>emptyList()
+    );
 
     private StreamingConfig createConfig(final File baseDir) throws Exception {
         return new StreamingConfig(new Properties() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
index d8f06ea..209f3c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
@@ -21,20 +21,22 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
+import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.junit.Test;
 
 public abstract class AbstractKeyValueStoreTest {
 
-    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context,
+    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(StreamingConfig config,
+                                                                      ProcessorContext context,
                                                                       Class<K> keyClass, Class<V> valueClass,
                                                                       boolean useContextSerdes);
-    
+
     @Test
     public void testPutGetRange() {
         // Create the test driver ...
         KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, false);
         try {
 
             // Verify that the store reads and writes correctly ...
@@ -100,7 +102,7 @@ public abstract class AbstractKeyValueStoreTest {
     public void testPutGetRangeWithDefaultSerdes() {
         // Create the test driver ...
         KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, true);
         try {
 
             // Verify that the store reads and writes correctly ...
@@ -150,7 +152,7 @@ public abstract class AbstractKeyValueStoreTest {
 
         // Create the store, which should register with the context and automatically
         // receive the restore entries ...
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, false);
         try {
             // Verify that the store's contents were properly restored ...
             assertEquals(0, driver.checkForRestoredEntries(store));
@@ -176,7 +178,7 @@ public abstract class AbstractKeyValueStoreTest {
 
         // Create the store, which should register with the context and automatically
         // receive the restore entries ...
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, true);
         try {
             // Verify that the store's contents were properly restored ...
             assertEquals(0, driver.checkForRestoredEntries(store));
@@ -188,4 +190,4 @@ public abstract class AbstractKeyValueStoreTest {
         }
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
index bee9967..b3fe98c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
@@ -18,21 +18,33 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context, Class<K> keyClass, Class<V> valueClass,
-                                                             boolean useContextSerdes) {
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
+            StreamingConfig config,
+            ProcessorContext context,
+            Class<K> keyClass, Class<V> valueClass,
+            boolean useContextSerdes) {
+
+        StateStoreSupplier supplier;
         if (useContextSerdes) {
             Serializer<K> keySer = (Serializer<K>) context.keySerializer();
             Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
             Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
             Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            return Stores.create("my-store", context).withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
+            supplier = Stores.create("my-store", config).withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
+        } else {
+            supplier = Stores.create("my-store", config).withKeys(keyClass).withValues(valueClass).inMemory().build();
         }
-        return Stores.create("my-store", context).withKeys(keyClass).withValues(valueClass).inMemory().build();
+
+        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+        store.init(context);
+        return store;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
index 6b96d3a..dddb9c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertNull;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.junit.Test;
 
 public class InMemoryLRUCacheStoreTest {
@@ -29,10 +30,12 @@ public class InMemoryLRUCacheStoreTest {
     public void testPutGetRange() {
         // Create the test driver ...
         KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-        KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
+        StateStoreSupplier supplier = Stores.create("my-store", driver.config())
                                                      .withIntegerKeys().withStringValues()
                                                      .inMemory().maxEntries(3)
                                                      .build();
+        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
+        store.init(driver.context());
 
         // Verify that the store reads and writes correctly, keeping only the last 2 entries ...
         store.put(0, "zero");
@@ -79,11 +82,13 @@ public class InMemoryLRUCacheStoreTest {
         Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer();
         Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer();
         Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer();
-        KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
+        StateStoreSupplier supplier = Stores.create("my-store", driver.config())
                                                      .withKeys(keySer, keyDeser)
                                                      .withValues(valSer, valDeser)
                                                      .inMemory().maxEntries(3)
                                                      .build();
+        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
+        store.init(driver.context());
 
         // Verify that the store reads and writes correctly, keeping only the last 2 entries ...
         store.put(0, "zero");
@@ -133,10 +138,12 @@ public class InMemoryLRUCacheStoreTest {
 
         // Create the store, which should register with the context and automatically
         // receive the restore entries ...
-        KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
+        StateStoreSupplier supplier = Stores.create("my-store", driver.config())
                                                      .withIntegerKeys().withStringValues()
                                                      .inMemory().maxEntries(3)
                                                      .build();
+        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
+        store.init(driver.context());
 
         // Verify that the store's contents were properly restored ...
         assertEquals(0, driver.checkForRestoredEntries(store));
@@ -145,4 +152,4 @@ public class InMemoryLRUCacheStoreTest {
         assertEquals(3, driver.sizeOf(store));
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 7e1512a..8bab1c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -30,6 +31,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.MockTimestampExtractor;
 
 import java.io.File;
 import java.util.HashMap;
@@ -38,6 +40,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Properties;
 import java.util.Set;
 
 /**
@@ -218,6 +221,7 @@ public class KeyValueStoreTestDriver<K, V> {
     private final Map<K, V> flushedEntries = new HashMap<>();
     private final Set<K> flushedRemovals = new HashSet<>();
     private final List<Entry<K, V>> restorableEntries = new LinkedList<>();
+    private final StreamingConfig config;
     private final MockProcessorContext context;
     private final Map<String, StateStore> storeMap = new HashMap<>();
     private final StreamingMetrics metrics = new StreamingMetrics() {
@@ -243,6 +247,15 @@ public class KeyValueStoreTestDriver<K, V> {
                 recordFlushed(record.key(), record.value());
             }
         };
+        Properties props = new Properties();
+        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
+        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, serdes.keySerializer().getClass());
+        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass());
+        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass());
+        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass());
+        this.config = new StreamingConfig(props);
+
         this.context = new MockProcessorContext(null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
                 serdes.valueDeserializer(), recordCollector) {
             @Override
@@ -349,6 +362,15 @@ public class KeyValueStoreTestDriver<K, V> {
     }
 
     /**
+     * Get the streaming config that should be supplied to a {@link Serdes}'s constructor.
+     *
+     * @return the streaming config; never null
+     */
+    public StreamingConfig config() {
+        return config;
+    }
+
+    /**
      * Get the context that should be supplied to a {@link KeyValueStore}'s constructor. This context records any messages
      * written by the store to the Kafka topic, making them available via the {@link #flushedEntryStored(Object)} and
      * {@link #flushedEntryRemoved(Object)} methods.

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
index 9ac1740..37a12f9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
@@ -18,21 +18,35 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context, Class<K> keyClass, Class<V> valueClass,
-                                                             boolean useContextSerdes) {
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
+            StreamingConfig config,
+            ProcessorContext context,
+            Class<K> keyClass,
+            Class<V> valueClass,
+            boolean useContextSerdes) {
+
+        StateStoreSupplier supplier;
         if (useContextSerdes) {
             Serializer<K> keySer = (Serializer<K>) context.keySerializer();
             Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
             Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
             Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            return Stores.create("my-store", context).withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
+            supplier = Stores.create("my-store", config).withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
+        } else {
+            supplier = Stores.create("my-store", config).withKeys(keyClass).withValues(valueClass).localDatabase().build();
         }
-        return Stores.create("my-store", context).withKeys(keyClass).withValues(valueClass).localDatabase().build();
+
+        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+        store.init(context);
+        return store;
+
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
new file mode 100644
index 0000000..16635b7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.ArrayList;
+
+public class MockStateStoreSupplier implements StateStoreSupplier {
+    private final String name;
+    private final boolean persistent;
+
+    public MockStateStoreSupplier(String name, boolean persistent) {
+        this.name = name;
+        this.persistent = persistent;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public StateStore get() {
+        return new MockStateStore(name, persistent);
+    }
+
+    public static class MockStateStore implements StateStore {
+        private final String name;
+        private final boolean persistent;
+
+        public boolean initialized = false;
+        public boolean flushed = false;
+        public boolean closed = false;
+        public final ArrayList<Integer> keys = new ArrayList<>();
+
+        public MockStateStore(String name, boolean persistent) {
+            this.name = name;
+            this.persistent = persistent;
+        }
+
+        @Override
+        public String name() {
+            return name;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            context.register(this, stateRestoreCallback);
+            initialized = true;
+        }
+
+        @Override
+        public void flush() {
+            flushed = true;
+        }
+
+        @Override
+        public void close() {
+            closed = true;
+        }
+
+        @Override
+        public boolean persistent() {
+            return persistent;
+        }
+
+        public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
+            private final Deserializer<Integer> deserializer = new IntegerDeserializer();
+
+            @Override
+            public void restore(byte[] key, byte[] value) {
+                keys.add(deserializer.deserialize("", key));
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 0c4b1a2..fc83762 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -266,7 +267,7 @@ public class ProcessorTopologyTestDriver {
      * @see #getKeyValueStore(String)
      */
     public StateStore getStateStore(String name) {
-        return task.context().getStateStore(name);
+        return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
     }
 
     /**


[2/2] kafka git commit: KAFKA-2706: make state stores first class citizens in the processor topology

Posted by gu...@apache.org.
KAFKA-2706: make state stores first class citizens in the processor topology

* Added StateStoreSupplier
* StateStore
  * Added init(ProcessorContext context) method
* TopologyBuilder
  * Added addStateStore(StateStoreSupplier supplier, String... processNames)
  * Added connectProessorAndStateStores(String processorName, String... stateStoreNames)
    * This is for the case processors are not created when a store is added to the topology. (used by KStream)
* KStream
  * add stateStoreNames to process(), transform(), transformValues().
* Refactored existing state stores to implement StateStoreSupplier

guozhangwang

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #387 from ymatsuda/state_store_supplier


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/75827226
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/75827226
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/75827226

Branch: refs/heads/trunk
Commit: 758272267c811bf559336ea45571bc420a62a478
Parents: 6383593
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Nov 2 13:24:48 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Nov 2 13:24:48 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/StreamingConfig.java   |  18 ++
 .../kafka/streams/examples/ProcessorJob.java    |   5 +-
 .../apache/kafka/streams/kstream/KStream.java   |   9 +-
 .../streams/kstream/internals/KStreamImpl.java  |   9 +-
 .../kafka/streams/processor/StateStore.java     |   5 +
 .../streams/processor/StateStoreSupplier.java   |  25 ++
 .../streams/processor/TopologyBuilder.java      | 109 +++++--
 .../internals/ProcessorContextImpl.java         |  30 +-
 .../processor/internals/ProcessorNode.java      |   8 +-
 .../processor/internals/ProcessorTopology.java  |  11 +-
 .../streams/processor/internals/StreamTask.java |   8 +
 .../streams/state/InMemoryKeyValueStore.java    | 135 ---------
 .../state/InMemoryKeyValueStoreSupplier.java    | 155 ++++++++++
 .../streams/state/InMemoryLRUCacheStore.java    | 180 -----------
 .../state/InMemoryLRUCacheStoreSupplier.java    | 195 ++++++++++++
 .../streams/state/MeteredKeyValueStore.java     |  68 +++--
 .../streams/state/RocksDBKeyValueStore.java     | 284 ------------------
 .../state/RocksDBKeyValueStoreSupplier.java     | 298 +++++++++++++++++++
 .../org/apache/kafka/streams/state/Serdes.java  |  45 ++-
 .../org/apache/kafka/streams/state/Stores.java  |  66 ++--
 .../streams/processor/TopologyBuilderTest.java  |  79 +++++
 .../internals/ProcessorStateManagerTest.java    |  56 +---
 .../internals/ProcessorTopologyTest.java        |  13 +-
 .../internals/PunctuationQueueTest.java         |   2 +-
 .../processor/internals/StreamTaskTest.java     |  18 +-
 .../state/AbstractKeyValueStoreTest.java        |  16 +-
 .../state/InMemoryKeyValueStoreTest.java        |  22 +-
 .../state/InMemoryLRUCacheStoreTest.java        |  15 +-
 .../streams/state/KeyValueStoreTestDriver.java  |  22 ++
 .../streams/state/RocksDBKeyValueStoreTest.java |  24 +-
 .../kafka/test/MockStateStoreSupplier.java      |  97 ++++++
 .../kafka/test/ProcessorTopologyTestDriver.java |   3 +-
 32 files changed, 1224 insertions(+), 806 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
index a0aef48..88bd844 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
@@ -245,6 +247,22 @@ public class StreamingConfig extends AbstractConfig {
         return props;
     }
 
+    public Serializer keySerializer() {
+        return getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+    }
+
+    public Serializer valueSerializer() {
+        return getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+    }
+
+    public Deserializer keyDeserializer() {
+        return getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+    }
+
+    public Deserializer valueDeserializer() {
+        return getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+    }
+
     public static void main(String[] args) {
         System.out.println(CONFIG.toHtmlTable());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
index 0317b9d..3274aae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -45,10 +45,11 @@ public class ProcessorJob {
                 private KeyValueStore<String, Integer> kvStore;
 
                 @Override
+                @SuppressWarnings("unchecked")
                 public void init(ProcessorContext context) {
                     this.context = context;
                     this.context.schedule(1000);
-                    this.kvStore = Stores.create("local-state", context).withStringKeys().withIntegerValues().inMemory().build();
+                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("local-state");
                 }
 
                 @Override
@@ -103,6 +104,8 @@ public class ProcessorJob {
         builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
 
         builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
+        builder.addStateStore(Stores.create("local-state", config).withStringKeys().withIntegerValues().inMemory().build());
+        builder.connectProcessorAndStateStores("local-state", "PROCESS");
 
         builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 915cf1c..8f0794c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -154,24 +154,27 @@ public interface KStream<K, V> {
      * Applies a stateful transformation to all elements in this stream.
      *
      * @param transformerSupplier the class of TransformerDef
+     * @param stateStoreNames the names of the state store used by the processor
      * @return KStream
      */
-    <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier);
+    <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames);
 
     /**
      * Applies a stateful transformation to all values in this stream.
      *
      * @param valueTransformerSupplier the class of TransformerDef
+     * @param stateStoreNames the names of the state store used by the processor
      * @return KStream
      */
-    <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier);
+    <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames);
 
     /**
      * Processes all elements in this stream by applying a processor.
      *
      * @param processorSupplier the supplier of the Processor to use
+     * @param stateStoreNames the names of the state store used by the processor
      * @return the new stream containing the processed output
      */
-    void process(ProcessorSupplier<K, V> processorSupplier);
+    void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 1a2297c..1ea9b1e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -201,27 +201,30 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier) {
+    public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames) {
         String name = TRANSFORM_NAME + INDEX.getAndIncrement();
 
         topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
+        topology.connectProcessorAndStateStores(name, stateStoreNames);
 
         return new KStreamImpl<>(topology, name, null);
     }
 
     @Override
-    public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier) {
+    public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier, String... stateStoreNames) {
         String name = TRANSFORMVALUES_NAME + INDEX.getAndIncrement();
 
         topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
+        topology.connectProcessorAndStateStores(name, stateStoreNames);
 
         return new KStreamImpl<>(topology, name, sourceNodes);
     }
 
     @Override
-    public void process(final ProcessorSupplier<K, V> processorSupplier) {
+    public void process(final ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames) {
         String name = PROCESSOR_NAME + INDEX.getAndIncrement();
 
         topology.addProcessor(name, processorSupplier, this.name);
+        topology.connectProcessorAndStateStores(name, stateStoreNames);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index 38afe9b..9c085a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -36,6 +36,11 @@ public interface StateStore {
     String name();
 
     /**
+     * Initializes this state store
+     */
+    void init(ProcessorContext context);
+
+    /**
      * Flush any cached data
      */
     void flush();

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
new file mode 100644
index 0000000..11545c5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+public interface StateStoreSupplier {
+
+    String name();
+
+    StateStore get();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 077489c..5b6d4ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -33,6 +33,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -48,10 +50,9 @@ import java.util.Set;
  */
 public class TopologyBuilder {
 
-    // list of node factories in a topological order
-    private final ArrayList<NodeFactory> nodeFactories = new ArrayList<>();
+    // node factories in a topological order
+    private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
 
-    private final Set<String> nodeNames = new HashSet<>();
     private final Set<String> sourceTopicNames = new HashSet<>();
 
     private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
@@ -59,6 +60,9 @@ public class TopologyBuilder {
     private final HashMap<String, String[]> nodeToTopics = new HashMap<>();
     private Map<Integer, Set<String>> nodeGroups = null;
 
+    private Map<String, StateStoreSupplier> stateStores = new HashMap<>();
+    private Map<String, Set<String>> stateStoreUsers = new HashMap();
+
     private interface NodeFactory {
         ProcessorNode build();
     }
@@ -67,6 +71,7 @@ public class TopologyBuilder {
         public final String[] parents;
         private final String name;
         private final ProcessorSupplier supplier;
+        private final Set<String> stateStoreNames = new HashSet<>();
 
         public ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier supplier) {
             this.name = name;
@@ -74,9 +79,13 @@ public class TopologyBuilder {
             this.supplier = supplier;
         }
 
+        public void addStateStore(String stateStoreName) {
+            stateStoreNames.add(stateStoreName);
+        }
+
         @Override
         public ProcessorNode build() {
-            return new ProcessorNode(name, supplier.get());
+            return new ProcessorNode(name, supplier.get(), stateStoreNames);
         }
     }
 
@@ -155,7 +164,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      */
     public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
-        if (nodeNames.contains(name))
+        if (nodeFactories.containsKey(name))
             throw new TopologyException("Processor " + name + " is already added.");
 
         for (String topic : topics) {
@@ -165,8 +174,7 @@ public class TopologyBuilder {
             sourceTopicNames.add(topic);
         }
 
-        nodeNames.add(name);
-        nodeFactories.add(new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
+        nodeFactories.put(name, new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
         nodeToTopics.put(name, topics.clone());
         nodeGrouper.add(name);
 
@@ -204,7 +212,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      */
     public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
-        if (nodeNames.contains(name))
+        if (nodeFactories.containsKey(name))
             throw new TopologyException("Processor " + name + " is already added.");
 
         if (parentNames != null) {
@@ -212,14 +220,13 @@ public class TopologyBuilder {
                 if (parent.equals(name)) {
                     throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
                 }
-                if (!nodeNames.contains(parent)) {
+                if (!nodeFactories.containsKey(parent)) {
                     throw new TopologyException("Parent processor " + parent + " is not added yet.");
                 }
             }
         }
 
-        nodeNames.add(name);
-        nodeFactories.add(new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer));
+        nodeFactories.put(name, new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer));
         return this;
     }
 
@@ -233,7 +240,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      */
     public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
-        if (nodeNames.contains(name))
+        if (nodeFactories.containsKey(name))
             throw new TopologyException("Processor " + name + " is already added.");
 
         if (parentNames != null) {
@@ -241,20 +248,80 @@ public class TopologyBuilder {
                 if (parent.equals(name)) {
                     throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
                 }
-                if (!nodeNames.contains(parent)) {
+                if (!nodeFactories.containsKey(parent)) {
                     throw new TopologyException("Parent processor " + parent + " is not added yet.");
                 }
             }
         }
 
-        nodeNames.add(name);
-        nodeFactories.add(new ProcessorNodeFactory(name, parentNames, supplier));
+        nodeFactories.put(name, new ProcessorNodeFactory(name, parentNames, supplier));
         nodeGrouper.add(name);
         nodeGrouper.unite(name, parentNames);
         return this;
     }
 
     /**
+     * Adds a state store
+     *
+     * @param supplier the supplier used to obtain this state store {@link StateStore} instance
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
+        if (stateStores.containsKey(supplier.name())) {
+            throw new TopologyException("StateStore " + supplier.name() + " is already added.");
+        }
+        stateStores.put(supplier.name(), supplier);
+        stateStoreUsers.put(supplier.name(), new HashSet<String>());
+
+        if (processorNames != null) {
+            for (String processorName : processorNames) {
+                connectProcessorAndStateStore(processorName, supplier.name());
+            }
+        }
+
+        return this;
+    }
+
+    /**
+     * Connects the processor and the state stores
+     *
+     * @param processorName the name of the processor
+     * @param stateStoreNames the names of state stores that the processor uses
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public final TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) {
+        if (stateStoreNames != null) {
+            for (String stateStoreName : stateStoreNames) {
+                connectProcessorAndStateStore(processorName, stateStoreName);
+            }
+        }
+
+        return this;
+    }
+
+    private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
+        if (!stateStores.containsKey(stateStoreName))
+            throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
+        if (!nodeFactories.containsKey(processorName))
+            throw new TopologyException("Processor " + processorName + " is not added yet.");
+
+        Set<String> users = stateStoreUsers.get(stateStoreName);
+        Iterator<String> iter = users.iterator();
+        if (iter.hasNext()) {
+            String user = iter.next();
+            nodeGrouper.unite(user, processorName);
+        }
+        users.add(processorName);
+
+        NodeFactory factory = nodeFactories.get(processorName);
+        if (factory instanceof ProcessorNodeFactory) {
+            ((ProcessorNodeFactory) factory).addStateStore(stateStoreName);
+        } else {
+            throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
+        }
+    }
+
+    /**
      * Returns the map of topic groups keyed by the group id.
      * A topic group is a group of topics in the same task.
      *
@@ -301,7 +368,7 @@ public class TopologyBuilder {
         }
 
         // Go through non-source nodes
-        for (String nodeName : Utils.sorted(nodeNames)) {
+        for (String nodeName : Utils.sorted(nodeFactories.keySet())) {
             if (!nodeToTopics.containsKey(nodeName)) {
                 String root = nodeGrouper.root(nodeName);
                 Set<String> nodeGroup = rootToNodeGroup.get(root);
@@ -357,10 +424,11 @@ public class TopologyBuilder {
         List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
         Map<String, ProcessorNode> processorMap = new HashMap<>();
         Map<String, SourceNode> topicSourceMap = new HashMap<>();
+        Map<String, StateStoreSupplier> stateStoreMap = new HashMap<>();
 
         try {
             // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
-            for (NodeFactory factory : nodeFactories) {
+            for (NodeFactory factory : nodeFactories.values()) {
                 ProcessorNode node = factory.build();
                 processorNodes.add(node);
                 processorMap.put(node.name(), node);
@@ -369,6 +437,11 @@ public class TopologyBuilder {
                     for (String parent : ((ProcessorNodeFactory) factory).parents) {
                         processorMap.get(parent).addChild(node);
                     }
+                    for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
+                        if (!stateStoreMap.containsKey(stateStoreName)) {
+                            stateStoreMap.put(stateStoreName, stateStores.get(stateStoreName));
+                        }
+                    }
                 } else if (factory instanceof SourceNodeFactory) {
                     for (String topic : ((SourceNodeFactory) factory).topics) {
                         topicSourceMap.put(topic, (SourceNode) node);
@@ -385,7 +458,7 @@ public class TopologyBuilder {
             throw new KafkaException("ProcessorNode construction failed: this should not happen.");
         }
 
-        return new ProcessorTopology(processorNodes, topicSourceMap);
+        return new ProcessorTopology(processorNodes, topicSourceMap, new ArrayList<>(stateStoreMap.values()));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 3c1e059..1321cc5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -62,19 +62,14 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
         this.collector = collector;
         this.stateMgr = stateMgr;
 
-        this.keySerializer = config.getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
-        this.valSerializer = config.getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
-        this.keyDeserializer = config.getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-        this.valDeserializer = config.getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+        this.keySerializer = config.keySerializer();
+        this.valSerializer = config.valueSerializer();
+        this.keyDeserializer = config.keyDeserializer();
+        this.valDeserializer = config.valueDeserializer();
 
         this.initialized = false;
     }
 
-    @Override
-    public RecordCollector recordCollector() {
-        return this.collector;
-    }
-
     public void initialized() {
         this.initialized = true;
     }
@@ -83,6 +78,15 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
         return id;
     }
 
+    public ProcessorStateManager getStateMgr() {
+        return stateMgr;
+    }
+
+    @Override
+    public RecordCollector recordCollector() {
+        return this.collector;
+    }
+
     @Override
     public Serializer<?> keySerializer() {
         return this.keySerializer;
@@ -123,6 +127,14 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
 
     @Override
     public StateStore getStateStore(String name) {
+        ProcessorNode node = task.node();
+
+        if (node == null)
+            throw new KafkaException("accessing from an unknown node");
+
+        if (!node.stateStores.contains(name))
+            throw new KafkaException("Processor " + node.name() + " has no access to StateStore " + name);
+
         return stateMgr.getStore(name);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 9127c3f..6db83a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 public class ProcessorNode<K, V> {
 
@@ -30,14 +31,17 @@ public class ProcessorNode<K, V> {
     private final String name;
     private final Processor<K, V> processor;
 
+    public final Set<String> stateStores;
+
     public ProcessorNode(String name) {
-        this(name, null);
+        this(name, null, null);
     }
 
-    public ProcessorNode(String name, Processor<K, V> processor) {
+    public ProcessorNode(String name, Processor<K, V> processor, Set<String> stateStores) {
         this.name = name;
         this.processor = processor;
         this.children = new ArrayList<>();
+        this.stateStores = stateStores;
     }
 
     public final String name() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 3efae65..a70aa70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -27,11 +29,14 @@ public class ProcessorTopology {
 
     private final List<ProcessorNode> processorNodes;
     private final Map<String, SourceNode> sourceByTopics;
+    private final List<StateStoreSupplier> stateStoreSuppliers;
 
     public ProcessorTopology(List<ProcessorNode> processorNodes,
-                             Map<String, SourceNode> sourceByTopics) {
+                             Map<String, SourceNode> sourceByTopics,
+                             List<StateStoreSupplier> stateStoreSuppliers) {
         this.processorNodes = Collections.unmodifiableList(processorNodes);
         this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics);
+        this.stateStoreSuppliers = Collections.unmodifiableList(stateStoreSuppliers);
     }
 
     public Set<String> sourceTopics() {
@@ -50,4 +55,8 @@ public class ProcessorTopology {
         return processorNodes;
     }
 
+    public List<StateStoreSupplier> stateStoreSuppliers() {
+        return stateStoreSuppliers;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f01e00b..a9c14e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -26,6 +26,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.slf4j.Logger;
@@ -126,6 +128,12 @@ public class StreamTask implements Punctuator {
         // initialize the topology with its own context
         this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics);
 
+        // initialize the state stores
+        for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) {
+            StateStore store = stateStoreSupplier.get();
+            store.init(this.processorContext);
+        }
+
         // initialize the task by initializing all its processor nodes in the topology
         for (ProcessorNode node : this.topology.processors()) {
             this.currNode = node;

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
deleted file mode 100644
index 1eb526f..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-/**
- * An in-memory key-value store based on a TreeMap.
- *
- * @param <K> The key type
- * @param <V> The value type
- * 
- * @see Stores#create(String, ProcessorContext)
- */
-public class InMemoryKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
-
-    protected InMemoryKeyValueStore(String name, ProcessorContext context, Serdes<K, V> serdes, Time time) {
-        super(name, new MemoryStore<K, V>(name), context, serdes, "in-memory-state", time != null ? time : new SystemTime());
-    }
-
-    private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
-
-        private final String name;
-        private final NavigableMap<K, V> map;
-
-        public MemoryStore(String name) {
-            super();
-            this.name = name;
-            this.map = new TreeMap<>();
-        }
-
-        @Override
-        public String name() {
-            return this.name;
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-
-        @Override
-        public V get(K key) {
-            return this.map.get(key);
-        }
-
-        @Override
-        public void put(K key, V value) {
-            this.map.put(key, value);
-        }
-
-        @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
-        }
-
-        @Override
-        public V delete(K key) {
-            return this.map.remove(key);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(K from, K to) {
-            return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
-        }
-
-        @Override
-        public void flush() {
-            // do-nothing since it is in-memory
-        }
-
-        @Override
-        public void close() {
-            // do-nothing
-        }
-
-        private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
-            private final Iterator<Map.Entry<K, V>> iter;
-
-            public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
-                this.iter = iter;
-            }
-
-            @Override
-            public boolean hasNext() {
-                return iter.hasNext();
-            }
-
-            @Override
-            public Entry<K, V> next() {
-                Map.Entry<K, V> entry = iter.next();
-                return new Entry<>(entry.getKey(), entry.getValue());
-            }
-
-            @Override
-            public void remove() {
-                iter.remove();
-            }
-
-            @Override
-            public void close() {
-            }
-
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
new file mode 100644
index 0000000..d1f845c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * An in-memory key-value store based on a TreeMap.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
+ */
+public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
+
+    private final String name;
+    private final Serdes serdes;
+    private final Time time;
+
+    protected InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
+        this.name = name;
+        this.serdes = serdes;
+        this.time = time;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public StateStore get() {
+        return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time);
+    }
+
+    private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
+
+        private final String name;
+        private final NavigableMap<K, V> map;
+
+        public MemoryStore(String name) {
+            super();
+            this.name = name;
+            this.map = new TreeMap<>();
+        }
+
+        @Override
+        public String name() {
+            return this.name;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            // do-nothing since it is in-memory
+        }
+
+        @Override
+        public boolean persistent() {
+            return false;
+        }
+
+        @Override
+        public V get(K key) {
+            return this.map.get(key);
+        }
+
+        @Override
+        public void put(K key, V value) {
+            this.map.put(key, value);
+        }
+
+        @Override
+        public void putAll(List<Entry<K, V>> entries) {
+            for (Entry<K, V> entry : entries)
+                put(entry.key(), entry.value());
+        }
+
+        @Override
+        public V delete(K key) {
+            return this.map.remove(key);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> range(K from, K to) {
+            return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
+        }
+
+        @Override
+        public KeyValueIterator<K, V> all() {
+            return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
+        }
+
+        @Override
+        public void flush() {
+            // do-nothing since it is in-memory
+        }
+
+        @Override
+        public void close() {
+            // do-nothing
+        }
+
+        private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
+            private final Iterator<Map.Entry<K, V>> iter;
+
+            public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
+                this.iter = iter;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return iter.hasNext();
+            }
+
+            @Override
+            public Entry<K, V> next() {
+                Map.Entry<K, V> entry = iter.next();
+                return new Entry<>(entry.getKey(), entry.getValue());
+            }
+
+            @Override
+            public void remove() {
+                iter.remove();
+            }
+
+            @Override
+            public void close() {
+            }
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
deleted file mode 100644
index 1b96c59..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
-/**
- * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- */
-public class InMemoryLRUCacheStore<K, V> extends MeteredKeyValueStore<K, V> {
-
-    protected static <K, V> InMemoryLRUCacheStore<K, V> create(String name, int capacity, ProcessorContext context,
-                                                               Serdes<K, V> serdes, Time time) {
-        if (time == null) time = new SystemTime();
-        MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
-        final InMemoryLRUCacheStore<K, V> store = new InMemoryLRUCacheStore<>(name, context, cache, serdes, time);
-        cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
-            @Override
-            public void apply(K key, V value) {
-                store.removed(key);
-            }
-        });
-        return store;
-
-    }
-
-    private InMemoryLRUCacheStore(String name, ProcessorContext context, MemoryLRUCache<K, V> cache, Serdes<K, V> serdes, Time time) {
-        super(name, cache, context, serdes, "kafka-streams", time);
-    }
-
-    private static interface EldestEntryRemovalListener<K, V> {
-        public void apply(K key, V value);
-    }
-
-    protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
-
-        private final String name;
-        private final Map<K, V> map;
-        private final NavigableSet<K> keys;
-        private EldestEntryRemovalListener<K, V> listener;
-
-        public MemoryLRUCache(String name, final int maxCacheSize) {
-            this.name = name;
-            this.keys = new TreeSet<>();
-            // leave room for one extra entry to handle adding an entry before the oldest can be removed
-            this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
-                private static final long serialVersionUID = 1L;
-
-                @Override
-                protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
-                    if (size() > maxCacheSize) {
-                        K key = eldest.getKey();
-                        keys.remove(key);
-                        if (listener != null) listener.apply(key, eldest.getValue());
-                        return true;
-                    }
-                    return false;
-                }
-            };
-        }
-
-        protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
-            this.listener = listener;
-        }
-
-        @Override
-        public String name() {
-            return this.name;
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-
-        @Override
-        public V get(K key) {
-            return this.map.get(key);
-        }
-
-        @Override
-        public void put(K key, V value) {
-            this.map.put(key, value);
-            this.keys.add(key);
-        }
-
-        @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
-        }
-
-        @Override
-        public V delete(K key) {
-            V value = this.map.remove(key);
-            this.keys.remove(key);
-            return value;
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(K from, K to) {
-            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
-        }
-
-        @Override
-        public void flush() {
-            // do-nothing since it is in-memory
-        }
-
-        @Override
-        public void close() {
-            // do-nothing
-        }
-
-        private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
-            private final Iterator<K> keys;
-            private final Map<K, V> entries;
-            private K lastKey;
-
-            public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
-                this.keys = keys;
-                this.entries = entries;
-            }
-
-            @Override
-            public boolean hasNext() {
-                return keys.hasNext();
-            }
-
-            @Override
-            public Entry<K, V> next() {
-                lastKey = keys.next();
-                return new Entry<>(lastKey, entries.get(lastKey));
-            }
-
-            @Override
-            public void remove() {
-                keys.remove();
-                entries.remove(lastKey);
-            }
-
-            @Override
-            public void close() {
-                // do nothing
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
new file mode 100644
index 0000000..a346534
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ */
+public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
+
+    private final String name;
+    private final int capacity;
+    private final Serdes serdes;
+    private final Time time;
+
+    protected InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) {
+        this.name = name;
+        this.capacity = capacity;
+        this.serdes = serdes;
+        this.time = time;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public StateStore get() {
+        MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
+        final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time);
+        cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
+            @Override
+            public void apply(K key, V value) {
+                store.removed(key);
+            }
+        });
+        return store;
+    }
+
+    private static interface EldestEntryRemovalListener<K, V> {
+        public void apply(K key, V value);
+    }
+
+    protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
+
+        private final String name;
+        private final Map<K, V> map;
+        private final NavigableSet<K> keys;
+        private EldestEntryRemovalListener<K, V> listener;
+
+        public MemoryLRUCache(String name, final int maxCacheSize) {
+            this.name = name;
+            this.keys = new TreeSet<>();
+            // leave room for one extra entry to handle adding an entry before the oldest can be removed
+            this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+                    if (size() > maxCacheSize) {
+                        K key = eldest.getKey();
+                        keys.remove(key);
+                        if (listener != null) listener.apply(key, eldest.getValue());
+                        return true;
+                    }
+                    return false;
+                }
+            };
+        }
+
+        protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public String name() {
+            return this.name;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            // do-nothing since it is in-memory
+        }
+
+        @Override
+        public boolean persistent() {
+            return false;
+        }
+
+        @Override
+        public V get(K key) {
+            return this.map.get(key);
+        }
+
+        @Override
+        public void put(K key, V value) {
+            this.map.put(key, value);
+            this.keys.add(key);
+        }
+
+        @Override
+        public void putAll(List<Entry<K, V>> entries) {
+            for (Entry<K, V> entry : entries)
+                put(entry.key(), entry.value());
+        }
+
+        @Override
+        public V delete(K key) {
+            V value = this.map.remove(key);
+            this.keys.remove(key);
+            return value;
+        }
+
+        @Override
+        public KeyValueIterator<K, V> range(K from, K to) {
+            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> all() {
+            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
+        }
+
+        @Override
+        public void flush() {
+            // do-nothing since it is in-memory
+        }
+
+        @Override
+        public void close() {
+            // do-nothing
+        }
+
+        private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
+            private final Iterator<K> keys;
+            private final Map<K, V> entries;
+            private K lastKey;
+
+            public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
+                this.keys = keys;
+                this.entries = entries;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return keys.hasNext();
+            }
+
+            @Override
+            public Entry<K, V> next() {
+                lastKey = keys.next();
+                return new Entry<>(lastKey, entries.get(lastKey));
+            }
+
+            @Override
+            public void remove() {
+                keys.remove();
+                entries.remove(lastKey);
+            }
+
+            @Override
+            public void close() {
+                // do nothing
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index a7f4c12..c1ccbd4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -35,33 +36,51 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     protected final KeyValueStore<K, V> inner;
     protected final Serdes<K, V> serialization;
-
-    private final Time time;
-    private final Sensor putTime;
-    private final Sensor getTime;
-    private final Sensor deleteTime;
-    private final Sensor putAllTime;
-    private final Sensor allTime;
-    private final Sensor rangeTime;
-    private final Sensor flushTime;
-    private final Sensor restoreTime;
-    private final StreamingMetrics metrics;
+    protected final String metricGrp;
+    protected final Time time;
 
     private final String topic;
-    private final int partition;
+
+    private Sensor putTime;
+    private Sensor getTime;
+    private Sensor deleteTime;
+    private Sensor putAllTime;
+    private Sensor allTime;
+    private Sensor rangeTime;
+    private Sensor flushTime;
+    private Sensor restoreTime;
+    private StreamingMetrics metrics;
+
     private final Set<K> dirty;
     private final Set<K> removed;
     private final int maxDirty;
     private final int maxRemoved;
-    private final ProcessorContext context;
+
+    private int partition;
+    private ProcessorContext context;
 
     // always wrap the logged store with the metered store
-    public MeteredKeyValueStore(final String name, final KeyValueStore<K, V> inner, ProcessorContext context,
-                                Serdes<K, V> serialization, String metricGrp, Time time) {
+    public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricGrp, Time time) {
         this.inner = inner;
         this.serialization = serialization;
+        this.metricGrp = metricGrp;
+        this.time = time != null ? time : new SystemTime();
+        this.topic = inner.name();
+
+        this.dirty = new HashSet<K>();
+        this.removed = new HashSet<K>();
+        this.maxDirty = 100; // TODO: this needs to be configurable
+        this.maxRemoved = 100; // TODO: this needs to be configurable
+    }
+
+    @Override
+    public String name() {
+        return inner.name();
+    }
 
-        this.time = time;
+    @Override
+    public void init(ProcessorContext context) {
+        String name = name();
         this.metrics = context.metrics();
         this.putTime = this.metrics.addLatencySensor(metricGrp, name, "put", "store-name", name);
         this.getTime = this.metrics.addLatencySensor(metricGrp, name, "get", "store-name", name);
@@ -72,18 +91,12 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         this.flushTime = this.metrics.addLatencySensor(metricGrp, name, "flush", "store-name", name);
         this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
 
-        this.topic = name;
-        this.partition = context.id().partition;
-
         this.context = context;
-
-        this.dirty = new HashSet<K>();
-        this.removed = new HashSet<K>();
-        this.maxDirty = 100; // TODO: this needs to be configurable
-        this.maxRemoved = 100; // TODO: this needs to be configurable
+        this.partition = context.id().partition;
 
         // register and possibly restore the state from the logs
         long startNs = time.nanoseconds();
+        inner.init(context);
         try {
             final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
             final Deserializer<V> valDeserializer = serialization.valueDeserializer();
@@ -92,7 +105,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
                 @Override
                 public void restore(byte[] key, byte[] value) {
                     inner.put(keyDeserializer.deserialize(topic, key),
-                        valDeserializer.deserialize(topic, value));
+                            valDeserializer.deserialize(topic, value));
                 }
             });
         } finally {
@@ -101,11 +114,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public String name() {
-        return inner.name();
-    }
-
-    @Override
     public boolean persistent() {
         return inner.persistent();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
deleted file mode 100644
index 1de345e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionType;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteOptions;
-
-import java.io.File;
-import java.util.Comparator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * A {@link KeyValueStore} that stores all entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see Stores#create(String, ProcessorContext)
- */
-public class RocksDBKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
-
-    protected RocksDBKeyValueStore(String name, ProcessorContext context, Serdes<K, V> serdes, Time time) {
-        super(name, new RocksDBStore<K, V>(name, context, serdes), context, serdes, "rocksdb-state", time != null ? time : new SystemTime());
-    }
-
-    private static class RocksDBStore<K, V> implements KeyValueStore<K, V> {
-
-        private static final int TTL_NOT_USED = -1;
-
-        // TODO: these values should be configurable
-        private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
-        private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
-        private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
-        private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
-        private static final long BLOCK_SIZE = 4096L;
-        private static final int TTL_SECONDS = TTL_NOT_USED;
-        private static final int MAX_WRITE_BUFFERS = 3;
-        private static final String DB_FILE_DIR = "rocksdb";
-
-        private final Serdes<K, V> serdes;
-
-        private final String topic;
-        private final int partition;
-        private final ProcessorContext context;
-
-        private final Options options;
-        private final WriteOptions wOptions;
-        private final FlushOptions fOptions;
-
-        private final String dbName;
-        private final String dirName;
-
-        private RocksDB db;
-
-        public RocksDBStore(String name, ProcessorContext context, Serdes<K, V> serdes) {
-            this.topic = name;
-            this.partition = context.id().partition;
-            this.context = context;
-            this.serdes = serdes;
-
-            // initialize the rocksdb options
-            BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
-            tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
-            tableConfig.setBlockSize(BLOCK_SIZE);
-
-            options = new Options();
-            options.setTableFormatConfig(tableConfig);
-            options.setWriteBufferSize(WRITE_BUFFER_SIZE);
-            options.setCompressionType(COMPRESSION_TYPE);
-            options.setCompactionStyle(COMPACTION_STYLE);
-            options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
-            options.setCreateIfMissing(true);
-            options.setErrorIfExists(false);
-
-            wOptions = new WriteOptions();
-            wOptions.setDisableWAL(true);
-
-            fOptions = new FlushOptions();
-            fOptions.setWaitForFlush(true);
-
-            dbName = this.topic + "." + this.partition;
-            dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
-
-            db = openDB(new File(dirName, dbName), this.options, TTL_SECONDS);
-        }
-
-        private RocksDB openDB(File dir, Options options, int ttl) {
-            try {
-                if (ttl == TTL_NOT_USED) {
-                    dir.getParentFile().mkdirs();
-                    return RocksDB.open(options, dir.toString());
-                } else {
-                    throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
-                    // TODO: support TTL with change log?
-                    // return TtlDB.open(options, dir.toString(), ttl, false);
-                }
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
-            }
-        }
-
-        @Override
-        public String name() {
-            return this.topic;
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-
-        @Override
-        public V get(K key) {
-            try {
-                return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
-            }
-        }
-
-        @Override
-        public void put(K key, V value) {
-            try {
-                if (value == null) {
-                    db.remove(wOptions, serdes.rawKey(key));
-                } else {
-                    db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
-                }
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
-            }
-        }
-
-        @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
-        }
-
-        @Override
-        public V delete(K key) {
-            V value = get(key);
-            put(key, null);
-            return value;
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(K from, K to) {
-            return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            RocksIterator innerIter = db.newIterator();
-            innerIter.seekToFirst();
-            return new RocksDbIterator<K, V>(innerIter, serdes);
-        }
-
-        @Override
-        public void flush() {
-            try {
-                db.flush(fOptions);
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error while executing flush from store " + this.topic, e);
-            }
-        }
-
-        @Override
-        public void close() {
-            flush();
-            db.close();
-        }
-
-        private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
-            private final RocksIterator iter;
-            private final Serdes<K, V> serdes;
-
-            public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
-                this.iter = iter;
-                this.serdes = serdes;
-            }
-
-            protected byte[] peekRawKey() {
-                return iter.key();
-            }
-
-            protected Entry<K, V> getEntry() {
-                return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
-            }
-
-            @Override
-            public boolean hasNext() {
-                return iter.isValid();
-            }
-
-            @Override
-            public Entry<K, V> next() {
-                if (!hasNext())
-                    throw new NoSuchElementException();
-
-                Entry<K, V> entry = this.getEntry();
-                iter.next();
-                return entry;
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException("RocksDB iterator does not support remove");
-            }
-
-            @Override
-            public void close() {
-            }
-
-        }
-
-        private static class LexicographicComparator implements Comparator<byte[]> {
-
-            @Override
-            public int compare(byte[] left, byte[] right) {
-                for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
-                    int leftByte = left[i] & 0xff;
-                    int rightByte = right[j] & 0xff;
-                    if (leftByte != rightByte) {
-                        return leftByte - rightByte;
-                    }
-                }
-                return left.length - right.length;
-            }
-        }
-
-        private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
-            // RocksDB's JNI interface does not expose getters/setters that allow the
-            // comparator to be pluggable, and the default is lexicographic, so it's
-            // safe to just force lexicographic comparator here for now.
-            private final Comparator<byte[]> comparator = new LexicographicComparator();
-            byte[] rawToKey;
-
-            public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
-                    K from, K to) {
-                super(iter, serdes);
-                iter.seek(serdes.rawKey(from));
-                this.rawToKey = serdes.rawKey(to);
-            }
-
-            @Override
-            public boolean hasNext() {
-                return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
-            }
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
new file mode 100644
index 0000000..fe8f00a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * A {@link KeyValueStore} that stores all entries in a local RocksDB database.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ *
+ * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
+ */
+public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
+
+    private final String name;
+    private final Serdes serdes;
+    private final Time time;
+
+    protected RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
+        this.name = name;
+        this.serdes = serdes;
+        this.time = time;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public StateStore get() {
+        return new MeteredKeyValueStore<K, V>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
+    }
+
+    private static class RocksDBStore<K, V> implements KeyValueStore<K, V> {
+
+        private static final int TTL_NOT_USED = -1;
+
+        // TODO: these values should be configurable
+        private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
+        private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
+        private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
+        private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
+        private static final long BLOCK_SIZE = 4096L;
+        private static final int TTL_SECONDS = TTL_NOT_USED;
+        private static final int MAX_WRITE_BUFFERS = 3;
+        private static final String DB_FILE_DIR = "rocksdb";
+
+        private final Serdes<K, V> serdes;
+        private final String topic;
+
+        private final Options options;
+        private final WriteOptions wOptions;
+        private final FlushOptions fOptions;
+
+        private ProcessorContext context;
+        private int partition;
+        private String dbName;
+        private String dirName;
+        private RocksDB db;
+
+        public RocksDBStore(String name, Serdes<K, V> serdes) {
+            this.topic = name;
+            this.serdes = serdes;
+
+            // initialize the rocksdb options
+            BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+            tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
+            tableConfig.setBlockSize(BLOCK_SIZE);
+
+            options = new Options();
+            options.setTableFormatConfig(tableConfig);
+            options.setWriteBufferSize(WRITE_BUFFER_SIZE);
+            options.setCompressionType(COMPRESSION_TYPE);
+            options.setCompactionStyle(COMPACTION_STYLE);
+            options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
+            options.setCreateIfMissing(true);
+            options.setErrorIfExists(false);
+
+            wOptions = new WriteOptions();
+            wOptions.setDisableWAL(true);
+
+            fOptions = new FlushOptions();
+            fOptions.setWaitForFlush(true);
+        }
+
+        public void init(ProcessorContext context) {
+            this.context = context;
+            this.partition = context.id().partition;
+            this.dbName = this.topic + "." + this.partition;
+            this.dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
+            this.db = openDB(new File(dirName, dbName), this.options, TTL_SECONDS);
+        }
+
+        private RocksDB openDB(File dir, Options options, int ttl) {
+            try {
+                if (ttl == TTL_NOT_USED) {
+                    dir.getParentFile().mkdirs();
+                    return RocksDB.open(options, dir.toString());
+                } else {
+                    throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
+                    // TODO: support TTL with change log?
+                    // return TtlDB.open(options, dir.toString(), ttl, false);
+                }
+            } catch (RocksDBException e) {
+                // TODO: this needs to be handled more accurately
+                throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
+            }
+        }
+
+        @Override
+        public String name() {
+            return this.topic;
+        }
+
+        @Override
+        public boolean persistent() {
+            return false;
+        }
+
+        @Override
+        public V get(K key) {
+            try {
+                return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
+            } catch (RocksDBException e) {
+                // TODO: this needs to be handled more accurately
+                throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
+            }
+        }
+
+        @Override
+        public void put(K key, V value) {
+            try {
+                if (value == null) {
+                    db.remove(wOptions, serdes.rawKey(key));
+                } else {
+                    db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
+                }
+            } catch (RocksDBException e) {
+                // TODO: this needs to be handled more accurately
+                throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
+            }
+        }
+
+        @Override
+        public void putAll(List<Entry<K, V>> entries) {
+            for (Entry<K, V> entry : entries)
+                put(entry.key(), entry.value());
+        }
+
+        @Override
+        public V delete(K key) {
+            V value = get(key);
+            put(key, null);
+            return value;
+        }
+
+        @Override
+        public KeyValueIterator<K, V> range(K from, K to) {
+            return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> all() {
+            RocksIterator innerIter = db.newIterator();
+            innerIter.seekToFirst();
+            return new RocksDbIterator<K, V>(innerIter, serdes);
+        }
+
+        @Override
+        public void flush() {
+            try {
+                db.flush(fOptions);
+            } catch (RocksDBException e) {
+                // TODO: this needs to be handled more accurately
+                throw new KafkaException("Error while executing flush from store " + this.topic, e);
+            }
+        }
+
+        @Override
+        public void close() {
+            flush();
+            db.close();
+        }
+
+        private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
+            private final RocksIterator iter;
+            private final Serdes<K, V> serdes;
+
+            public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
+                this.iter = iter;
+                this.serdes = serdes;
+            }
+
+            protected byte[] peekRawKey() {
+                return iter.key();
+            }
+
+            protected Entry<K, V> getEntry() {
+                return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
+            }
+
+            @Override
+            public boolean hasNext() {
+                return iter.isValid();
+            }
+
+            @Override
+            public Entry<K, V> next() {
+                if (!hasNext())
+                    throw new NoSuchElementException();
+
+                Entry<K, V> entry = this.getEntry();
+                iter.next();
+                return entry;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("RocksDB iterator does not support remove");
+            }
+
+            @Override
+            public void close() {
+            }
+
+        }
+
+        private static class LexicographicComparator implements Comparator<byte[]> {
+
+            @Override
+            public int compare(byte[] left, byte[] right) {
+                for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
+                    int leftByte = left[i] & 0xff;
+                    int rightByte = right[j] & 0xff;
+                    if (leftByte != rightByte) {
+                        return leftByte - rightByte;
+                    }
+                }
+                return left.length - right.length;
+            }
+        }
+
+        private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
+            // RocksDB's JNI interface does not expose getters/setters that allow the
+            // comparator to be pluggable, and the default is lexicographic, so it's
+            // safe to just force lexicographic comparator here for now.
+            private final Comparator<byte[]> comparator = new LexicographicComparator();
+            byte[] rawToKey;
+
+            public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
+                    K from, K to) {
+                super(iter, serdes);
+                iter.seek(serdes.rawKey(from));
+                this.rawToKey = serdes.rawKey(to);
+            }
+
+            @Override
+            public boolean hasNext() {
+                return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
index 540d763..31bd439 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.StreamingConfig;
 
 final class Serdes<K, V> {
 
@@ -64,7 +64,7 @@ final class Serdes<K, V> {
 
     /**
      * Create a context for serialization using the specified serializers and deserializers.
-     * 
+     *
      * @param topic the name of the topic
      * @param keySerializer the serializer for keys; may not be null
      * @param keyDeserializer the deserializer for keys; may not be null
@@ -83,47 +83,44 @@ final class Serdes<K, V> {
 
     /**
      * Create a context for serialization using the specified serializers and deserializers, or if any of them are null the
-     * corresponding {@link ProcessorContext}'s default serializer or deserializer, which
+     * corresponding {@link StreamingConfig}'s serializer or deserializer, which
      * <em>must</em> match the key and value types used as parameters for this object.
-     * 
+     *
      * @param topic the name of the topic
-     * @param keySerializer the serializer for keys; may be null if the {@link ProcessorContext#keySerializer() default
+     * @param keySerializer the serializer for keys; may be null if the {@link StreamingConfig#keySerializer() default
      *            key serializer} should be used
-     * @param keyDeserializer the deserializer for keys; may be null if the {@link ProcessorContext#keyDeserializer() default
+     * @param keyDeserializer the deserializer for keys; may be null if the {@link StreamingConfig#keyDeserializer() default
      *            key deserializer} should be used
-     * @param valueSerializer the serializer for values; may be null if the {@link ProcessorContext#valueSerializer() default
+     * @param valueSerializer the serializer for values; may be null if the {@link StreamingConfig#valueSerializer() default
      *            value serializer} should be used
-     * @param valueDeserializer the deserializer for values; may be null if the {@link ProcessorContext#valueDeserializer()
+     * @param valueDeserializer the deserializer for values; may be null if the {@link StreamingConfig#valueDeserializer()
      *            default value deserializer} should be used
-     * @param context the processing context
+     * @param config the streaming config
      */
     @SuppressWarnings("unchecked")
     public Serdes(String topic,
             Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
             Serializer<V> valueSerializer, Deserializer<V> valueDeserializer,
-            ProcessorContext context) {
+            StreamingConfig config) {
         this.topic = topic;
-        this.keySerializer = keySerializer != null ? keySerializer : (Serializer<K>) context.keySerializer();
-        this.keyDeserializer = keyDeserializer != null ? keyDeserializer : (Deserializer<K>) context.keyDeserializer();
-        this.valueSerializer = valueSerializer != null ? valueSerializer : (Serializer<V>) context.valueSerializer();
-        this.valueDeserializer = valueDeserializer != null ? valueDeserializer : (Deserializer<V>) context.valueDeserializer();
+
+        this.keySerializer = keySerializer != null ? keySerializer : config.keySerializer();
+        this.keyDeserializer = keyDeserializer != null ? keyDeserializer : config.keyDeserializer();
+        this.valueSerializer = valueSerializer != null ? valueSerializer : config.valueSerializer();
+        this.valueDeserializer = valueDeserializer != null ? valueDeserializer : config.valueDeserializer();
     }
 
     /**
-     * Create a context for serialization using the {@link ProcessorContext}'s default serializers and deserializers, which
+     * Create a context for serialization using the {@link StreamingConfig}'s serializers and deserializers, which
      * <em>must</em> match the key and value types used as parameters for this object.
-     * 
+     *
      * @param topic the name of the topic
-     * @param context the processing context
+     * @param config the streaming config
      */
     @SuppressWarnings("unchecked")
     public Serdes(String topic,
-            ProcessorContext context) {
-        this.topic = topic;
-        this.keySerializer = (Serializer<K>) context.keySerializer();
-        this.keyDeserializer = (Deserializer<K>) context.keyDeserializer();
-        this.valueSerializer = (Serializer<V>) context.valueSerializer();
-        this.valueDeserializer = (Deserializer<V>) context.valueDeserializer();
+                  StreamingConfig config) {
+        this(topic, null, null, null, null, config);
     }
 
     public Deserializer<K> keyDeserializer() {
@@ -161,4 +158,4 @@ final class Serdes<K, V> {
     public byte[] rawValue(V value) {
         return valueSerializer.serialize(topic, value);
     }
-}
\ No newline at end of file
+}