You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/07 08:39:54 UTC

[1/2] kafka git commit: KAFKA-5650; add StateStoreBuilder interface and implementations

Repository: kafka
Updated Branches:
  refs/heads/trunk 667cd60dc -> 9cbb9f093


http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
new file mode 100644
index 0000000..a0500b6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import static org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS;
+
+public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier {
+    private final String name;
+    private final long retentionPeriod;
+    private final int segments;
+    private final long windowSize;
+    private final boolean retainDuplicates;
+
+    public RocksDbWindowBytesStoreSupplier(final String name,
+                                           final long retentionPeriod,
+                                           final int segments,
+                                           final long windowSize,
+                                           final boolean retainDuplicates) {
+        if (segments < MIN_SEGMENTS) {
+            throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
+        }
+        this.name = name;
+        this.retentionPeriod = retentionPeriod;
+        this.segments = segments;
+        this.windowSize = windowSize;
+        this.retainDuplicates = retainDuplicates;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public WindowStore<Bytes, byte[]> get() {
+        final RocksDBSegmentedBytesStore segmentedBytesStore = new RocksDBSegmentedBytesStore(
+                name,
+                retentionPeriod,
+                segments,
+                new WindowKeySchema()
+        );
+        return RocksDBWindowStore.bytesStore(segmentedBytesStore,
+                                             retainDuplicates,
+                                             windowSize);
+
+    }
+
+    @Override
+    public String metricsScope() {
+        return "rocksdb-window";
+    }
+
+    @Override
+    public int segments() {
+        return segments;
+    }
+
+    @Override
+    public long windowSize() {
+        return windowSize;
+    }
+
+    @Override
+    public boolean retainDuplicates() {
+        return retainDuplicates;
+    }
+
+    @Override
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
new file mode 100644
index 0000000..61919c3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+
+
+public class SessionStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, SessionStore<K, V>> {
+
+    private final SessionBytesStoreSupplier storeSupplier;
+
+    public SessionStoreBuilder(final SessionBytesStoreSupplier storeSupplier,
+                               final Serde<K> keySerde,
+                               final Serde<V> valueSerde,
+                               final Time time) {
+        super(storeSupplier.name(), keySerde, valueSerde, time);
+        this.storeSupplier = storeSupplier;
+    }
+
+    @Override
+    public SessionStore<K, V> build() {
+        return new MeteredSessionStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+                                         storeSupplier.metricsScope(),
+                                         keySerde,
+                                         valueSerde,
+                                         time);
+    }
+
+    private SessionStore<Bytes, byte[]> maybeWrapCaching(final SessionStore<Bytes, byte[]> inner) {
+        if (!enableCaching) {
+            return inner;
+        }
+        return new CachingSessionStore<>(inner,
+                                         keySerde,
+                                         valueSerde,
+                                         storeSupplier.segmentIntervalMs());
+    }
+
+    private SessionStore<Bytes, byte[]> maybeWrapLogging(final SessionStore<Bytes, byte[]> inner) {
+        if (!enableLogging) {
+            return inner;
+        }
+        return new ChangeLoggingSessionBytesStore(inner);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
new file mode 100644
index 0000000..97b4883
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+
+public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {
+
+    private final WindowBytesStoreSupplier storeSupplier;
+
+    public WindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier,
+                              final Serde<K> keySerde,
+                              final Serde<V> valueSerde,
+                              final Time time) {
+        super(storeSupplier.name(), keySerde, valueSerde, time);
+        this.storeSupplier = storeSupplier;
+    }
+
+    @Override
+    public WindowStore<K, V> build() {
+        return new MeteredWindowStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+                                        storeSupplier.metricsScope(),
+                                        time,
+                                        keySerde,
+                                        valueSerde);
+    }
+
+    private WindowStore<Bytes, byte[]> maybeWrapCaching(final WindowStore<Bytes, byte[]> inner) {
+        if (!enableCaching) {
+            return inner;
+        }
+        return new CachingWindowStore<>(inner,
+                                        keySerde,
+                                        valueSerde,
+                                        storeSupplier.windowSize(),
+                                        storeSupplier.segments());
+    }
+
+    private WindowStore<Bytes, byte[]> maybeWrapLogging(final WindowStore<Bytes, byte[]> inner) {
+        if (!enableLogging) {
+            return inner;
+        }
+        return new ChangeLoggingWindowBytesStore(inner, storeSupplier.retainDuplicates());
+    }
+
+    public long retentionPeriod() {
+        return storeSupplier.retentionPeriod();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 12947d8..dbdc854 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -23,10 +23,12 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.ProcessorTopologyTestDriver;
+import org.easymock.EasyMock;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -42,6 +44,8 @@ import static org.junit.Assert.fail;
 
 public class TopologyTest {
 
+    private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
+    private final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
     private final Topology topology = new Topology();
     private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();
 
@@ -203,38 +207,52 @@ public class TopologyTest {
 
     @Test(expected = TopologyException.class)
     public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
-        topology.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor");
+        mockStoreBuilder();
+        EasyMock.replay(storeBuilder);
+        topology.addStateStore(storeBuilder, "no-such-processsor");
     }
 
     @Test
     public void shouldNotAllowToAddStateStoreToSource() {
+        mockStoreBuilder();
+        EasyMock.replay(storeBuilder);
         topology.addSource("source-1", "topic-1");
         try {
-            topology.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
+            topology.addStateStore(storeBuilder, "source-1");
             fail("Should have thrown TopologyException for adding store to source node");
         } catch (final TopologyException expected) { }
     }
 
     @Test
     public void shouldNotAllowToAddStateStoreToSink() {
+        mockStoreBuilder();
+        EasyMock.replay(storeBuilder);
         topology.addSink("sink-1", "topic-1");
         try {
-            topology.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
+            topology.addStateStore(storeBuilder, "sink-1");
             fail("Should have thrown TopologyException for adding store to sink node");
         } catch (final TopologyException expected) { }
     }
 
+    private void mockStoreBuilder() {
+        EasyMock.expect(storeBuilder.name()).andReturn("store").anyTimes();
+        EasyMock.expect(storeBuilder.logConfig()).andReturn(Collections.emptyMap());
+        EasyMock.expect(storeBuilder.loggingEnabled()).andReturn(false);
+    }
+
     @Test
     public void shouldNotAllowToAddStoreWithSameName() {
-        topology.addStateStore(new MockStateStoreSupplier("store", false));
+        mockStoreBuilder();
+        EasyMock.replay(storeBuilder);
+        topology.addStateStore(storeBuilder);
         try {
-            topology.addStateStore(new MockStateStoreSupplier("store", false));
+            topology.addStateStore(storeBuilder);
             fail("Should have thrown TopologyException for duplicate store name");
         } catch (final TopologyException expected) { }
     }
 
     @Test(expected = TopologyBuilderException.class)
-    public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
+    public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
         final String badNodeName = "badGuy";
@@ -243,12 +261,14 @@ public class TopologyTest {
         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         final StreamsConfig streamsConfig = new StreamsConfig(config);
-
+        mockStoreBuilder();
+        EasyMock.expect(storeBuilder.build()).andReturn(new MockStateStoreSupplier.MockStateStore("store", false));
+        EasyMock.replay(storeBuilder);
         topology
             .addSource(sourceNodeName, "topic")
             .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
             .addStateStore(
-                Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
+                storeBuilder,
                 goodNodeName)
             .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
 
@@ -292,8 +312,10 @@ public class TopologyTest {
 
     @Test(expected = TopologyException.class)
     public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
+        EasyMock.expect(globalStoreBuilder.name()).andReturn("anyName").anyTimes();
+        EasyMock.replay(globalStoreBuilder);
         topology.addGlobalStore(
-            new MockStateStoreSupplier("anyName", false, false),
+            globalStoreBuilder,
             "sameName",
             null,
             null,
@@ -611,7 +633,10 @@ public class TopologyTest {
         topology.addProcessor(processorName, new MockProcessorSupplier(), parentNames);
         if (newStores) {
             for (final String store : storeNames) {
-                topology.addStateStore(new MockStateStoreSupplier(store, false), processorName);
+                final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
+                EasyMock.expect(storeBuilder.name()).andReturn(store).anyTimes();
+                EasyMock.replay(storeBuilder);
+                topology.addStateStore(storeBuilder, processorName);
             }
         } else {
             topology.connectProcessorAndStateStores(processorName, storeNames);
@@ -651,8 +676,11 @@ public class TopologyTest {
                                                                 final String sourceName,
                                                                 final String globalTopicName,
                                                                 final String processorName) {
+        final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
+        EasyMock.expect(globalStoreBuilder.name()).andReturn(globalStoreName).anyTimes();
+        EasyMock.replay(globalStoreBuilder);
         topology.addGlobalStore(
-            new MockStateStoreSupplier(globalStoreName, false, false),
+            globalStoreBuilder,
             sourceName,
             null,
             null,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 9bd8756..91edac5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -470,7 +470,7 @@ public class InternalTopologyBuilderTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAddNullStateStoreSupplier() throws Exception {
-        builder.addStateStore(null);
+        builder.addStateStore((StateStoreSupplier) null);
     }
 
     private Set<String> nodeNames(final Collection<ProcessorNode> nodes) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 66adbf5..700b243 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -18,11 +18,20 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
+import org.apache.kafka.streams.state.internals.RocksDBSessionStore;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
 import org.junit.Test;
 
 import java.util.Collections;
 import java.util.Map;
 
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -30,6 +39,7 @@ import static org.junit.Assert.fail;
 
 public class StoresTest {
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldCreateInMemoryStoreSupplierWithLoggedConfig() throws Exception {
         final StateStoreSupplier supplier = Stores.create("store")
@@ -44,6 +54,7 @@ public class StoresTest {
         assertEquals("1000", config.get("retention.ms"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldCreateInMemoryStoreSupplierNotLogged() throws Exception {
         final StateStoreSupplier supplier = Stores.create("store")
@@ -56,6 +67,7 @@ public class StoresTest {
         assertFalse(supplier.loggingEnabled());
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldCreatePersistenStoreSupplierWithLoggedConfig() throws Exception {
         final StateStoreSupplier supplier = Stores.create("store")
@@ -70,6 +82,7 @@ public class StoresTest {
         assertEquals("1000", config.get("retention.ms"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldCreatePersistenStoreSupplierNotLogged() throws Exception {
         final StateStoreSupplier supplier = Stores.create("store")
@@ -95,4 +108,53 @@ public class StoresTest {
          // ok
         }
     }
+
+    @Test
+    public void shouldCreateInMemoryKeyValueStore() {
+        assertThat(Stores.inMemoryKeyValueStore("memory").get(), instanceOf(InMemoryKeyValueStore.class));
+    }
+
+    @Test
+    public void shouldCreateMemoryNavigableCache() {
+        assertThat(Stores.lruMap("map", 10).get(), instanceOf(MemoryNavigableLRUCache.class));
+    }
+
+    @Test
+    public void shouldCreateRocksDbStore() {
+        assertThat(Stores.persistentKeyValueStore("store").get(), instanceOf(RocksDBStore.class));
+    }
+
+    @Test
+    public void shouldCreateRocksDbWindowStore() {
+        assertThat(Stores.persistentWindowStore("store", 1, 3, 1, false).get(), instanceOf(RocksDBWindowStore.class));
+    }
+
+    @Test
+    public void shouldCreateRocksDbSessionStore() {
+        assertThat(Stores.persistentSessionStore("store", 1).get(), instanceOf(RocksDBSessionStore.class));
+    }
+
+    @Test
+    public void shouldBuildWindowStore() {
+        final WindowStore<String, String> store = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3, 2, 3, true),
+                                                                      Serdes.String(),
+                                                                      Serdes.String()).build();
+        assertThat(store, not(nullValue()));
+    }
+
+    @Test
+    public void shouldBuildKeyValueStore() {
+        final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("name"),
+                                                                          Serdes.String(),
+                                                                          Serdes.String()).build();
+        assertThat(store, not(nullValue()));
+    }
+
+    @Test
+    public void shouldBuildSessionStore() {
+        final SessionStore<String, String> store = Stores.sessionStoreBuilder(Stores.persistentSessionStore("name", 10),
+                                                                       Serdes.String(),
+                                                                       Serdes.String()).build();
+        assertThat(store, not(nullValue()));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java
new file mode 100644
index 0000000..2d378d8
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+@RunWith(EasyMockRunner.class)
+public class KeyValueStoreBuilderTest {
+
+    @Mock(type = MockType.NICE)
+    private KeyValueBytesStoreSupplier supplier;
+    @Mock(type = MockType.NICE)
+    private KeyValueStore<Bytes, byte[]> inner;
+    private KeyValueStoreBuilder<String, String> builder;
+
+    @Before
+    public void setUp() throws Exception {
+        EasyMock.expect(supplier.get()).andReturn(inner);
+        EasyMock.expect(supplier.name()).andReturn("name");
+        EasyMock.replay(supplier);
+        builder = new KeyValueStoreBuilder<>(supplier,
+                                             Serdes.String(),
+                                             Serdes.String(),
+                                             new MockTime()
+        );
+
+    }
+
+    @Test
+    public void shouldHaveMeteredStoreAsOuterStore() {
+        final KeyValueStore<String, String> store = builder.build();
+        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreByDefault() {
+        final KeyValueStore<String, String> store = builder.build();
+        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        assertThat(next, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+    }
+
+    @Test
+    public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
+        final KeyValueStore<String, String> store = builder.withLoggingDisabled().build();
+        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test
+    public void shouldHaveCachingStoreWhenEnabled() {
+        final KeyValueStore<String, String> store = builder.withCachingEnabled().build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+        assertThat(wrapped, instanceOf(CachingKeyValueStore.class));
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
+        final KeyValueStore<String, String> store = builder
+                .withLoggingEnabled(Collections.<String, String>emptyMap())
+                .build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+        assertThat(wrapped, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+        assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test
+    public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
+        final KeyValueStore<String, String> store = builder
+                .withLoggingEnabled(Collections.<String, String>emptyMap())
+                .withCachingEnabled()
+                .build();
+        final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
+        final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore();
+        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+        assertThat(caching, instanceOf(CachingKeyValueStore.class));
+        assertThat(changeLogging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+        assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfInnerIsNull() {
+        new KeyValueStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfKeySerdeIsNull() {
+        new KeyValueStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfValueSerdeIsNull() {
+        new KeyValueStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfTimeIsNull() {
+        new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfMetricsScopeIsNull() {
+        new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
new file mode 100644
index 0000000..621a1c2
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+@RunWith(EasyMockRunner.class)
+public class SessionStoreBuilderTest {
+
+    @Mock(type = MockType.NICE)
+    private SessionBytesStoreSupplier supplier;
+    @Mock(type = MockType.NICE)
+    private SessionStore<Bytes, byte[]> inner;
+    private SessionStoreBuilder<String, String> builder;
+
+    @Before
+    public void setUp() throws Exception {
+
+        EasyMock.expect(supplier.get()).andReturn(inner);
+        EasyMock.expect(supplier.name()).andReturn("name");
+        EasyMock.replay(supplier);
+
+        builder = new SessionStoreBuilder<>(supplier,
+                                            Serdes.String(),
+                                            Serdes.String(),
+                                            new MockTime()
+        );
+    }
+
+    @Test
+    public void shouldHaveMeteredStoreAsOuterStore() {
+        final SessionStore<String, String> store = builder.build();
+        assertThat(store, instanceOf(MeteredSessionStore.class));
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreByDefault() {
+        final SessionStore<String, String> store = builder.build();
+        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        assertThat(next, instanceOf(ChangeLoggingSessionBytesStore.class));
+    }
+
+    @Test
+    public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
+        final SessionStore<String, String> store = builder.withLoggingDisabled().build();
+        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test
+    public void shouldHaveCachingStoreWhenEnabled() {
+        final SessionStore<String, String> store = builder.withCachingEnabled().build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        assertThat(store, instanceOf(MeteredSessionStore.class));
+        assertThat(wrapped, instanceOf(CachingSessionStore.class));
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
+        final SessionStore<String, String> store = builder
+                .withLoggingEnabled(Collections.<String, String>emptyMap())
+                .build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        assertThat(store, instanceOf(MeteredSessionStore.class));
+        assertThat(wrapped, instanceOf(ChangeLoggingSessionBytesStore.class));
+        assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test
+    public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
+        final SessionStore<String, String> store = builder
+                .withLoggingEnabled(Collections.<String, String>emptyMap())
+                .withCachingEnabled()
+                .build();
+        final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
+        final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore();
+        assertThat(store, instanceOf(MeteredSessionStore.class));
+        assertThat(caching, instanceOf(CachingSessionStore.class));
+        assertThat(changeLogging, instanceOf(ChangeLoggingSessionBytesStore.class));
+        assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfInnerIsNull() {
+        new SessionStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfKeySerdeIsNull() {
+        new SessionStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfValueSerdeIsNull() {
+        new SessionStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfTimeIsNull() {
+        new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfMetricsScopeIsNull() {
+        new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
new file mode 100644
index 0000000..25b8178
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+@RunWith(EasyMockRunner.class)
+public class WindowStoreBuilderTest {
+
+    @Mock(type = MockType.NICE)
+    private WindowBytesStoreSupplier supplier;
+    @Mock(type = MockType.NICE)
+    private WindowStore<Bytes, byte[]> inner;
+    private WindowStoreBuilder<String, String> builder;
+
+    @Before
+    public void setUp() throws Exception {
+        EasyMock.expect(supplier.get()).andReturn(inner);
+        EasyMock.expect(supplier.name()).andReturn("name");
+        EasyMock.replay(supplier);
+
+        builder = new WindowStoreBuilder<>(supplier,
+                                           Serdes.String(),
+                                           Serdes.String(),
+                                           new MockTime());
+
+    }
+
+    @Test
+    public void shouldHaveMeteredStoreAsOuterStore() {
+        final WindowStore<String, String> store = builder.build();
+        assertThat(store, instanceOf(MeteredWindowStore.class));
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreByDefault() {
+        final WindowStore<String, String> store = builder.build();
+        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        assertThat(next, instanceOf(ChangeLoggingWindowBytesStore.class));
+    }
+
+    @Test
+    public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
+        final WindowStore<String, String> store = builder.withLoggingDisabled().build();
+        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test
+    public void shouldHaveCachingStoreWhenEnabled() {
+        final WindowStore<String, String> store = builder.withCachingEnabled().build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        assertThat(store, instanceOf(MeteredWindowStore.class));
+        assertThat(wrapped, instanceOf(CachingWindowStore.class));
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
+        final WindowStore<String, String> store = builder
+                .withLoggingEnabled(Collections.<String, String>emptyMap())
+                .build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        assertThat(store, instanceOf(MeteredWindowStore.class));
+        assertThat(wrapped, instanceOf(ChangeLoggingWindowBytesStore.class));
+        assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test
+    public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
+        final WindowStore<String, String> store = builder
+                .withLoggingEnabled(Collections.<String, String>emptyMap())
+                .withCachingEnabled()
+                .build();
+        final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
+        final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore();
+        assertThat(store, instanceOf(MeteredWindowStore.class));
+        assertThat(caching, instanceOf(CachingWindowStore.class));
+        assertThat(changeLogging, instanceOf(ChangeLoggingWindowBytesStore.class));
+        assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfInnerIsNull() {
+        new WindowStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfKeySerdeIsNull() {
+        new WindowStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfValueSerdeIsNull() {
+        new WindowStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfTimeIsNull() {
+        new WindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
+    }
+
+}
\ No newline at end of file


[2/2] kafka git commit: KAFKA-5650; add StateStoreBuilder interface and implementations

Posted by da...@apache.org.
KAFKA-5650; add StateStoreBuilder interface and implementations

Part of KIP-182

- Add `StateStoreBuilder` interface and `WindowStateStoreBuilder`, `KeyValueStateStoreBuilder`, and `SessionStateStoreBuilder` implementations
- Add `StoreSupplier`, `WindowBytesStoreSupplier`, `KeyValueBytesStoreSupplier`, `SessionBytesStoreSupplier` interfaces and implementations
- Add new methods to `Stores` to create the newly added `StoreSupplier` and `StateStoreBuilder` implementations
- Update `Topology` and `InternalTopology` to use the interfaces

Author: Damian Guy <da...@gmail.com>

Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #3767 from dguy/kafka-5650


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9cbb9f09
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9cbb9f09
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9cbb9f09

Branch: refs/heads/trunk
Commit: 9cbb9f0939c88868bf33ae340c5fa756ee2965e8
Parents: 667cd60
Author: Damian Guy <da...@gmail.com>
Authored: Thu Sep 7 09:39:46 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Thu Sep 7 09:39:46 2017 +0100

----------------------------------------------------------------------
 docs/streams/developer-guide.html               |  56 ++--
 .../wordcount/WordCountProcessorDemo.java       |   6 +-
 .../java/org/apache/kafka/streams/Topology.java |  24 +-
 .../streams/processor/StateStoreSupplier.java   |   2 +
 .../internals/InternalTopologyBuilder.java      | 295 +++++++++++++++----
 .../state/KeyValueBytesStoreSupplier.java       |  26 ++
 .../state/SessionBytesStoreSupplier.java        |  33 +++
 .../kafka/streams/state/StoreBuilder.java       |  82 ++++++
 .../kafka/streams/state/StoreSupplier.java      |  47 +++
 .../org/apache/kafka/streams/state/Stores.java  | 148 ++++++++++
 .../streams/state/WindowBytesStoreSupplier.java |  56 ++++
 .../state/internals/AbstractStoreBuilder.java   |  84 ++++++
 .../state/internals/KeyValueStoreBuilder.java   |  62 ++++
 .../internals/RocksDBKeyValueStoreSupplier.java |  49 +--
 .../state/internals/RocksDBSessionStore.java    |   2 +-
 .../internals/RocksDBSessionStoreSupplier.java  |  51 +---
 .../state/internals/RocksDBWindowStore.java     |   2 +-
 .../internals/RocksDBWindowStoreSupplier.java   |  62 ++--
 .../RocksDbKeyValueBytesStoreSupplier.java      |  48 +++
 .../RocksDbSessionBytesStoreSupplier.java       |  59 ++++
 .../RocksDbWindowBytesStoreSupplier.java        |  90 ++++++
 .../state/internals/SessionStoreBuilder.java    |  63 ++++
 .../state/internals/WindowStoreBuilder.java     |  68 +++++
 .../org/apache/kafka/streams/TopologyTest.java  |  52 +++-
 .../internals/InternalTopologyBuilderTest.java  |   2 +-
 .../apache/kafka/streams/state/StoresTest.java  |  62 ++++
 .../internals/KeyValueStoreBuilderTest.java     | 141 +++++++++
 .../internals/SessionStoreBuilderTest.java      | 141 +++++++++
 .../state/internals/WindowStoreBuilderTest.java | 135 +++++++++
 29 files changed, 1716 insertions(+), 232 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 10220fb..a140b46 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -320,23 +320,24 @@ You set the <code>org.apache.kafka.streams.processor.StateRestoreListener</code>
 <h4> <a id="disable-changelogs" href="#disable-changelogs">Enable / Disable Fault Tolerance of State Stores (Store Changelogs)</a></h4>
 
 <p>
-  You can enable or disable fault tolerance for a state store by enabling or disabling, respectively, the changelogging of the store through <code>enableLogging()</code> and <code>disableLogging()</code>. 
-  You can also fine-tune the associated topic’s configuration if needed.
+    You can enable or disable fault tolerance for a state store by enabling or disabling, respectively ,the changelogging of the store through <code>StateStoreBuilder#withLoggingEnabled(Map&lt;String, String&gt;)</code>
+    and <code>StateStoreBuilder#withLoggingDisabled()</code>.
+    You can also fine-tune the associated topic’s configuration if needed.
 </p>
 
 <p>Example for disabling fault-tolerance:</p>
 
 <pre class="brush: java;">
 
-  import org.apache.kafka.streams.processor.StateStoreSupplier;
+  import org.apache.kafka.streams.processor.state.KeyValueBytesStoreSupplier;
+  import org.apache.kafka.streams.processor.state.StateStoreBuilder;
   import org.apache.kafka.streams.state.Stores;
 
-  StateStoreSupplier countStoreSupplier = Stores.create("Counts")
-              .withKeys(Serdes.String())
-              .withValues(Serdes.Long())
-              .persistent()
-              .disableLogging() // disable backing up the store to a changelog topic
-              .build();
+  KeyValueBytesStoreSupplier countStoreSupplier = Stores.inMemoryKeyValueStore("Counts");
+  StateStoreBuilder builder = Stores.keyValueStoreBuilder(countStoreSupplier,
+                                                          Serdes.String(),
+                                                          Serdes.Long())
+                                    .withLoggingDisabled(); // disable backing up the store to a changelog topic
 
 </pre>
 
@@ -351,19 +352,20 @@ You set the <code>org.apache.kafka.streams.processor.StateRestoreListener</code>
 
 <pre class="brush: java;">
 
-  import org.apache.kafka.streams.processor.StateStoreSupplier;
+  import org.apache.kafka.streams.processor.state.KeyValueBytesStoreSupplier;
+  import org.apache.kafka.streams.processor.state.StateStoreBuilder;
   import org.apache.kafka.streams.state.Stores;
 
   Map&lt;String, String&gt; changelogConfig = new HashMap();
   // override min.insync.replicas
   changelogConfig.put("min.insyc.replicas", "1")
 
-  StateStoreSupplier countStoreSupplier = Stores.create("Counts")
-              .withKeys(Serdes.String())
-              .withValues(Serdes.Long())
-              .persistent()
-              .enableLogging(changelogConfig) // enable changelogging, with custom changelog settings
-              .build();
+  KeyValueBytesStoreSupplier countStoreSupplier = Stores.inMemoryKeyValueStore("Counts");
+  StateStoreBuilder builder = Stores.keyValueStoreBuilder(countStoreSupplier,
+                                                          Serdes.String(),
+                                                          Serdes.Long())
+                                    .withLoggingEnabled(changelogConfig); // enable changelogging, with custom changelog settings
+
 
 </pre>
 
@@ -376,7 +378,7 @@ You set the <code>org.apache.kafka.streams.processor.StateRestoreListener</code>
 </p>
 
 <p>
-  In addition to the actual store, you also need to provide a &quot;factory&quot; for the store by implementing the <code>org.apache.kafka.streams.processor.StateStoreSupplier</code> interface, which Kafka Streams uses to create instances of your store.
+  In addition to the actual store, you also need to provide a &quot;factory&quot; for the store by implementing the <code>org.apache.kafka.streams.processor.state.StoreSupplier</code> interface, which Kafka Streams uses to create instances of your store.
 </p>
 
 <p>
@@ -2244,7 +2246,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
         <li>Your custom state store must implement <code>StateStore</code>.</li>
         <li>You should have an interface to represent the operations available on the store.</li>
         <li>It is recommended that you also provide an interface that restricts access to read-only operations so users of this API can't mutate the state of your running Kafka Streams application out-of-band.</li>
-        <li>You also need to provide an implementation of <code>StateStoreSupplier</code> for creating instances of your store.</li>
+        <li>You also need to provide an implementation of <code>StoreSupplier</code> for creating instances of your store.</li>
     </ol>
 
     <p>
@@ -2266,7 +2268,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
             V read(K key);
           }
 
-          public class MyCustomStoreSupplier implements StateStoreSupplier {
+          public class MyCustomStoreSupplier implements StoreSupplier {
             // implementation of the supplier for MyCustomStore
           }
         </pre>
@@ -2655,18 +2657,14 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
         might not want to use the unified record cache for both state store and forwarding downstream.
     </p>
     <p>
-        Following from the example first shown in section <a href="#streams_processor_statestore">State Stores</a>, to enable caching, you can
-        add the <code>enableCaching</code> call (note that caches are disabled by default and there is no explicit <code>disableCaching</code>
-        call) :
+        Following from the example first shown in section <a href="#streams_processor_statestore">State Stores</a>, to enable caching,
+        you first create a <code>StateStoreBuilder</code> and then call <code>withCachingEnabled</code> (note that caches
+        are disabled by default and there is no explicit <code>withCachingDisabled</code> call) :
     </p>
     <pre class="brush: java;">
-        StateStoreSupplier countStoreSupplier =
-            Stores.create("Counts")
-                .withKeys(Serdes.String())
-                .withValues(Serdes.Long())
-                .persistent()
-                .enableCaching()
-                .build();
+        KeyValueBytesStoreSupplier countSupplier = Stores.persistentKeyValueStore("Counts");
+        StateStoreBuilder&lt;KeyValueStore&lt;String, Long&gt;&gt; builder = Stores.keyValueStoreBuilder(countSupplier, Serdes.String(), Serdes.Long());
+        builder.withCachingEnabled()
     </pre>
 
     <h4><a id="streams_developer-guide_memory-management_other_memory_usage" href="#streams_developer-guide_memory-management_other_memory_usage">Other memory usage</a></h4>

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index b0b8be5..cfa2137 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -123,7 +123,11 @@ public class WordCountProcessorDemo {
         builder.addSource("Source", "streams-plaintext-input");
 
         builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
-        builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
+        builder.addStateStore(Stores.keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("Counts"),
+                Serdes.String(),
+                Serdes.Integer()),
+                              "Process");
 
         builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/Topology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index e8f7d23..386aacf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
@@ -32,7 +31,8 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 
 import java.util.regex.Pattern;
 
@@ -431,7 +431,7 @@ public class Topology {
      * <p>
      * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among
      * the named Kafka topic's partitions.
-     * Such control is often useful with topologies that use {@link #addStateStore(StateStoreSupplier, String...) state
+     * Such control is often useful with topologies that use {@link #addStateStore(StoreBuilder, String...) state
      * stores} in its processors.
      * In most other cases, however, a partitioner needs not be specified and Kafka will automatically distribute
      * records among partitions using Kafka's default partitioning logic.
@@ -537,14 +537,14 @@ public class Topology {
     /**
      * Adds a state store.
      *
-     * @param supplier the supplier used to obtain this state store {@link StateStore} instance
+     * @param storeBuilder the storeBuilder used to obtain this state store {@link StateStore} instance
      * @param processorNames the names of the processors that should be able to access the provided store
      * @return itself
      * @throws TopologyException if state store supplier is already added
      */
-    public synchronized Topology addStateStore(final StateStoreSupplier supplier,
+    public synchronized Topology addStateStore(final StoreBuilder storeBuilder,
                                                final String... processorNames) {
-        internalTopologyBuilder.addStateStore(supplier, processorNames);
+        internalTopologyBuilder.addStateStore(storeBuilder, processorNames);
         return this;
     }
 
@@ -561,7 +561,7 @@ public class Topology {
      * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
      * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      *
-     * @param storeSupplier         user defined state store supplier
+     * @param storeBuilder          user defined state store builder
      * @param sourceName            name of the {@link SourceNode} that will be automatically added
      * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
      * @param valueDeserializer     the {@link Deserializer} to deserialize values with
@@ -571,14 +571,14 @@ public class Topology {
      * @return itself
      * @throws TopologyException if the processor of state is already registered
      */
-    public synchronized Topology addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+    public synchronized Topology addGlobalStore(final KeyValueStoreBuilder storeBuilder,
                                                 final String sourceName,
                                                 final Deserializer keyDeserializer,
                                                 final Deserializer valueDeserializer,
                                                 final String topic,
                                                 final String processorName,
                                                 final ProcessorSupplier stateUpdateSupplier) {
-        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer,
+        internalTopologyBuilder.addGlobalStore(storeBuilder, sourceName, null, keyDeserializer,
             valueDeserializer, topic, processorName, stateUpdateSupplier);
         return this;
     }
@@ -595,7 +595,7 @@ public class Topology {
      * records forwarded from the {@link SourceNode}.
      * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
      *
-     * @param storeSupplier         user defined state store supplier
+     * @param storeBuilder          user defined key value store builder
      * @param sourceName            name of the {@link SourceNode} that will be automatically added
      * @param timestampExtractor    the stateless timestamp extractor used for this source,
      *                              if not specified the default extractor defined in the configs will be used
@@ -607,7 +607,7 @@ public class Topology {
      * @return itself
      * @throws TopologyException if the processor of state is already registered
      */
-    public synchronized Topology addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+    public synchronized Topology addGlobalStore(final KeyValueStoreBuilder storeBuilder,
                                                 final String sourceName,
                                                 final TimestampExtractor timestampExtractor,
                                                 final Deserializer keyDeserializer,
@@ -615,7 +615,7 @@ public class Topology {
                                                 final String topic,
                                                 final String processorName,
                                                 final ProcessorSupplier stateUpdateSupplier) {
-        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
+        internalTopologyBuilder.addGlobalStore(storeBuilder, sourceName, timestampExtractor, keyDeserializer,
             valueDeserializer, topic, processorName, stateUpdateSupplier);
         return this;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
index 173dab9..536b194 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
@@ -22,7 +22,9 @@ import java.util.Map;
  * A state store supplier which can create one or more {@link StateStore} instances.
  *
  * @param <T> State store type
+ * @deprecated use {@link org.apache.kafka.streams.state.StoreSupplier}
  */
+@Deprecated
 public interface StateStoreSupplier<T extends StateStore> {
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index c7f70a0..da5fe38 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -28,6 +28,9 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -119,14 +122,107 @@ public class InternalTopologyBuilder {
 
     private Map<Integer, Set<String>> nodeGroups = null;
 
-    private static class StateStoreFactory {
-        public final Set<String> users;
+    interface StateStoreFactory {
+        Set<String> users();
+        boolean loggingEnabled();
+        StateStore build();
+        String name();
+        boolean isWindowStore();
+        Map<String, String> logConfig();
+        long retentionPeriod();
+    }
+
+    private static abstract class AbstractStateStoreFactory implements StateStoreFactory {
+        private final Set<String> users = new HashSet<>();
+        private final String name;
+        private final boolean loggingEnabled;
+        private final boolean windowStore;
+        private final Map<String, String> logConfig;
+
+        AbstractStateStoreFactory(final String name,
+                                  final boolean loggingEnabled,
+                                  final boolean windowStore,
+                                  final Map<String, String> logConfig) {
+            this.name = name;
+            this.loggingEnabled = loggingEnabled;
+            this.windowStore = windowStore;
+            this.logConfig = logConfig;
+        }
+
+        @Override
+        public Set<String> users() {
+            return users;
+        }
+
+        @Override
+        public boolean loggingEnabled() {
+            return loggingEnabled;
+        }
+
+        @Override
+        public String name() {
+            return name;
+        }
 
-        public final StateStoreSupplier supplier;
+        @Override
+        public boolean isWindowStore() {
+            return windowStore;
+        }
+
+        @Override
+        public Map<String, String> logConfig() {
+            return logConfig;
+        }
+    }
 
-        StateStoreFactory(final StateStoreSupplier supplier) {
+    private static class StateStoreSupplierFactory extends AbstractStateStoreFactory {
+        private final StateStoreSupplier supplier;
+
+        StateStoreSupplierFactory(final StateStoreSupplier<?> supplier) {
+            super(supplier.name(),
+                  supplier.loggingEnabled(),
+                  supplier instanceof WindowStoreSupplier,
+                  supplier.logConfig());
             this.supplier = supplier;
-            users = new HashSet<>();
+
+        }
+
+        @Override
+        public StateStore build() {
+            return supplier.get();
+        }
+
+        @Override
+        public long retentionPeriod() {
+            if (!isWindowStore()) {
+                throw new IllegalStateException("retentionPeriod is not supported when not a window store");
+            }
+            return ((WindowStoreSupplier) supplier).retentionPeriod();
+        }
+    }
+
+    private static class StoreBuilderFactory extends AbstractStateStoreFactory {
+        private final StoreBuilder builder;
+
+        StoreBuilderFactory(final StoreBuilder<?> builder) {
+            super(builder.name(),
+                  builder.loggingEnabled(),
+                  builder instanceof WindowStoreBuilder,
+                  builder.logConfig());
+            this.builder = builder;
+        }
+
+        @Override
+        public StateStore build() {
+            return builder.build();
+        }
+
+        @Override
+        public long retentionPeriod() {
+            if (!isWindowStore()) {
+                throw new IllegalStateException("retentionPeriod is not supported when not a window store");
+            }
+            return ((WindowStoreBuilder) builder).retentionPeriod();
         }
     }
 
@@ -405,7 +501,7 @@ public class InternalTopologyBuilder {
             throw new TopologyException("StateStore " + supplier.name() + " is already added.");
         }
 
-        stateFactories.put(supplier.name(), new StateStoreFactory(supplier));
+        stateFactories.put(supplier.name(), new StateStoreSupplierFactory(supplier));
 
         if (processorNames != null) {
             for (final String processorName : processorNames) {
@@ -414,6 +510,22 @@ public class InternalTopologyBuilder {
         }
     }
 
+    public final void addStateStore(final StoreBuilder storeBuilder,
+                                    final String... processorNames) {
+        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+        if (stateFactories.containsKey(storeBuilder.name())) {
+            throw new TopologyException("StateStore " + storeBuilder.name() + " is already added.");
+        }
+
+        stateFactories.put(storeBuilder.name(), new StoreBuilderFactory(storeBuilder));
+
+        if (processorNames != null) {
+            for (final String processorName : processorNames) {
+                connectProcessorAndStateStore(processorName, storeBuilder.name());
+            }
+        }
+    }
+
     public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
                                      final String sourceName,
                                      final TimestampExtractor timestampExtractor,
@@ -423,43 +535,52 @@ public class InternalTopologyBuilder {
                                      final String processorName,
                                      final ProcessorSupplier stateUpdateSupplier) {
         Objects.requireNonNull(storeSupplier, "store supplier must not be null");
-        Objects.requireNonNull(sourceName, "sourceName must not be null");
-        Objects.requireNonNull(topic, "topic must not be null");
-        Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
-        Objects.requireNonNull(processorName, "processorName must not be null");
-        if (nodeFactories.containsKey(sourceName)) {
-            throw new TopologyException("Processor " + sourceName + " is already added.");
-        }
-        if (nodeFactories.containsKey(processorName)) {
-            throw new TopologyException("Processor " + processorName + " is already added.");
-        }
-        if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) {
-            throw new TopologyException("StateStore " + storeSupplier.name() + " is already added.");
-        }
-        if (storeSupplier.loggingEnabled()) {
-            throw new TopologyException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
-        }
-        if (sourceName.equals(processorName)) {
-            throw new TopologyException("sourceName and processorName must be different.");
-        }
-
+        final String name = storeSupplier.name();
+        validateGlobalStoreArguments(sourceName,
+                                     topic,
+                                     processorName,
+                                     stateUpdateSupplier,
+                                     name,
+                                     storeSupplier.loggingEnabled());
         validateTopicNotAlreadyRegistered(topic);
+        addGlobalStore(sourceName,
+                       timestampExtractor,
+                       keyDeserializer,
+                       valueDeserializer,
+                       topic,
+                       processorName,
+                       stateUpdateSupplier,
+                       name,
+                       storeSupplier.get());
+    }
 
-        globalTopics.add(topic);
-        final String[] topics = {topic};
-        nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
-        nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
-        nodeGrouper.add(sourceName);
 
-        final String[] predecessors = {sourceName};
-        final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, predecessors, stateUpdateSupplier);
-        nodeFactory.addStateStore(storeSupplier.name());
-        nodeFactories.put(processorName, nodeFactory);
-        nodeGrouper.add(processorName);
-        nodeGrouper.unite(processorName, predecessors);
+    public final void addGlobalStore(final KeyValueStoreBuilder storeBuilder,
+                                     final String sourceName,
+                                     final TimestampExtractor timestampExtractor,
+                                     final Deserializer keyDeserializer,
+                                     final Deserializer valueDeserializer,
+                                     final String topic,
+                                     final String processorName,
+                                     final ProcessorSupplier stateUpdateSupplier) {
+        Objects.requireNonNull(storeBuilder, "store builder must not be null");
+        validateGlobalStoreArguments(sourceName,
+                                     topic,
+                                     processorName,
+                                     stateUpdateSupplier,
+                                     storeBuilder.name(),
+                                     storeBuilder.loggingEnabled());
+        validateTopicNotAlreadyRegistered(topic);
 
-        globalStateStores.put(storeSupplier.name(), storeSupplier.get());
-        connectSourceStoreAndTopic(storeSupplier.name(), topic);
+        addGlobalStore(sourceName,
+                       timestampExtractor,
+                       keyDeserializer,
+                       valueDeserializer,
+                       topic,
+                       processorName,
+                       stateUpdateSupplier,
+                       storeBuilder.name(),
+                       storeBuilder.build());
     }
 
     private void validateTopicNotAlreadyRegistered(final String topic) {
@@ -521,6 +642,64 @@ public class InternalTopologyBuilder {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    private void validateGlobalStoreArguments(final String sourceName,
+                                              final String topic,
+                                              final String processorName,
+                                              final ProcessorSupplier stateUpdateSupplier,
+                                              final String storeName,
+                                              final boolean loggingEnabled) {
+        Objects.requireNonNull(sourceName, "sourceName must not be null");
+        Objects.requireNonNull(topic, "topic must not be null");
+        Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
+        Objects.requireNonNull(processorName, "processorName must not be null");
+        if (nodeFactories.containsKey(sourceName)) {
+            throw new TopologyException("Processor " + sourceName + " is already added.");
+        }
+        if (nodeFactories.containsKey(processorName)) {
+            throw new TopologyException("Processor " + processorName + " is already added.");
+        }
+        if (stateFactories.containsKey(storeName) || globalStateStores.containsKey(storeName)) {
+            throw new TopologyException("StateStore " + storeName + " is already added.");
+        }
+        if (loggingEnabled) {
+            throw new TopologyException("StateStore " + storeName + " for global table must not have logging enabled.");
+        }
+        if (sourceName.equals(processorName)) {
+            throw new TopologyException("sourceName and processorName must be different.");
+        }
+    }
+
+    private void addGlobalStore(final String sourceName,
+                                final TimestampExtractor timestampExtractor,
+                                final Deserializer keyDeserializer,
+                                final Deserializer valueDeserializer,
+                                final String topic,
+                                final String processorName,
+                                final ProcessorSupplier stateUpdateSupplier,
+                                final String name,
+                                final KeyValueStore store) {
+        final String[] topics = {topic};
+        final String[] predecessors = {sourceName};
+        final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName,
+                                                                          predecessors,
+                                                                          stateUpdateSupplier);
+        globalTopics.add(topic);
+        nodeFactories.put(sourceName, new SourceNodeFactory(sourceName,
+                                                            topics,
+                                                            null,
+                                                            timestampExtractor,
+                                                            keyDeserializer,
+                                                            valueDeserializer));
+        nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
+        nodeGrouper.add(sourceName);
+        nodeFactory.addStateStore(name);
+        nodeFactories.put(processorName, nodeFactory);
+        nodeGrouper.add(processorName);
+        nodeGrouper.unite(processorName, predecessors);
+        globalStateStores.put(name, store);
+        connectSourceStoreAndTopic(name, topic);
+    }
+
     private void connectProcessorAndStateStore(final String processorName,
                                                final String stateStoreName) {
         if (!stateFactories.containsKey(stateStoreName)) {
@@ -531,12 +710,12 @@ public class InternalTopologyBuilder {
         }
 
         final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
-        final Iterator<String> iter = stateStoreFactory.users.iterator();
+        final Iterator<String> iter = stateStoreFactory.users().iterator();
         if (iter.hasNext()) {
             final String user = iter.next();
             nodeGrouper.unite(user, processorName);
         }
-        stateStoreFactory.users.add(processorName);
+        stateStoreFactory.users().add(processorName);
 
         final NodeFactory nodeFactory = nodeFactories.get(processorName);
         if (nodeFactory instanceof ProcessorNodeFactory) {
@@ -723,22 +902,20 @@ public class InternalTopologyBuilder {
                     }
                     for (final String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
                         if (!stateStoreMap.containsKey(stateStoreName)) {
-                            final StateStore stateStore;
-
                             if (stateFactories.containsKey(stateStoreName)) {
-                                final StateStoreSupplier supplier = stateFactories.get(stateStoreName).supplier;
-                                stateStore = supplier.get();
+                                final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
 
                                 // remember the changelog topic if this state store is change-logging enabled
-                                if (supplier.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
+                                if (stateStoreFactory.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
                                     final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, stateStoreName);
                                     storeToChangelogTopic.put(stateStoreName, changelogTopic);
                                 }
+                                stateStoreMap.put(stateStoreName, stateStoreFactory.build());
                             } else {
-                                stateStore = globalStateStores.get(stateStoreName);
+                                stateStoreMap.put(stateStoreName, globalStateStores.get(stateStoreName));
                             }
 
-                            stateStoreMap.put(stateStoreName, stateStore);
+
                         }
                     }
                 } else if (factory instanceof SourceNodeFactory) {
@@ -839,10 +1016,9 @@ public class InternalTopologyBuilder {
 
                 // if the node is connected to a state, add to the state topics
                 for (final StateStoreFactory stateFactory : stateFactories.values()) {
-                    final StateStoreSupplier supplier = stateFactory.supplier;
-                    if (supplier.loggingEnabled() && stateFactory.users.contains(node)) {
-                        final String name = ProcessorStateManager.storeChangelogTopic(applicationId, supplier.name());
-                        final InternalTopicConfig internalTopicConfig = createInternalTopicConfig(supplier, name);
+                    if (stateFactory.loggingEnabled() && stateFactory.users().contains(node)) {
+                        final String name = ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.name());
+                        final InternalTopicConfig internalTopicConfig = createInternalTopicConfig(stateFactory, name);
                         stateChangelogTopics.put(name, internalTopicConfig);
                     }
                 }
@@ -891,19 +1067,20 @@ public class InternalTopologyBuilder {
             }
         }
     }
-    
-    private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier<?> supplier,
+
+    private InternalTopicConfig createInternalTopicConfig(final StateStoreFactory factory,
                                                           final String name) {
-        if (!(supplier instanceof WindowStoreSupplier)) {
-            return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig());
+        if (!factory.isWindowStore()) {
+            return new InternalTopicConfig(name,
+                                           Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                                           factory.logConfig());
         }
 
-        final WindowStoreSupplier windowStoreSupplier = (WindowStoreSupplier) supplier;
         final InternalTopicConfig config = new InternalTopicConfig(name,
                                                                    Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
                                                                            InternalTopicConfig.CleanupPolicy.delete),
-                                                                   supplier.logConfig());
-        config.setRetentionMs(windowStoreSupplier.retentionPeriod());
+                                                                   factory.logConfig());
+        config.setRetentionMs(factory.retentionPeriod());
         return config;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
new file mode 100644
index 0000000..73e6732
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Bytes;
+
+/**
+ * A store supplier that can be used to create one or more {@link KeyValueStore KeyValueStore<Bytes, byte[]>} instances of type &lt;Byte, byte[]&gt;.
+ */
+public interface KeyValueBytesStoreSupplier extends StoreSupplier<KeyValueStore<Bytes, byte[]>> {
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
new file mode 100644
index 0000000..e5523da
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Bytes;
+
+/**
+ * A store supplier that can be used to create one or more {@link SessionStore SessionStore<Bytes, byte[]>>} instances of type &lt;Byte, byte[]&gt;.
+ */
+public interface SessionBytesStoreSupplier extends StoreSupplier<SessionStore<Bytes, byte[]>> {
+
+    /**
+     * The size of a segment, in milliseconds. Used when caching is enabled to segment the cache
+     * and reduce the amount of data that needs to be scanned when performing range queries.
+     *
+     * @return segmentInterval in milliseconds
+     */
+    long segmentIntervalMs();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java
new file mode 100644
index 0000000..2d1b241
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.Map;
+
+/**
+ * Build a {@link StateStore} wrapped with optional caching and logging.
+ * @param <T>  the type of store to build
+ */
+public interface StoreBuilder<T extends StateStore> {
+
+    /**
+     * Enable caching on the store.
+     * @return  this
+     */
+    StoreBuilder<T> withCachingEnabled();
+
+    /**
+     * Maintain a changelog for any changes made to the store.
+     * Use the provided config to set the config of the changelog topic.
+     * @param config  config applied to the changelog topic
+     * @return this
+     */
+    StoreBuilder<T> withLoggingEnabled(final Map<String, String> config);
+
+    /**
+     * Disable the changelog for store built by this {@link StoreBuilder}.
+     * This will turn off fault-tolerance for your store.
+     * By default the changelog is enabled.
+     * @return this
+     */
+    StoreBuilder<T> withLoggingDisabled();
+
+    /**
+     * Build the store as defined by the builder.
+     *
+     * @return the built {@link StateStore}
+     */
+    T build();
+
+
+    /**
+     * Returns a Map containing any log configs that will be used when creating the changelog for the {@link StateStore}.
+     * <p>
+     * Note: any unrecognized configs will be ignored by the Kafka brokers.
+     *
+     * @return Map containing any log configs to be used when creating the changelog for the {@link StateStore}
+     * If {@code loggingEnabled} returns false, this function will always return an empty map
+     */
+    Map<String, String> logConfig();
+
+    /**
+     * @return {@code true} if the {@link StateStore} should have logging enabled
+     */
+    boolean loggingEnabled();
+
+    /**
+     * Return the name of this state store builder.
+     * This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-'.
+     *
+     * @return the name of this state store builder
+     */
+    String name();
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/StoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/StoreSupplier.java
new file mode 100644
index 0000000..10e6f2d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StoreSupplier.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.processor.StateStore;
+
+/**
+ * A state store supplier which can create one or more {@link StateStore} instances.
+ *
+ * @param <T> State store type
+ */
+public interface StoreSupplier<T extends StateStore> {
+    /**
+     * Return the name of this state store supplier.
+     * This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-'.
+     *
+     * @return the name of this state store supplier
+     */
+    String name();
+
+    /**
+     * Return a new {@link StateStore} instance.
+     *
+     * @return a new {@link StateStore} instance of type T
+     */
+    T get();
+
+    /**
+     * Return a String that is used as the scope for metrics recorded by Metered stores.
+     * @return metricsScope
+     */
+    String metricsScope();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 3cf22c1..c0beb9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -19,12 +19,22 @@ package org.apache.kafka.streams.state;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
 import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
+import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,11 +51,149 @@ public class Stores {
     private static final Logger log = LoggerFactory.getLogger(Stores.class);
 
     /**
+     * Create a persistent {@link KeyValueBytesStoreSupplier}.
+     * @param name  name of the store
+     * @return  an instance of a {@link KeyValueBytesStoreSupplier} that can be used
+     * to build a persistent store
+     */
+    public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {
+        return new RocksDbKeyValueBytesStoreSupplier(name);
+    }
+
+    /**
+     * Create an in-memory {@link KeyValueBytesStoreSupplier}.
+     * @param name  name of the store
+     * @return  an instance of a {@link KeyValueBytesStoreSupplier} than can be used to
+     * build an in-memory store
+     */
+    public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) {
+        return new KeyValueBytesStoreSupplier() {
+            @Override
+            public String name() {
+                return name;
+            }
+
+            @Override
+            public KeyValueStore<Bytes, byte[]> get() {
+                return new InMemoryKeyValueStore<>(name, Serdes.Bytes(), Serdes.ByteArray());
+            }
+
+            @Override
+            public String metricsScope() {
+                return "in-memory-state";
+            }
+        };
+    }
+
+    /**
+     * Create a LRU Map {@link KeyValueBytesStoreSupplier}.
+     * @param name          name of the store
+     * @param maxCacheSize  maximum number of items in the LRU
+     * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used to build
+     * an LRU Map based store
+     */
+    public static KeyValueBytesStoreSupplier lruMap(final String name, final int maxCacheSize) {
+        return new KeyValueBytesStoreSupplier() {
+            @Override
+            public String name() {
+                return name;
+            }
+
+            @Override
+            public KeyValueStore<Bytes, byte[]> get() {
+                return new MemoryNavigableLRUCache<>(name, maxCacheSize, Serdes.Bytes(), Serdes.ByteArray());
+            }
+
+            @Override
+            public String metricsScope() {
+                return "in-memory-lru-state";
+            }
+        };
+    }
+
+    /**
+     * Create a persistent {@link WindowBytesStoreSupplier}.
+     * @param name                  name of the store
+     * @param retentionPeriod       length of time to retain data in the store
+     * @param numSegments           number of db segments
+     * @param windowSize            size of the windows
+     * @param retainDuplicates      whether or not to retain duplicates.
+     * @return an instance of {@link WindowBytesStoreSupplier}
+     */
+    public static WindowBytesStoreSupplier persistentWindowStore(final String name,
+                                                                 final long retentionPeriod,
+                                                                 final int numSegments,
+                                                                 final long windowSize,
+                                                                 final boolean retainDuplicates) {
+        return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, numSegments, windowSize, retainDuplicates);
+    }
+
+    /**
+     * Create a persistent {@link SessionBytesStoreSupplier}.
+     * @param name              name of the store
+     * @param retentionPeriod   length ot time to retain data in the store
+     * @return an instance of a {@link  SessionBytesStoreSupplier}
+     */
+    public static SessionBytesStoreSupplier persistentSessionStore(final String name,
+                                                                   final long retentionPeriod) {
+        return new RocksDbSessionBytesStoreSupplier(name, retentionPeriod);
+    }
+
+
+    /**
+     * Creates a {@link StoreBuilder} that can be used to build a {@link WindowStore}.
+     * @param supplier      a {@link WindowBytesStoreSupplier}
+     * @param keySerde      the key serde to use
+     * @param valueSerde    the value serde to use
+     * @param <K>           key type
+     * @param <V>           value type
+     * @return an instance of {@link StoreBuilder} than can build a {@link WindowStore}
+     */
+    public static <K, V> StoreBuilder<WindowStore<K, V>> windowStoreBuilder(final WindowBytesStoreSupplier supplier,
+                                                                            final Serde<K> keySerde,
+                                                                            final Serde<V> valueSerde) {
+        return new WindowStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
+    }
+
+    /**
+     * Creates a {@link StoreBuilder} than can be used to build a {@link KeyValueStore}.
+     * @param supplier      a {@link KeyValueBytesStoreSupplier}
+     * @param keySerde      the key serde to use
+     * @param valueSerde    the value serde to use
+     * @param <K>           key type
+     * @param <V>           value type
+     * @return an instance of a {@link StoreBuilder} that can build a {@link KeyValueStore}
+     */
+    public static <K, V> StoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
+                                                                                final Serde<K> keySerde,
+                                                                                final Serde<V> valueSerde) {
+        return new KeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
+    }
+
+    /**
+     * Creates a {@link StoreBuilder} that can be used to build a {@link SessionStore}.
+     * @param supplier      a {@link SessionBytesStoreSupplier}
+     * @param keySerde      the key serde to use
+     * @param valueSerde    the value serde to use
+     * @param <K>           key type
+     * @param <V>           value type
+     * @return an instance of {@link StoreBuilder} than can build a {@link SessionStore}
+     * */
+    public static <K, V> StoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final SessionBytesStoreSupplier supplier,
+                                                                              final Serde<K> keySerde,
+                                                                              final Serde<V> valueSerde) {
+        return new SessionStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
+    }
+
+    /**
      * Begin to create a new {@link org.apache.kafka.streams.processor.StateStoreSupplier} instance.
      *
      * @param name the name of the store
      * @return the factory that can be used to specify other options or configurations for the store; never null
+     * @deprected use {@link #persistentKeyValueStore(String)}, {@link #persistentWindowStore(String, long, int, long, boolean)}
+     * {@link #persistentSessionStore(String, long)}, {@link #lruMap(String, int)}, or {@link #inMemoryKeyValueStore(String)}
      */
+    @Deprecated
     public static StoreFactory create(final String name) {
         return new StoreFactory() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
new file mode 100644
index 0000000..5fbe6b0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Bytes;
+
+/**
+ * A store supplier that can be used to create one or more {@link WindowStore WindowStore<Bytes, byte[]>>} instances of type &lt;Byte, byte[]&gt;.
+ */
+public interface WindowBytesStoreSupplier extends StoreSupplier<WindowStore<Bytes, byte[]>> {
+    /**
+     * The number of segments the store has. If your store is segmented then this should be the number of segments
+     * in the underlying store.
+     * It is also used to reduce the amount of data that is scanned when caching is enabled.
+     *
+     * @return number of segments
+     */
+    int segments();
+
+    /**
+     * The size of the windows any store created from this supplier is creating.
+     *
+     * @return window size
+     */
+    long windowSize();
+
+    /**
+     * Whether or not this store is retaining duplicate keys.
+     * Usually only true if the store is being used for joins.
+     * Note this should return false if caching is enabled.
+     *
+     * @return true if duplicates should be retained
+     */
+    boolean retainDuplicates();
+
+    /**
+     * The time period for which the {@link WindowStore} will retain historic data.
+     *
+     * @return retentionPeriod
+     */
+    long retentionPeriod();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
new file mode 100644
index 0000000..39b9d03
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+abstract class AbstractStoreBuilder<K, V, T extends StateStore> implements StoreBuilder<T> {
+    private final String name;
+    private Map<String, String> logConfig = new HashMap<>();
+    final Serde<K> keySerde;
+    final Serde<V> valueSerde;
+    final Time time;
+    boolean enableCaching;
+    boolean enableLogging = true;
+
+    AbstractStoreBuilder(final String name,
+                         final Serde<K> keySerde,
+                         final Serde<V> valueSerde,
+                         final Time time) {
+        Objects.requireNonNull(name, "name can't be null");
+        Objects.requireNonNull(time, "time can't be null");
+        this.name = name;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+        this.time = time;
+    }
+
+    @Override
+    public StoreBuilder<T> withCachingEnabled() {
+        enableCaching = true;
+        return this;
+    }
+
+    @Override
+    public StoreBuilder<T> withLoggingEnabled(final Map<String, String> config) {
+        Objects.requireNonNull(config, "config can't be null");
+        enableLogging = true;
+        logConfig = config;
+        return this;
+    }
+
+    @Override
+    public StoreBuilder<T> withLoggingDisabled() {
+        enableLogging = false;
+        logConfig.clear();
+        return this;
+    }
+
+    @Override
+    public Map<String, String> logConfig() {
+        return logConfig;
+    }
+
+    @Override
+    public boolean loggingEnabled() {
+        return enableLogging;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
new file mode 100644
index 0000000..20230f9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import java.util.Objects;
+
+public class KeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, KeyValueStore<K, V>> {
+
+    private final KeyValueBytesStoreSupplier storeSupplier;
+
+
+    public KeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSupplier,
+                                final Serde<K> keySerde,
+                                final Serde<V> valueSerde,
+                                final Time time) {
+        super(storeSupplier.name(), keySerde, valueSerde, time);
+        Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null");
+        this.storeSupplier = storeSupplier;
+    }
+
+    @Override
+    public KeyValueStore<K, V> build() {
+        return new MeteredKeyValueBytesStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+                                               storeSupplier.metricsScope(),
+                                               time,
+                                               keySerde,
+                                               valueSerde);
+    }
+
+    private KeyValueStore<Bytes, byte[]> maybeWrapCaching(final KeyValueStore<Bytes, byte[]> inner) {
+        if (!enableCaching) {
+            return inner;
+        }
+        return new CachingKeyValueStore<>(inner, keySerde, valueSerde);
+    }
+
+    private KeyValueStore<Bytes, byte[]> maybeWrapLogging(final KeyValueStore<Bytes, byte[]> inner) {
+        if (!enableLogging) {
+            return inner;
+        }
+        return new ChangeLoggingKeyValueBytesStore(inner);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index 60a506b..d629c1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -17,8 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -34,48 +32,29 @@ import java.util.Map;
 
 public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
 
-    private static final String METRICS_SCOPE = "rocksdb-state";
-    private final boolean cached;
+    private final KeyValueStoreBuilder<K, V> builder;
 
     public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean cached) {
-        this(name, keySerde, valueSerde, null, logged, logConfig, cached);
+        this(name, keySerde, valueSerde, Time.SYSTEM, logged, logConfig, cached);
     }
 
     public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig, boolean cached) {
         super(name, keySerde, valueSerde, time, logged, logConfig);
-        this.cached = cached;
-    }
-
-    public KeyValueStore get() {
-        final RocksDBStore<Bytes, byte[]> rocks = new RocksDBStore<>(name,
-                                                                     Serdes.Bytes(),
-                                                                     Serdes.ByteArray());
-
-        if (!cached && !logged) {
-            return new MeteredKeyValueBytesStore<>(
-                    rocks, METRICS_SCOPE, time, keySerde, valueSerde);
-        }
-
-
-        if (cached && logged) {
-            final KeyValueStore<Bytes, byte[]> caching = new CachingKeyValueStore<>(new ChangeLoggingKeyValueBytesStore(rocks), keySerde, valueSerde);
-            return new MeteredKeyValueBytesStore<>(caching, METRICS_SCOPE, time, keySerde, valueSerde);
-        }
-
+        builder = new KeyValueStoreBuilder<>(new RocksDbKeyValueBytesStoreSupplier(name),
+                                             keySerde,
+                                             valueSerde,
+                                             time);
         if (cached) {
-            final KeyValueStore<Bytes, byte[]> caching = new CachingKeyValueStore<>(rocks, keySerde, valueSerde);
-            return new MeteredKeyValueBytesStore<>(caching, METRICS_SCOPE, time, keySerde, valueSerde);
-
-        } else {
-            // logged
-            return new MeteredKeyValueBytesStore<>(
-                    new ChangeLoggingKeyValueBytesStore(rocks),
-                    METRICS_SCOPE,
-                    time,
-                    keySerde,
-                    valueSerde);
+            builder.withCachingEnabled();
         }
+        // logged by default so we only need to worry about when it is disabled.
+        if (!logged) {
+            builder.withLoggingDisabled();
+        }
+    }
 
+    public KeyValueStore get() {
+        return builder.build();
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index 9fde74b..77b1abb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
 
 
-class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<K, AGG> {
+public class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<K, AGG> {
 
     private final Serde<K> keySerde;
     private final Serde<AGG> aggSerde;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
index aa0466d..f5432dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.SessionStore;
 
@@ -33,50 +32,30 @@ import java.util.Map;
  */
 public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, SessionStore> implements WindowStoreSupplier<SessionStore> {
 
-    private static final String METRIC_SCOPE = "rocksdb-session";
-    private static final int NUM_SEGMENTS = 3;
+    static final int NUM_SEGMENTS = 3;
     private final long retentionPeriod;
-    private final boolean cached;
+    private final SessionStoreBuilder<K, V> builder;
 
     public RocksDBSessionStoreSupplier(String name, long retentionPeriod, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean cached) {
         super(name, keySerde, valueSerde, Time.SYSTEM, logged, logConfig);
         this.retentionPeriod = retentionPeriod;
-        this.cached = cached;
-    }
-
-    public String name() {
-        return name;
-    }
-
-    public SessionStore<K, V> get() {
-        final SessionKeySchema keySchema = new SessionKeySchema();
-        final long segmentInterval = Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS);
-        final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name,
-                                                                                    retentionPeriod,
-                                                                                    NUM_SEGMENTS,
-                                                                                    keySchema);
-
-        final RocksDBSessionStore<Bytes, byte[]> bytesStore = RocksDBSessionStore.bytesStore(segmented);
-        return new MeteredSessionStore<>(maybeWrapCaching(maybeWrapLogged(bytesStore), segmentInterval),
-                                         METRIC_SCOPE,
-                                         keySerde,
-                                         valueSerde,
-                                         time);
-
-    }
-
-    private SessionStore<Bytes, byte[]> maybeWrapLogged(final SessionStore<Bytes, byte[]> inner) {
+        builder = new SessionStoreBuilder<>(new RocksDbSessionBytesStoreSupplier(name,
+                                                                                 retentionPeriod),
+                                            keySerde,
+                                            valueSerde,
+                                            time);
+        if (cached) {
+            builder.withCachingEnabled();
+        }
+        // logged by default so we only need to worry about when it is disabled.
         if (!logged) {
-            return inner;
+            builder.withLoggingDisabled();
         }
-        return new ChangeLoggingSessionBytesStore(inner);
     }
 
-    private SessionStore<Bytes, byte[]> maybeWrapCaching(final SessionStore<Bytes, byte[]> inner, final long segmentInterval) {
-        if (!cached) {
-            return inner;
-        }
-        return new CachingSessionStore<>(inner, keySerde, valueSerde, segmentInterval);
+    public SessionStore<K, V> get() {
+        return builder.build();
+
     }
 
     public long retentionPeriod() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index b147894..b7dd532 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -28,7 +28,7 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
-class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V> {
+public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V> {
 
     // this is optimizing the case when this store is already a bytes store, in which we can avoid Bytes.wrap() costs
     private static class RocksDBWindowBytesStore extends RocksDBWindowStore<Bytes, byte[]> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index 456d9e9..b899f5e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.WindowStore;
 
@@ -33,14 +32,9 @@ import java.util.Map;
  */
 
 public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, WindowStore> implements WindowStoreSupplier<WindowStore> {
-    private static final String METRICS_SCOPE = "rocksdb-window";
     public static final int MIN_SEGMENTS = 2;
     private final long retentionPeriod;
-    private final boolean retainDuplicates;
-    private final int numSegments;
-    private final long segmentInterval;
-    private final long windowSize;
-    private final boolean enableCaching;
+    private WindowStoreBuilder<K, V> builder;
 
     public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, long windowSize, boolean logged, Map<String, String> logConfig, boolean enableCaching) {
         this(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, Time.SYSTEM, windowSize, logged, logConfig, enableCaching);
@@ -52,34 +46,25 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V
             throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
         }
         this.retentionPeriod = retentionPeriod;
-        this.retainDuplicates = retainDuplicates;
-        this.numSegments = numSegments;
-        this.windowSize = windowSize;
-        this.enableCaching = enableCaching;
-        this.segmentInterval = Segments.segmentInterval(retentionPeriod, numSegments);
-    }
-
-    public String name() {
-        return name;
+        builder = new WindowStoreBuilder<>(new RocksDbWindowBytesStoreSupplier(name,
+                                                                               retentionPeriod,
+                                                                               numSegments,
+                                                                               windowSize,
+                                                                               retainDuplicates),
+                                           keySerde,
+                                           valueSerde,
+                                           time);
+        if (enableCaching) {
+            builder.withCachingEnabled();
+        }
+        // logged by default so we only need to worry about when it is disabled.
+        if (!logged) {
+            builder.withLoggingDisabled();
+        }
     }
 
     public WindowStore<K, V> get() {
-        final RocksDBSegmentedBytesStore segmentedBytesStore = new RocksDBSegmentedBytesStore(
-                name,
-                retentionPeriod,
-                numSegments,
-                new WindowKeySchema()
-        );
-        final RocksDBWindowStore<Bytes, byte[]> innerStore = RocksDBWindowStore.bytesStore(segmentedBytesStore,
-                                                                                           retainDuplicates,
-                                                                                           windowSize);
-
-        return new MeteredWindowStore<>(maybeWrapCaching(maybeWrapLogged(innerStore)),
-                                        METRICS_SCOPE,
-                                        time,
-                                        keySerde,
-                                        valueSerde);
-
+        return builder.build();
     }
 
     @Override
@@ -87,17 +72,4 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V
         return retentionPeriod;
     }
 
-    private WindowStore<Bytes, byte[]> maybeWrapLogged(final WindowStore<Bytes, byte[]> inner) {
-        if (!logged) {
-            return inner;
-        }
-        return new ChangeLoggingWindowBytesStore(inner, retainDuplicates);
-    }
-
-    private WindowStore<Bytes, byte[]> maybeWrapCaching(final WindowStore<Bytes, byte[]> inner) {
-        if (!enableCaching) {
-            return inner;
-        }
-        return new CachingWindowStore<>(inner, keySerde, valueSerde, windowSize, segmentInterval);
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java
new file mode 100644
index 0000000..7870579
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class RocksDbKeyValueBytesStoreSupplier implements KeyValueBytesStoreSupplier {
+
+    private final String name;
+
+    public RocksDbKeyValueBytesStoreSupplier(final String name) {
+        this.name = name;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public KeyValueStore<Bytes, byte[]> get() {
+        return new RocksDBStore<>(name,
+                                  Serdes.Bytes(),
+                                  Serdes.ByteArray());
+    }
+
+    @Override
+    public String metricsScope() {
+        return "rocksdb-state";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
new file mode 100644
index 0000000..ffeb7d8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+
+import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS;
+
+public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSupplier {
+    private final String name;
+    private final long retentionPeriod;
+
+    public RocksDbSessionBytesStoreSupplier(final String name,
+                                            final long retentionPeriod) {
+        this.name = name;
+        this.retentionPeriod = retentionPeriod;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public SessionStore<Bytes, byte[]> get() {
+        final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name,
+                                                                                    retentionPeriod,
+                                                                                    NUM_SEGMENTS,
+                                                                                    new SessionKeySchema());
+        return new RocksDBSessionStore<>(segmented, Serdes.Bytes(), Serdes.ByteArray());
+    }
+
+    @Override
+    public String metricsScope() {
+        return "rocksdb-session";
+    }
+
+    @Override
+    public long segmentIntervalMs() {
+        return Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS);
+    }
+}