You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/18 14:53:49 UTC

[1/2] kafka git commit: KAFKA-5873; add materialized overloads to StreamsBuilder

Repository: kafka
Updated Branches:
  refs/heads/trunk 52d7b6763 -> f2b74aa1c


http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index cbf2b56..0bdd3a3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -33,7 +34,9 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
@@ -101,9 +104,13 @@ public class GlobalKTableIntegrationTest {
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-        globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), null, globalOne, globalStore);
-        stream = builder.stream(inputStream, Consumed.with(Serdes.String(), Serdes.Long()));
-        table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table");
+        globalTable = builder.globalTable(globalOne, Consumed.with(Serdes.Long(), Serdes.String()),
+                                          Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)
+                                                  .withKeySerde(Serdes.Long())
+                                                  .withValueSerde(Serdes.String()));
+        final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
+        stream = builder.stream(inputStream, stringLongConsumed);
+        table = builder.table(inputTable, stringLongConsumed);
         foreachAction = new ForeachAction<String, String>() {
             @Override
             public void apply(final String key, final String value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 3a771c4..faa581b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -128,8 +128,8 @@ public class JoinIntegrationTest {
         CLUSTER.createTopics(INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
 
         builder = new StreamsBuilder();
-        leftTable = builder.table(INPUT_TOPIC_1, "leftTable");
-        rightTable = builder.table(INPUT_TOPIC_2, "rightTable");
+        leftTable = builder.table(INPUT_TOPIC_1);
+        rightTable = builder.table(INPUT_TOPIC_2);
         leftStream = leftTable.toStream();
         rightStream = rightTable.toStream();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index a433667..8d4299b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -211,7 +211,8 @@ public class KStreamKTableJoinIntegrationTest {
         // subsequently processed in the `leftJoin`, the latest region update for "alice" is "europe"
         // (which overrides her previous region value of "asia").
         final KTable<String, String> userRegionsTable =
-            builder.table(stringSerde, stringSerde, userRegionsTopic, userRegionsStoreName);
+            builder.table(userRegionsTopic,
+                          Consumed.with(Serdes.String(), Serdes.String()));
 
 
         // Compute the number of clicks per region, e.g. "europe" -> 13L.

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 1b45711..a12ffac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -343,9 +343,9 @@ public class KTableKTableJoinIntegrationTest {
     private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType joinType2, final String queryableName) {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1);
-        final KTable<String, String> table2 = builder.table(TABLE_2, TABLE_2);
-        final KTable<String, String> table3 = builder.table(TABLE_3, TABLE_3);
+        final KTable<String, String> table1 = builder.table(TABLE_1);
+        final KTable<String, String> table2 = builder.table(TABLE_2);
+        final KTable<String, String> table3 = builder.table(TABLE_3);
 
         Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized = null;
         if (queryableName != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 69c42fe..31b7222 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -106,7 +107,7 @@ public class RestoreIntegrationTest {
 
         createStateForRestoration();
 
-        builder.table(Serdes.Integer(), Serdes.Integer(), inputStream, "store")
+        builder.table(inputStream, Consumed.with(Serdes.Integer(), Serdes.Integer()))
                 .toStream()
                 .foreach(new ForeachAction<Integer, Integer>() {
                     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index 0ee74b8..0f8109d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -54,8 +54,9 @@ public class GlobalKTableJoinsTest {
     @Before
     public void setUp() {
         stateDir = TestUtils.tempDirectory();
-        global = builder.globalTable(Serdes.String(), Serdes.String(), null, globalTopic, "global-store");
-        stream = builder.stream(streamTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+        global = builder.globalTable(globalTopic, consumed);
+        stream = builder.stream(streamTopic, consumed);
         keyValueMapper = new KeyValueMapper<String, String, String>() {
             @Override
             public String apply(final String key, final String value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 1a2dc13..494e197 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -16,17 +16,20 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -58,6 +61,8 @@ public class InternalStreamsBuilderTest {
 
     private KStreamTestDriver driver = null;
     private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>();
+    private MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+            = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-store"), false);
 
     @Before
     public void setUp() {
@@ -132,27 +137,30 @@ public class InternalStreamsBuilderTest {
     }
 
     @Test
-    public void shouldStillMaterializeSourceKTableIfStateNameNotSpecified() throws Exception {
-        KTable table1 = builder.table("topic1", consumed, "table1");
-        KTable table2 = builder.table("topic2", consumed, null);
+    public void shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() throws Exception {
+        KTable table1 = builder.table("topic2",
+                                      consumed,
+                                      new MaterializedInternal<>(
+                                              Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("topic2"),
+                                              false));
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.build(null);
 
-        assertEquals(2, topology.stateStores().size());
-        assertEquals("table1", topology.stateStores().get(0).name());
+        assertEquals(1, topology.stateStores().size());
+        assertEquals("topic2", topology.stateStores().get(0).name());
 
-        final String internalStoreName = topology.stateStores().get(1).name();
-        assertTrue(internalStoreName.contains(KTableImpl.STATE_STORE_NAME));
-        assertEquals(2, topology.storeToChangelogTopic().size());
-        assertEquals("topic1", topology.storeToChangelogTopic().get("table1"));
-        assertEquals("topic2", topology.storeToChangelogTopic().get(internalStoreName));
-        assertEquals(table1.queryableStoreName(), "table1");
-        assertNull(table2.queryableStoreName());
+        assertEquals(1, topology.storeToChangelogTopic().size());
+        assertEquals("topic2", topology.storeToChangelogTopic().get("topic2"));
+        assertNull(table1.queryableStoreName());
     }
 
     @Test
     public void shouldBuildSimpleGlobalTableTopology() throws Exception {
-        builder.globalTable("table", consumed, "globalTable");
+        builder.globalTable("table",
+                            consumed,
+                            new MaterializedInternal<>(
+                                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalTable"),
+                                    false));
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology();
         final List<StateStore> stateStores = topology.globalStateStores();
@@ -173,16 +181,14 @@ public class InternalStreamsBuilderTest {
 
     @Test
     public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception {
-        builder.globalTable("table", consumed, "globalTable");
-        builder.globalTable("table2", consumed, "globalTable2");
-
-        doBuildGlobalTopologyWithAllGlobalTables();
-    }
-
-    @Test
-    public void shouldBuildGlobalTopologyWithAllGlobalTablesWithInternalStoreName() throws Exception {
-        builder.globalTable("table", consumed, null);
-        builder.globalTable("table2", consumed, null);
+        builder.globalTable("table",
+                            consumed,
+                            new MaterializedInternal<>(
+                                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global1")));
+        builder.globalTable("table2",
+                            consumed,
+                            new MaterializedInternal<>(
+                                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global2")));
 
         doBuildGlobalTopologyWithAllGlobalTables();
     }
@@ -191,10 +197,19 @@ public class InternalStreamsBuilderTest {
     public void shouldAddGlobalTablesToEachGroup() throws Exception {
         final String one = "globalTable";
         final String two = "globalTable2";
-        final GlobalKTable<String, String> globalTable = builder.globalTable("table", consumed, one);
-        final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", consumed, two);
 
-        builder.table("not-global", consumed, "not-global");
+        final GlobalKTable<String, String> globalTable = builder.globalTable("table",
+                                                                             consumed,
+                                                                             new MaterializedInternal<>(
+                                                                                     Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(one)));
+        final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2",
+                                                                              consumed,
+                                                                              new MaterializedInternal<>(
+                                                                                      Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(two)));
+
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("not-global"), false);
+        builder.table("not-global", consumed, materialized);
 
         final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() {
             @Override
@@ -227,7 +242,9 @@ public class InternalStreamsBuilderTest {
     public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
         final KStream<String, String> playEvents = builder.stream(Collections.singleton("events"), consumed);
 
-        final KTable<String, String> table = builder.table("table-topic", consumed, "table-store");
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("table-store"), false);
+        final KTable<String, String> table = builder.table("table-topic", consumed, materialized);
         assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
 
         final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
@@ -260,8 +277,7 @@ public class InternalStreamsBuilderTest {
     @Test
     public void shouldAddTableToEarliestAutoOffsetResetList() {
         final String topicName = "topic-1";
-        final String storeName = "test-store";
-        builder.table(topicName, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST)), storeName);
+        builder.table(topicName, new ConsumedInternal<>(Consumed.<String, String>with(AutoOffsetReset.EARLIEST)), materialized);
 
         assertTrue(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
         assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
@@ -270,9 +286,7 @@ public class InternalStreamsBuilderTest {
     @Test
     public void shouldAddTableToLatestAutoOffsetResetList() {
         final String topicName = "topic-1";
-        final String storeName = "test-store";
-
-        builder.table(topicName, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST)), storeName);
+        builder.table(topicName, new ConsumedInternal<>(Consumed.<String, String>with(AutoOffsetReset.LATEST)), materialized);
 
         assertTrue(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
         assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
@@ -281,9 +295,8 @@ public class InternalStreamsBuilderTest {
     @Test
     public void shouldNotAddTableToOffsetResetLists() {
         final String topicName = "topic-1";
-        final String storeName = "test-store";
 
-        builder.table(topicName, consumed, storeName);
+        builder.table(topicName, consumed, materialized);
 
         assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
         assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
@@ -340,15 +353,15 @@ public class InternalStreamsBuilderTest {
 
     @Test
     public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() throws Exception {
-        builder.table("topic", consumed, "store");
+        builder.table("topic", consumed, materialized);
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
     @Test
     public void ktableShouldUseProvidedTimestampExtractor() throws Exception {
-        final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(new MockTimestampExtractor()));
-        builder.table("topic", consumed, "store");
+        final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.<String, String>with(new MockTimestampExtractor()));
+        builder.table("topic", consumed, materialized);
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index ff9726e..705cf62 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -60,7 +61,7 @@ public class KGroupedTableImplTest {
 
     @Before
     public void before() {
-        groupedTable = builder.table(Serdes.String(), Serdes.String(), "blah", "blah")
+        groupedTable = builder.table("blah", Consumed.with(Serdes.String(), Serdes.String()))
                 .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
     }
 
@@ -157,7 +158,11 @@ public class KGroupedTableImplTest {
                 }
             };
 
-        final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
+        final KTable<String, Integer> reduced = builder.table(topic,
+                                                              Consumed.with(Serdes.String(), Serdes.Double()),
+                                                              Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("store")
+                                                                      .withKeySerde(Serdes.String())
+                                                                      .withValueSerde(Serdes.Double()))
             .groupBy(intProjection)
             .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR, "reduced");
 
@@ -175,7 +180,11 @@ public class KGroupedTableImplTest {
                 }
             };
 
-        final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
+        final KTable<String, Integer> reduced = builder.table(topic,
+                                                              Consumed.with(Serdes.String(), Serdes.Double()),
+                                                              Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("store")
+                                                                      .withKeySerde(Serdes.String())
+                                                                      .withValueSerde(Serdes.Double()))
             .groupBy(intProjection)
             .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR);
 
@@ -194,7 +203,7 @@ public class KGroupedTableImplTest {
                 }
             };
 
-        final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
+        final KTable<String, Integer> reduced = builder.table(topic, Consumed.with(Serdes.String(), Serdes.Double()))
                 .groupBy(intProjection)
                 .reduce(MockReducer.INTEGER_ADDER,
                         MockReducer.INTEGER_SUBTRACTOR,
@@ -211,10 +220,10 @@ public class KGroupedTableImplTest {
     @SuppressWarnings("unchecked")
     @Test
     public void shouldCountAndMaterializeResults() {
-        final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), topic, "store");
+        final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));
         table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
-                         Serialized.with(Serdes.String(),
-                                         Serdes.String()))
+                      Serialized.with(Serdes.String(),
+                                      Serdes.String()))
                 .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Long()));
@@ -228,7 +237,7 @@ public class KGroupedTableImplTest {
     @SuppressWarnings("unchecked")
     @Test
     public void shouldAggregateAndMaterializeResults() {
-        final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), topic, "store");
+        final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));
         table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
                       Serialized.with(Serdes.String(),
                                       Serdes.String()))

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 5e8687f..be1d865 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -61,6 +61,7 @@ public class KStreamImplTest {
 
     final private Serde<String> stringSerde = Serdes.String();
     final private Serde<Integer> intSerde = Serdes.Integer();
+    private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
     private KStream<String, String> testStream;
     private StreamsBuilder builder;
     private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
@@ -361,7 +362,7 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullValueMapperOnTableJoin() {
-        testStream.leftJoin(builder.table(Serdes.String(), Serdes.String(), "topic", "store"), null);
+        testStream.leftJoin(builder.table("topic", stringConsumed), null);
     }
 
     @Test(expected = NullPointerException.class)
@@ -383,14 +384,14 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullMapperOnJoinWithGlobalTable() {
-        testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
+        testStream.join(builder.globalTable("global", stringConsumed),
                         null,
                         MockValueJoiner.TOSTRING_JOINER);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() {
-        testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
+        testStream.join(builder.globalTable("global", stringConsumed),
                         MockKeyValueMapper.<String, String>SelectValueMapper(),
                         null);
     }
@@ -404,14 +405,14 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() {
-        testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
+        testStream.leftJoin(builder.globalTable("global", stringConsumed),
                         null,
                         MockValueJoiner.TOSTRING_JOINER);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() {
-        testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
+        testStream.leftJoin(builder.globalTable("global", stringConsumed),
                         MockKeyValueMapper.<String, String>SelectValueMapper(),
                         null);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 745ab4e..39b318f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -87,7 +87,7 @@ public class KStreamKStreamLeftJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver.setUp(builder, stateDir);
+        driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
         driver.setTime(0L);
 
         // push two items to the primary stream. the other window is empty

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index d206b02..d1226c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -67,8 +67,9 @@ public class KStreamKTableJoinTest {
         final MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(topic1, Consumed.with(intSerde, stringSerde));
-        table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
+        final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+        stream = builder.stream(topic1, consumed);
+        table = builder.table(topic2, consumed);
         stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -76,7 +77,7 @@ public class KStreamKTableJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver.setUp(builder, stateDir);
+        driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
         driver.setTime(0L);
 
         // push two items to the primary stream. the other table is empty

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index a79184e..ed835a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -68,8 +68,9 @@ public class KStreamKTableLeftJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(topic1, Consumed.with(intSerde, stringSerde));
-        table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
+        Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+        stream = builder.stream(topic1, consumed);
+        table = builder.table(topic2, consumed);
         stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
 
         Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index accbb9c..0ae95dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -50,6 +51,7 @@ import static org.junit.Assert.assertEquals;
 public class KTableAggregateTest {
 
     final private Serde<String> stringSerde = Serdes.String();
+    private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
     private final Serialized<String, String> stringSerialzied = Serialized.with(stringSerde, stringSerde);
 
     private File stateDir = null;
@@ -70,7 +72,7 @@ public class KTableAggregateTest {
         final String topic1 = "topic1";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTable<String, String> table1 = builder.table(topic1, consumed);
         KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
                                                        stringSerialzied
         ).aggregate(MockInitializer.STRING_INIT,
@@ -81,7 +83,7 @@ public class KTableAggregateTest {
 
         table2.toStream().process(proc);
 
-        driver.setUp(builder, stateDir);
+        driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
 
         driver.process(topic1, "A", "1");
         driver.flushState();
@@ -118,7 +120,7 @@ public class KTableAggregateTest {
         final String topic1 = "topic1";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTable<String, String> table1 = builder.table(topic1, consumed);
         KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
                                                        stringSerialzied
         ).aggregate(MockInitializer.STRING_INIT,
@@ -146,7 +148,7 @@ public class KTableAggregateTest {
         final String topic1 = "topic1";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTable<String, String> table1 = builder.table(topic1, consumed);
         KTable<String, String> table2 = table1.groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
             @Override
                 public KeyValue<String, String> apply(String key, String value) {
@@ -232,7 +234,7 @@ public class KTableAggregateTest {
         final String input = "count-test-input";
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
-        builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
+        builder.table(input, consumed)
                 .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied)
                 .count("count")
                 .toStream()
@@ -247,7 +249,7 @@ public class KTableAggregateTest {
         final String input = "count-test-input";
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
-        builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
+        builder.table(input, consumed)
             .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied)
             .count()
             .toStream()
@@ -262,7 +264,7 @@ public class KTableAggregateTest {
         final String input = "count-test-input";
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
-        builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
+        builder.table(input, consumed)
             .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied)
             .count("count")
             .toStream()
@@ -291,7 +293,7 @@ public class KTableAggregateTest {
         final String input = "count-test-input";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
-        builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
+        builder.table(input, consumed)
                 .groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
 
                     @Override
@@ -348,8 +350,8 @@ public class KTableAggregateTest {
         final String reduceTopic = "TestDriver-reducer-store-repartition";
         final Map<String, Long> reduceResults = new HashMap<>();
 
-        final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
-        final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
+        final KTable<String, String> one = builder.table(tableOne, consumed);
+        final KTable<Long, String> two = builder.table(tableTwo, Consumed.with(Serdes.Long(), Serdes.String()));
 
 
         final KTable<String, Long> reduce = two.groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index a885edd..7986277 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -42,6 +43,7 @@ public class KTableFilterTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
+    private final Consumed<String, Integer> consumed = Consumed.with(stringSerde, intSerde);
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
@@ -79,7 +81,7 @@ public class KTableFilterTest {
 
         final String topic1 = "topic1";
 
-        KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+        KTable<String, Integer> table1 = builder.table(topic1, consumed);
 
         KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
             @Override
@@ -104,7 +106,7 @@ public class KTableFilterTest {
 
         final String topic1 = "topic1";
 
-        KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+        KTable<String, Integer> table1 = builder.table(topic1, consumed);
 
         KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
             @Override
@@ -128,7 +130,7 @@ public class KTableFilterTest {
 
         final String topic1 = "topic1";
 
-        KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+        KTable<String, Integer> table1 = builder.table(topic1, consumed);
 
         KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
             @Override
@@ -213,7 +215,7 @@ public class KTableFilterTest {
         String topic1 = "topic1";
 
         KTableImpl<String, Integer, Integer> table1 =
-                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+                (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
                 new Predicate<String, Integer>() {
                     @Override
@@ -239,7 +241,7 @@ public class KTableFilterTest {
         String topic1 = "topic1";
 
         KTableImpl<String, Integer, Integer> table1 =
-            (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
             new Predicate<String, Integer>() {
                 @Override
@@ -304,7 +306,7 @@ public class KTableFilterTest {
         String topic1 = "topic1";
 
         KTableImpl<String, Integer, Integer> table1 =
-                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+                (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
                 new Predicate<String, Integer>() {
                     @Override
@@ -323,7 +325,7 @@ public class KTableFilterTest {
         String topic1 = "topic1";
 
         KTableImpl<String, Integer, Integer> table1 =
-            (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
             new Predicate<String, Integer>() {
                 @Override
@@ -382,7 +384,7 @@ public class KTableFilterTest {
         String topic1 = "topic1";
 
         KTableImpl<String, Integer, Integer> table1 =
-                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+                (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
                 new Predicate<String, Integer>() {
                     @Override
@@ -401,7 +403,7 @@ public class KTableFilterTest {
         String topic1 = "topic1";
 
         KTableImpl<String, Integer, Integer> table1 =
-            (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
             new Predicate<String, Integer>() {
                 @Override
@@ -440,8 +442,9 @@ public class KTableFilterTest {
 
         String topic1 = "topic1";
 
+        final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
         KTableImpl<String, String, String> table1 =
-            (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+            (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
             new Predicate<String, String>() {
                 @Override
@@ -461,8 +464,9 @@ public class KTableFilterTest {
 
         String topic1 = "topic1";
 
+        final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
         KTableImpl<String, String, String> table1 =
-            (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+            (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
             new Predicate<String, String>() {
                 @Override
@@ -485,7 +489,7 @@ public class KTableFilterTest {
         };
 
         new StreamsBuilder()
-            .<Integer, String>table("empty", "emptyStore")
+            .<Integer, String>table("empty")
             .filter(numberKeyPredicate)
             .filterNot(numberKeyPredicate)
             .to("nirvana");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index 693aac8..23e0b59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -18,10 +18,14 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -79,7 +83,11 @@ public class KTableForeachTest {
 
         // When
         StreamsBuilder builder = new StreamsBuilder();
-        KTable<Integer, String> table = builder.table(intSerde, stringSerde, topicName, "anyStoreName");
+        KTable<Integer, String> table = builder.table(topicName,
+                                                      Consumed.with(intSerde, stringSerde),
+                                                      new MaterializedInternal<>(Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(topicName)
+                                                                                         .withKeySerde(intSerde)
+                                                                                         .withValueSerde(stringSerde)));
         table.foreach(action);
 
         // Then
@@ -105,7 +113,7 @@ public class KTableForeachTest {
         };
 
         new StreamsBuilder()
-            .<Integer, String>table("emptyTopic", "emptyStore")
+            .<Integer, String>table("emptyTopic")
             .foreach(consume);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 6ca38b8..9d918e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.KTable;
@@ -54,6 +55,7 @@ import static org.junit.Assert.assertTrue;
 public class KTableImplTest {
 
     final private Serde<String> stringSerde = Serdes.String();
+    private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
@@ -64,7 +66,7 @@ public class KTableImplTest {
     public void setUp() {
         stateDir = TestUtils.tempDirectory("kafka-test");
         builder = new StreamsBuilder();
-        table = builder.table("test", "test");
+        table = builder.table("test");
     }
 
     @Test
@@ -73,10 +75,9 @@ public class KTableImplTest {
 
         String topic1 = "topic1";
         String topic2 = "topic2";
-        String storeName1 = "storeName1";
         String storeName2 = "storeName2";
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, storeName1);
+        KTable<String, String> table1 = builder.table(topic1, consumed);
 
         MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
         table1.toStream().process(proc1);
@@ -130,11 +131,10 @@ public class KTableImplTest {
 
         String topic1 = "topic1";
         String topic2 = "topic2";
-        String storeName1 = "storeName1";
         String storeName2 = "storeName2";
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override
@@ -256,14 +256,12 @@ public class KTableImplTest {
     public void testStateStoreLazyEval() {
         String topic1 = "topic1";
         String topic2 = "topic2";
-        String storeName1 = "storeName1";
-        String storeName2 = "storeName2";
 
         final StreamsBuilder builder = new StreamsBuilder();
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
-        builder.table(stringSerde, stringSerde, topic2, storeName2);
+                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
+        builder.table(topic2, consumed);
 
         KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
@@ -291,15 +289,13 @@ public class KTableImplTest {
     public void testStateStore() {
         String topic1 = "topic1";
         String topic2 = "topic2";
-        String storeName1 = "storeName1";
-        String storeName2 = "storeName2";
 
         final StreamsBuilder builder = new StreamsBuilder();
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, String> table2 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2, storeName2);
+                (KTableImpl<String, String, String>) builder.table(topic2, consumed);
 
         KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
@@ -338,7 +334,12 @@ public class KTableImplTest {
         final StreamsBuilder builder = new StreamsBuilder();
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+                (KTableImpl<String, String, String>) builder.table(topic1,
+                                                                   consumed,
+                                                                   Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName1)
+                                                                           .withKeySerde(stringSerde)
+                                                                           .withValueSerde(stringSerde)
+                );
 
         table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, "mock-result1");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index 124114b..aeb2418 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
@@ -50,6 +51,7 @@ public class KTableKTableJoinTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
+    private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
     private File stateDir = null;
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
@@ -165,8 +167,8 @@ public class KTableKTableJoinTest {
         final MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
         joined.toStream().process(processor);
 
@@ -186,8 +188,8 @@ public class KTableKTableJoinTest {
         final MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName");
         joined.toStream().process(processor);
 
@@ -285,8 +287,8 @@ public class KTableKTableJoinTest {
         final KTable<Integer, String> joined;
         final MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
         proc = new MockProcessorSupplier<>();
         builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
@@ -306,8 +308,8 @@ public class KTableKTableJoinTest {
         final KTable<Integer, String> joined;
         final MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName");
         proc = new MockProcessorSupplier<>();
         builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
@@ -327,8 +329,8 @@ public class KTableKTableJoinTest {
         final KTable<Integer, String> joined;
         final MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
 
         proc = new MockProcessorSupplier<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 9cdc782..ca0c81c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
@@ -60,6 +61,7 @@ public class KTableKTableLeftJoinTest {
     private File stateDir = null;
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
 
     @Before
     public void setUp() throws IOException {
@@ -72,8 +74,8 @@ public class KTableKTableLeftJoinTest {
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        KTable<Integer, String> table1 = builder.table(topic1, consumed);
+        KTable<Integer, String> table2 = builder.table(topic2, consumed);
         KTable<Integer, String> joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
         MockProcessorSupplier<Integer, String> processor;
         processor = new MockProcessorSupplier<>();
@@ -171,8 +173,8 @@ public class KTableKTableLeftJoinTest {
         final KTable<Integer, String> joined;
         final MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
         proc = new MockProcessorSupplier<>();
@@ -252,8 +254,8 @@ public class KTableKTableLeftJoinTest {
         KTable<Integer, String> joined;
         MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
@@ -341,7 +343,8 @@ public class KTableKTableLeftJoinTest {
         final String[] inputs = {agg, tableOne, tableTwo, tableThree, tableFour, tableFive, tableSix};
 
         final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<Long, String> aggTable = builder.table(Serdes.Long(), Serdes.String(), agg, agg)
+        final Consumed<Long, String> consumed = Consumed.with(Serdes.Long(), Serdes.String());
+        final KTable<Long, String> aggTable = builder.table(agg, consumed)
                 .groupBy(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
                     @Override
                     public KeyValue<Long, String> apply(final Long key, final String value) {
@@ -349,12 +352,12 @@ public class KTableKTableLeftJoinTest {
                     }
                 }, Serialized.with(Serdes.Long(), Serdes.String())).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, "agg-store");
 
-        final KTable<Long, String> one = builder.table(Serdes.Long(), Serdes.String(), tableOne, tableOne);
-        final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
-        final KTable<Long, String> three = builder.table(Serdes.Long(), Serdes.String(), tableThree, tableThree);
-        final KTable<Long, String> four = builder.table(Serdes.Long(), Serdes.String(), tableFour, tableFour);
-        final KTable<Long, String> five = builder.table(Serdes.Long(), Serdes.String(), tableFive, tableFive);
-        final KTable<Long, String> six = builder.table(Serdes.Long(), Serdes.String(), tableSix, tableSix);
+        final KTable<Long, String> one = builder.table(tableOne, consumed);
+        final KTable<Long, String> two = builder.table(tableTwo, consumed);
+        final KTable<Long, String> three = builder.table(tableThree, consumed);
+        final KTable<Long, String> four = builder.table(tableFour, consumed);
+        final KTable<Long, String> five = builder.table(tableFive, consumed);
+        final KTable<Long, String> six = builder.table(tableSix, consumed);
 
         final ValueMapper<String, String> mapper = new ValueMapper<String, String>() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 368a3ea..d6ab613 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
@@ -54,6 +55,7 @@ public class KTableKTableOuterJoinTest {
     private File stateDir = null;
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
 
     @Before
     public void setUp() throws IOException {
@@ -72,8 +74,8 @@ public class KTableKTableOuterJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
         joined.toStream().process(processor);
 
@@ -176,8 +178,8 @@ public class KTableKTableOuterJoinTest {
         KTable<Integer, String> joined;
         MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
         proc = new MockProcessorSupplier<>();
@@ -264,8 +266,8 @@ public class KTableKTableOuterJoinTest {
         KTable<Integer, String> joined;
         MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 756404f..81797cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -57,7 +58,7 @@ public class KTableMapKeysTest {
 
         String topic1 = "topic_map_keys";
 
-        KTable<Integer, String> table1 = builder.table(integerSerde, stringSerde, topic1, "anyStoreName");
+        KTable<Integer, String> table1 = builder.table(topic1, Consumed.with(integerSerde, stringSerde));
 
         final Map<Integer, String> keyMap = new HashMap<>();
         keyMap.put(1, "ONE");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 4bfaea6..5d92846 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -43,6 +44,7 @@ import static org.junit.Assert.assertTrue;
 public class KTableMapValuesTest {
 
     final private Serde<String> stringSerde = Serdes.String();
+    private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
@@ -69,7 +71,7 @@ public class KTableMapValuesTest {
 
         String topic1 = "topic1";
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTable<String, String> table1 = builder.table(topic1, consumed);
         KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
             @Override
             public Integer apply(CharSequence value) {
@@ -89,7 +91,7 @@ public class KTableMapValuesTest {
 
         String topic1 = "topic1";
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTable<String, String> table1 = builder.table(topic1, consumed);
         KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
             @Override
             public Integer apply(CharSequence value) {
@@ -210,11 +212,10 @@ public class KTableMapValuesTest {
 
         String topic1 = "topic1";
         String topic2 = "topic2";
-        String storeName1 = "storeName1";
         String storeName2 = "storeName2";
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override
@@ -241,11 +242,10 @@ public class KTableMapValuesTest {
 
         String topic1 = "topic1";
         String topic2 = "topic2";
-        String storeName1 = "storeName1";
         String storeName2 = "storeName2";
 
         KTableImpl<String, String, String> table1 =
-            (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+            (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
             new ValueMapper<String, Integer>() {
                 @Override
@@ -273,7 +273,7 @@ public class KTableMapValuesTest {
         String topic1 = "topic1";
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override
@@ -321,7 +321,7 @@ public class KTableMapValuesTest {
         String topic1 = "topic1";
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 3f8a6b2..35a3dbd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
@@ -37,6 +38,7 @@ import static org.junit.Assert.assertTrue;
 public class KTableSourceTest {
 
     final private Serde<String> stringSerde = Serdes.String();
+    private final Consumed<String, String> stringConsumed = Consumed.with(stringSerde, stringSerde);
     final private Serde<Integer> intSerde = Serdes.Integer();
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
@@ -53,7 +55,7 @@ public class KTableSourceTest {
 
         String topic1 = "topic1";
 
-        KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+        KTable<String, Integer> table1 = builder.table(topic1, Consumed.with(stringSerde, intSerde));
 
         MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
         table1.toStream().process(proc1);
@@ -77,7 +79,7 @@ public class KTableSourceTest {
 
         String topic1 = "topic1";
 
-        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
 
         KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
 
@@ -121,7 +123,7 @@ public class KTableSourceTest {
 
         String topic1 = "topic1";
 
-        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
 
         MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
 
@@ -159,7 +161,7 @@ public class KTableSourceTest {
 
         String topic1 = "topic1";
 
-        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
 
         table1.enableSendingOldValues();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 3cbceeb..1e4afcd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -45,6 +45,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.TestUtils;
 
@@ -686,7 +687,7 @@ public class SimpleBenchmark {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<Long, byte[]> input1 = builder.stream(kStreamTopic);
-        final KTable<Long, byte[]> input2 = builder.table(kTableTopic, kTableTopic + "-store");
+        final KTable<Long, byte[]> input2 = builder.table(kTableTopic);
 
         input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
 
@@ -697,8 +698,8 @@ public class SimpleBenchmark {
                                                             String kTableTopic2, final CountDownLatch latch) {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KTable<Long, byte[]> input1 = builder.table(kTableTopic1, kTableTopic1 + "-store");
-        final KTable<Long, byte[]> input2 = builder.table(kTableTopic2, kTableTopic2 + "-store");
+        final KTable<Long, byte[]> input1 = builder.table(kTableTopic1);
+        final KTable<Long, byte[]> input2 = builder.table(kTableTopic2);
 
         input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
 
@@ -725,10 +726,12 @@ public class SimpleBenchmark {
 
         StreamsBuilder builder = new StreamsBuilder();
 
+        final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder
+                = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), Serdes.Integer(), Serdes.ByteArray());
         if (enableCaching) {
-            builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().enableCaching().build());
+            builder.addStateStore(storeBuilder.withCachingEnabled());
         } else {
-            builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().build());
+            builder.addStateStore(storeBuilder);
         }
         KStream<Integer, byte[]> source = builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index 0a10f9f..d98fd7f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -286,8 +286,7 @@ public class YahooBenchmark {
         final KStream<String, ProjectedEvent> kEvents = builder.stream(eventsTopic,
                                                                        Consumed.with(Serdes.String(),
                                                                                      Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer)));
-        final KTable<String, String> kCampaigns = builder.table(Serdes.String(), Serdes.String(),
-            campaignsTopic, "campaign-state");
+        final KTable<String, String> kCampaigns = builder.table(campaignsTopic, Consumed.with(Serdes.String(), Serdes.String()));
 
 
         KStream<String, ProjectedEvent> filteredEvents = kEvents

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index b22c488..f3dbb32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -22,14 +22,18 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StreamsMetadata;
 import org.junit.Before;
 import org.junit.Test;
@@ -88,7 +92,9 @@ public class StreamsMetadataStateTest {
             }
         });
 
-        builder.globalTable("global-topic", "global-table");
+        builder.globalTable("global-topic",
+                            Consumed.with(null, null),
+                            Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>as(globalTable));
 
         StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("appId");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index c4e108d..4b75702 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -105,9 +105,9 @@ public class SmokeTestClient extends SmokeTestUtil {
         props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
         props.put(ProducerConfig.ACKS_CONFIG, "all");
 
-
         StreamsBuilder builder = new StreamsBuilder();
-        KStream<String, Integer> source = builder.stream("data", Consumed.with(stringSerde, intSerde));
+        Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
+        KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
         source.to(stringSerde, intSerde, "echo");
         KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
             @Override
@@ -141,7 +141,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                 new Unwindow<String, Integer>()
         ).to(stringSerde, intSerde, "min");
 
-        KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min", "minStoreName");
+        KTable<String, Integer> minTable = builder.table("min", stringIntConsumed);
         minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min"));
 
         // max
@@ -163,7 +163,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                 new Unwindow<String, Integer>()
         ).to(stringSerde, intSerde, "max");
 
-        KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max", "maxStoreName");
+        KTable<String, Integer> maxTable = builder.table("max", stringIntConsumed);
         maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max"));
 
         // sum
@@ -186,7 +186,8 @@ public class SmokeTestClient extends SmokeTestUtil {
         ).to(stringSerde, longSerde, "sum");
 
 
-        KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum", "sumStoreName");
+        Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
+        KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
         sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"));
 
         // cnt
@@ -195,7 +196,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                 new Unwindow<String, Long>()
         ).to(stringSerde, longSerde, "cnt");
 
-        KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt", "cntStoreName");
+        KTable<String, Long> cntTable = builder.table("cnt", stringLongConsumed);
         cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt"));
 
         // dif


[2/2] kafka git commit: KAFKA-5873; add materialized overloads to StreamsBuilder

Posted by da...@apache.org.
KAFKA-5873; add materialized overloads to StreamsBuilder

Add overloads of `StreamsBuilder#table` and `StreamsBuilder#globalTable` that accept a `Materialized` argument.

Author: Damian Guy <da...@gmail.com>

Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #3837 from dguy/kafka-5873


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f2b74aa1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f2b74aa1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f2b74aa1

Branch: refs/heads/trunk
Commit: f2b74aa1c36bf2882006c14f7cbd56b493f39d26
Parents: 52d7b67
Author: Damian Guy <da...@gmail.com>
Authored: Mon Sep 18 15:53:44 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Mon Sep 18 15:53:44 2017 +0100

----------------------------------------------------------------------
 .../examples/pageview/PageViewTypedDemo.java    |   7 +-
 .../examples/pageview/PageViewUntypedDemo.java  |   9 +-
 .../apache/kafka/streams/StreamsBuilder.java    | 700 +++----------------
 .../java/org/apache/kafka/streams/Topology.java |   2 +-
 .../internals/InternalStreamsBuilder.java       | 152 ++--
 .../streams/kstream/internals/KTableImpl.java   |   9 +-
 .../kstream/internals/MaterializedInternal.java |   7 +-
 .../internals/InternalTopologyBuilder.java      |   3 +-
 .../apache/kafka/streams/KafkaStreamsTest.java  |   3 +-
 .../streams/integration/EosIntegrationTest.java |  12 +-
 .../GlobalKTableIntegrationTest.java            |  13 +-
 .../integration/JoinIntegrationTest.java        |   4 +-
 .../KStreamKTableJoinIntegrationTest.java       |   3 +-
 .../KTableKTableJoinIntegrationTest.java        |   6 +-
 .../integration/RestoreIntegrationTest.java     |   3 +-
 .../internals/GlobalKTableJoinsTest.java        |   5 +-
 .../internals/InternalStreamsBuilderTest.java   |  87 ++-
 .../internals/KGroupedTableImplTest.java        |  25 +-
 .../kstream/internals/KStreamImplTest.java      |  11 +-
 .../internals/KStreamKStreamLeftJoinTest.java   |   2 +-
 .../internals/KStreamKTableJoinTest.java        |   7 +-
 .../internals/KStreamKTableLeftJoinTest.java    |   5 +-
 .../kstream/internals/KTableAggregateTest.java  |  22 +-
 .../kstream/internals/KTableFilterTest.java     |  28 +-
 .../kstream/internals/KTableForeachTest.java    |  12 +-
 .../kstream/internals/KTableImplTest.java       |  29 +-
 .../kstream/internals/KTableKTableJoinTest.java |  22 +-
 .../internals/KTableKTableLeftJoinTest.java     |  29 +-
 .../internals/KTableKTableOuterJoinTest.java    |  14 +-
 .../kstream/internals/KTableMapKeysTest.java    |   3 +-
 .../kstream/internals/KTableMapValuesTest.java  |  16 +-
 .../kstream/internals/KTableSourceTest.java     |  10 +-
 .../kafka/streams/perf/SimpleBenchmark.java     |  13 +-
 .../kafka/streams/perf/YahooBenchmark.java      |   3 +-
 .../internals/StreamsMetadataStateTest.java     |   8 +-
 .../kafka/streams/tests/SmokeTestClient.java    |  13 +-
 36 files changed, 443 insertions(+), 854 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 72f9be8..068eece 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -145,8 +146,8 @@ public class PageViewTypedDemo {
 
         KStream<String, PageView> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), pageViewSerde));
 
-        KTable<String, UserProfile> users = builder.table(Serdes.String(), userProfileSerde,
-            "streams-userprofile-input", "streams-userprofile-store-name");
+        KTable<String, UserProfile> users = builder.table("streams-userprofile-input",
+                                                          Consumed.with(Serdes.String(), userProfileSerde));
 
         KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
                 .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
@@ -190,7 +191,7 @@ public class PageViewTypedDemo {
                 });
 
         // write to the result topic
-        regionCount.to(wPageViewByRegionSerde, regionCountSerde, "streams-pageviewstats-typed-output");
+        regionCount.to("streams-pageviewstats-typed-output", Produced.with(wPageViewByRegionSerde, regionCountSerde));
 
         KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index e8787af..c20c077 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -73,10 +74,10 @@ public class PageViewUntypedDemo {
         final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
         final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
 
-        KStream<String, JsonNode> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), jsonSerde));
+        final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
+        KStream<String, JsonNode> views = builder.stream("streams-pageview-input", consumed);
 
-        KTable<String, JsonNode> users = builder.table(Serdes.String(), jsonSerde,
-            "streams-userprofile-input", "streams-userprofile-store-name");
+        KTable<String, JsonNode> users = builder.table("streams-userprofile-input", consumed);
 
         KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
             @Override
@@ -121,7 +122,7 @@ public class PageViewUntypedDemo {
                 });
 
         // write to the result topic
-        regionCount.to(jsonSerde, jsonSerde, "streams-pageviewstats-untyped-output");
+        regionCount.to("streams-pageviewstats-untyped-output", Produced.with(jsonSerde, jsonSerde));
 
         KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index a26822a..a272ec4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -16,28 +16,30 @@
  */
 package org.apache.kafka.streams;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Objects;
 import java.util.regex.Pattern;
 
 /**
@@ -129,6 +131,8 @@ public class StreamsBuilder {
      */
     public synchronized <K, V> KStream<K, V> stream(final Collection<String> topics,
                                                     final Consumed<K, V> consumed) {
+        Objects.requireNonNull(topics, "topics can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
         return internalStreamsBuilder.stream(topics, new ConsumedInternal<>(consumed));
     }
 
@@ -170,6 +174,8 @@ public class StreamsBuilder {
      */
     public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern,
                                                     final Consumed<K, V> consumed) {
+        Objects.requireNonNull(topicPattern, "topicPattern can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
         return internalStreamsBuilder.stream(topicPattern, new ConsumedInternal<>(consumed));
     }
 
@@ -182,11 +188,17 @@ public class StreamsBuilder {
      * Note that the specified input topic must be partitioned by key.
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} using the given
+     * {@code Materialized} instance.
      * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
+     * You should only specify serdes in the {@link Consumed} instance as these will also be used to overwrite the
+     * serdes in {@link Materialized}, i.e.,
+     * <pre> {@code
+     * streamBuilder.table(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName))
+     * }
+     * </pre>
      * To query the local {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
@@ -199,46 +211,20 @@ public class StreamsBuilder {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
      * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link #table(String)}
+     * @param consumed           the instance of {@link Consumed} used to define optional parameters; cannot be {@code null}
+     * @param materialized       the instance of {@link Materialized} used to materialize a state store; cannot be {@code null}
      * @return a {@link KTable} for the specified topic
      */
     public synchronized <K, V> KTable<K, V> table(final String topic,
-                                                  final String queryableStoreName) {
-        return internalStreamsBuilder.table(topic, new ConsumedInternal<K, V>(), queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
-     * {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param topic         the topic name; cannot be {@code null}
-     * @param storeSupplier user defined state store supplier; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final String topic,
-                                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return internalStreamsBuilder.table(null, null, null, null, topic, storeSupplier);
+                                                  final Consumed<K, V> consumed,
+                                                  final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(topic, "topic can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
+        return internalStreamsBuilder.table(topic,
+                                            new ConsumedInternal<>(consumed),
+                                            new MaterializedInternal<>(materialized));
     }
 
     /**
@@ -259,7 +245,7 @@ public class StreamsBuilder {
      * @return a {@link KTable} for the specified topic
      */
     public synchronized <K, V> KTable<K, V> table(final String topic) {
-        return internalStreamsBuilder.table(topic, new ConsumedInternal<K, V>(), null);
+        return table(topic, new ConsumedInternal<K, V>());
     }
 
     /**
@@ -277,450 +263,47 @@ public class StreamsBuilder {
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      *
      * @param topic     the topic name; cannot be {@code null}
-     * @param consumed  the instance of {@link Consumed} used to define optional parameters
+     * @param consumed  the instance of {@link Consumed} used to define optional parameters; cannot be {@code null}
      * @return a {@link KTable} for the specified topic
      */
     public synchronized <K, V> KTable<K, V> table(final String topic,
                                                   final Consumed<K, V> consumed) {
-        return internalStreamsBuilder.table(topic, new ConsumedInternal<>(consumed), null);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                           offsets are available
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} this is the equivalent of
-     *                           {@link #table(String, Consumed)}
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                                  final String topic,
-                                                  final String queryableStoreName) {
-        return internalStreamsBuilder.table(topic, new ConsumedInternal<>(Consumed.<K, V>with(offsetReset)), queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@link TimestampExtractor} and default key and value deserializers
-     * as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param offsetReset   the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                      offsets are available
-     * @param topic         the topic name; cannot be {@code null}
-     * @param storeSupplier user defined state store supplier; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                                  final String topic,
-                                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return internalStreamsBuilder.table(offsetReset, null, null, null, topic, storeSupplier);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers
-     * as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
-                                                  final String topic,
-                                                  final String queryableStoreName) {
-        return internalStreamsBuilder.table(topic, new ConsumedInternal<>(Consumed.<K, V>with(timestampExtractor)), queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                           offsets are available
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                                  final TimestampExtractor timestampExtractor,
-                                                  final String topic,
-                                                  final String queryableStoreName) {
-        final Consumed<K, V> consumed = Consumed.<K, V>with(offsetReset).withTimestampExtractor(timestampExtractor);
-        return internalStreamsBuilder.table(topic, new ConsumedInternal<>(consumed), queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
-     * as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde         value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde,
-                                                  final Serde<V> valueSerde,
-                                                  final String topic,
-                                                  final String queryableStoreName) {
+        Objects.requireNonNull(topic, "topic can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
         return internalStreamsBuilder.table(topic,
-                                            new ConsumedInternal<>(keySerde, valueSerde, null, null),
-                                            queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param keySerde      key serde used to send key-value pairs,
-     *                      if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde    value serde used to send key-value pairs,
-     *                      if not specified the default value serde defined in the configuration will be used
-     * @param topic         the topic name; cannot be {@code null}
-     * @param storeSupplier user defined state store supplier; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde,
-                                                  final Serde<V> valueSerde,
-                                                  final String topic,
-                                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return internalStreamsBuilder.table(null, null, keySerde, valueSerde, topic, storeSupplier);
+                                            new ConsumedInternal<>(consumed),
+                                            new MaterializedInternal<>(
+                                                    Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(
+                                                            internalStreamsBuilder.newStoreName(topic))
+                                                    .withKeySerde(consumed.keySerde)
+                                                    .withValueSerde(consumed.valueSerde),
+                                                    false));
     }
 
     /**
      * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
+     * {@link StreamsConfig config} are used.
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * Note that the specified input topics must be partitioned by key.
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid
-     *                           committed offsets are available
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde         value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                                  final Serde<K> keySerde,
-                                                  final Serde<V> valueSerde,
-                                                  final String topic,
-                                                  final String queryableStoreName) {
-        final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(keySerde, valueSerde, null, offsetReset);
-        return internalStreamsBuilder.table(topic, consumed, queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde         value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
-                                                  final Serde<K> keySerde,
-                                                  final Serde<V> valueSerde,
-                                                  final String topic,
-                                                  final String queryableStoreName) {
-        final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, null);
-        return internalStreamsBuilder.table(topic, consumed, queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topic must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} using the {@link Materialized} instance.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid
-     *                           committed offsets are available
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde         value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+     * @param topic         the topic name; cannot be {@code null}
+     * @param materialized  the instance of {@link Materialized} used to materialize a state store; cannot be {@code null}
      * @return a {@link KTable} for the specified topic
      */
-    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                                  final TimestampExtractor timestampExtractor,
-                                                  final Serde<K> keySerde,
-                                                  final Serde<V> valueSerde,
-                                                  final String topic,
-                                                  final String queryableStoreName) {
+    public synchronized <K, V> KTable<K, V> table(final String topic,
+                                                  final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(topic, "topic can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
         return internalStreamsBuilder.table(topic,
-                                            new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, offsetReset),
-                                            queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                           offsets are available
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde         value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param storeSupplier      user defined state store supplier; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                                  final TimestampExtractor timestampExtractor,
-                                                  final Serde<K> keySerde,
-                                                  final Serde<V> valueSerde,
-                                                  final String topic,
-                                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return internalStreamsBuilder.table(offsetReset, timestampExtractor, keySerde, valueSerde, topic, storeSupplier);
-    }
-
-    /**
-     * Create a {@link GlobalKTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key);
-     * }</pre>
-     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
-     * regardless of the specified value in {@link StreamsConfig}.
-     *
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
-     * @return a {@link GlobalKTable} for the specified topic
-     */
-    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
-                                                              final String queryableStoreName) {
-        return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<K, V>(), queryableStoreName);
+                                            new ConsumedInternal<K, V>(),
+                                            new MaterializedInternal<>(materialized));
     }
 
     /**
@@ -741,7 +324,15 @@ public class StreamsBuilder {
      */
     public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
                                                               final Consumed<K, V> consumed) {
-        return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), null);
+        Objects.requireNonNull(topic, "topic can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized =
+                new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(
+                        internalStreamsBuilder.newStoreName(topic))
+                                                   .withKeySerde(consumed.keySerde)
+                                                   .withValueSerde(consumed.valueSerde),
+                                           false);
+        return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), materialized);
     }
 
     /**
@@ -761,61 +352,25 @@ public class StreamsBuilder {
      * @return a {@link GlobalKTable} for the specified topic
      */
     public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic) {
-        return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<K, V>(), null);
+        return globalTable(topic, Consumed.<K, V>with(null, null));
     }
 
     /**
      * Create a {@link GlobalKTable} for the specified topic.
-     * The default {@link TimestampExtractor} and default key and value deserializers as specified in
-     * the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
-     * <p>
-     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
-     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * To query the local {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
-     * String key = "some-key";
-     * Long valueForKey = localStore.get(key);
-     * }</pre>
-     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
-     * regardless of the specified value in {@link StreamsConfig}.
      *
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde         value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
-     * @return a {@link GlobalKTable} for the specified topic
-     */
-    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
-                                                              final Serde<V> valueSerde,
-                                                              final TimestampExtractor timestampExtractor,
-                                                              final String topic,
-                                                              final String queryableStoreName) {
-        return internalStreamsBuilder.globalTable(topic,
-                                                  new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, null),
-                                                  queryableStoreName);
-    }
-
-    /**
-     * Create a {@link GlobalKTable} for the specified topic.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      * Input {@link KeyValue} pairs with {@code null} key will be dropped.
      * <p>
-     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} configured with
+     * the provided instance of {@link Materialized}.
      * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
+     * You should only specify serdes in the {@link Consumed} instance as these will also be used to overwrite the
+     * serdes in {@link Materialized}, i.e.,
+     * <pre> {@code
+     * streamBuilder.globalTable(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName));
+     * }
+     * </pre>
      * To query the local {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
@@ -827,28 +382,31 @@ public class StreamsBuilder {
      * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
      * regardless of the specified value in {@link StreamsConfig}.
      *
-     * @param keySerde      key serde used to send key-value pairs,
-     *                      if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde    value serde used to send key-value pairs,
-     *                      if not specified the default value serde defined in the configuration will be used
      * @param topic         the topic name; cannot be {@code null}
-     * @param storeSupplier user defined state store supplier; Cannot be {@code null}
+     * @param consumed      the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
+     * @param materialized  the instance of {@link Materialized} used to materialize a state store; cannot be {@code null}
      * @return a {@link GlobalKTable} for the specified topic
      */
-    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
-                                                              final Serde<V> valueSerde,
-                                                              final String topic,
-                                                              final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return internalStreamsBuilder.globalTable(keySerde, valueSerde, topic, storeSupplier);
+    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
+                                                              final Consumed<K, V> consumed,
+                                                              final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(topic, "topic can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        // always use the serdes from consumed
+        materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
+        return internalStreamsBuilder.globalTable(topic,
+                                                  new ConsumedInternal<>(consumed),
+                                                  new MaterializedInternal<>(materialized));
     }
 
     /**
      * Create a {@link GlobalKTable} for the specified topic.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
      * Input {@link KeyValue} pairs with {@code null} key will be dropped.
      * <p>
-     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code queryableStoreName}.
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} configured with
+     * the provided instance of {@link Materialized}.
      * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
@@ -863,34 +421,30 @@ public class StreamsBuilder {
      * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
      * regardless of the specified value in {@link StreamsConfig}.
      *
-     * @param keySerde           key serde used to send key-value pairs,
-     *                           if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde         value serde used to send key-value pairs,
-     *                           if not specified the default value serde defined in the configuration will be used
-     * @param topic              the topic name; cannot be {@code null}
-     * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+     * @param topic         the topic name; cannot be {@code null}
+     * @param materialized  the instance of {@link Materialized} used to materialize a state store; cannot be {@code null}
      * @return a {@link GlobalKTable} for the specified topic
      */
-    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
-                                                              final Serde<V> valueSerde,
-                                                              final String topic,
-                                                              final String queryableStoreName) {
+    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
+                                                              final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(topic, "topic can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
         return internalStreamsBuilder.globalTable(topic,
-                                                  new ConsumedInternal<>(Consumed.with(keySerde, valueSerde)),
-                                                  queryableStoreName);
+                                                  new ConsumedInternal<K, V>(),
+                                                  new MaterializedInternal<>(materialized));
     }
 
+
     /**
      * Adds a state store to the underlying {@link Topology}.
      *
-     * @param supplier the supplier used to obtain this state store {@link StateStore} instance
-     * @param processorNames the names of the processors that should be able to access the provided store
+     * @param builder the builder used to obtain the {@link StateStore} instance; cannot be {@code null}
      * @return itself
      * @throws TopologyException if state store supplier is already added
      */
-    public synchronized StreamsBuilder addStateStore(final StateStoreSupplier supplier,
-                                                     final String... processorNames) {
-        internalStreamsBuilder.addStateStore(supplier, processorNames);
+    public synchronized StreamsBuilder addStateStore(final StoreBuilder builder) {
+        Objects.requireNonNull(builder, "builder can't be null");
+        internalStreamsBuilder.addStateStore(builder);
         return this;
     }
 
@@ -907,62 +461,30 @@ public class StreamsBuilder {
      * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
      * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      *
-     * @param storeSupplier         user defined state store supplier
+     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
      * @param sourceName            name of the {@link SourceNode} that will be automatically added
-     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
-     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
      * @param topic                 the topic to source the data from
+     * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
      * @param processorName         the name of the {@link ProcessorSupplier}
      * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
      * @return itself
      * @throws TopologyException if the processor of state is already registered
      */
-    public synchronized StreamsBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
-                                                      final String sourceName,
-                                                      final Deserializer keyDeserializer,
-                                                      final Deserializer valueDeserializer,
+    @SuppressWarnings("unchecked")
+    public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
                                                       final String topic,
-                                                      final String processorName,
-                                                      final ProcessorSupplier stateUpdateSupplier) {
-        internalStreamsBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer,
-            valueDeserializer, topic, processorName, stateUpdateSupplier);
-        return this;
-    }
-
-    /**
-     * Adds a global {@link StateStore} to the topology.
-     * The {@link StateStore} sources its data from all partitions of the provided input topic.
-     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
-     * <p>
-     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
-     * of the input topic.
-     * <p>
-     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
-     * records forwarded from the {@link SourceNode}.
-     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
-     *
-     * @param storeSupplier         user defined state store supplier
-     * @param sourceName            name of the {@link SourceNode} that will be automatically added
-     * @param timestampExtractor    the stateless timestamp extractor used for this source,
-     *                              if not specified the default extractor defined in the configs will be used
-     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
-     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
-     * @param topic                 the topic to source the data from
-     * @param processorName         the name of the {@link ProcessorSupplier}
-     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
-     * @return itself
-     * @throws TopologyException if the processor of state is already registered
-     */
-    public synchronized StreamsBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
                                                       final String sourceName,
-                                                      final TimestampExtractor timestampExtractor,
-                                                      final Deserializer keyDeserializer,
-                                                      final Deserializer valueDeserializer,
-                                                      final String topic,
+                                                      final Consumed consumed,
                                                       final String processorName,
                                                       final ProcessorSupplier stateUpdateSupplier) {
-        internalStreamsBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
-            valueDeserializer, topic, processorName, stateUpdateSupplier);
+        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        internalStreamsBuilder.addGlobalStore(storeBuilder,
+                                              sourceName,
+                                              topic,
+                                              new ConsumedInternal<>(consumed),
+                                              processorName,
+                                              stateUpdateSupplier);
         return this;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/Topology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 386aacf..85d769f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -571,7 +571,7 @@ public class Topology {
      * @return itself
      * @throws TopologyException if the processor of state is already registered
      */
-    public synchronized Topology addGlobalStore(final KeyValueStoreBuilder storeBuilder,
+    public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
                                                 final String sourceName,
                                                 final Deserializer keyDeserializer,
                                                 final Deserializer valueDeserializer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 4da9906..3963657 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -17,17 +17,15 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.Topology;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -72,37 +70,55 @@ public class InternalStreamsBuilder {
         return new KStreamImpl<>(this, name, Collections.singleton(name), false);
     }
 
-    @SuppressWarnings("unchecked")
     public <K, V> KTable<K, V> table(final String topic,
                                      final ConsumedInternal<K, V> consumed,
-                                     final String queryableStoreName) {
-        final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
-        final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName,
-            consumed.keySerde(),
-            consumed.valueSerde(),
-            false,
-            Collections.<String, String>emptyMap(),
-            true);
-        return doTable(consumed, topic, storeSupplier, queryableStoreName != null);
-    }
-
-    public <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                     final TimestampExtractor timestampExtractor,
-                                     final Serde<K> keySerde,
-                                     final Serde<V> valSerde,
-                                     final String topic,
                                      final StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
-        return doTable(new ConsumedInternal<>(keySerde, valSerde, timestampExtractor, offsetReset), topic, storeSupplier, true);
+        final String source = newName(KStreamImpl.SOURCE_NAME);
+        final String name = newName(KTableImpl.SOURCE_NAME);
+
+        final KTable<K, V> kTable = createKTable(consumed,
+                                                 topic,
+                                                 storeSupplier.name(),
+                                                 true,
+                                                 source,
+                                                 name);
+
+        internalTopologyBuilder.addStateStore(storeSupplier, name);
+        internalTopologyBuilder.connectSourceStoreAndTopic(storeSupplier.name(), topic);
+
+        return kTable;
     }
 
-    private <K, V> KTable<K, V> doTable(final ConsumedInternal<K, V> consumed,
-                                        final String topic,
-                                        final StateStoreSupplier<KeyValueStore> storeSupplier,
-                                        final boolean isQueryable) {
+    @SuppressWarnings("unchecked")
+    public <K, V> KTable<K, V> table(final String topic,
+                                     final ConsumedInternal<K, V> consumed,
+                                     final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
+
         final String source = newName(KStreamImpl.SOURCE_NAME);
         final String name = newName(KTableImpl.SOURCE_NAME);
-        final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
+        final KTable<K, V> kTable = createKTable(consumed,
+                                                 topic,
+                                                 storeBuilder.name(),
+                                                 materialized.isQueryable(),
+                                                 source,
+                                                 name);
+
+        internalTopologyBuilder.addStateStore(storeBuilder, name);
+        internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
+
+        return kTable;
+    }
+
+
+    private <K, V> KTable<K, V> createKTable(final ConsumedInternal<K, V> consumed,
+                                             final String topic,
+                                             final String storeName,
+                                             final boolean isQueryable,
+                                             final String source,
+                                             final String name) {
+        final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeName);
 
         internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
                                           source,
@@ -112,48 +128,27 @@ public class InternalStreamsBuilder {
                                           topic);
         internalTopologyBuilder.addProcessor(name, processorSupplier, source);
 
-        final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier,
-            consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeSupplier.name(), isQueryable);
-
-        internalTopologyBuilder.addStateStore(storeSupplier, name);
-        internalTopologyBuilder.connectSourceStoreAndTopic(storeSupplier.name(), topic);
-
-        return kTable;
+        return new KTableImpl<>(this, name, processorSupplier,
+                                consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeName, isQueryable);
     }
 
     public <K, V> GlobalKTable<K, V> globalTable(final String topic,
                                                  final ConsumedInternal<K, V> consumed,
-                                                 final String queryableStoreName) {
-        final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
-        return doGlobalTable(consumed, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
-            consumed.keySerde(),
-            consumed.valueSerde(),
-            false,
-            Collections.<String, String>emptyMap(),
-            true));
-    }
-
-    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
-                                                 final Serde<V> valSerde,
-                                                 final String topic,
-                                                 final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return doGlobalTable(new ConsumedInternal<>(keySerde, valSerde, null, null), topic, storeSupplier);
-    }
-
-    @SuppressWarnings("unchecked")
-    private <K, V> GlobalKTable<K, V> doGlobalTable(final ConsumedInternal<K, V> consumed,
-                                                    final String topic,
-                                                    final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+                                                 final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        // explicitly disable logging for global stores
+        materialized.withLoggingDisabled();
+        final StoreBuilder storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
         final String sourceName = newName(KStreamImpl.SOURCE_NAME);
         final String processorName = newName(KTableImpl.SOURCE_NAME);
-        final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
+        final KTableSource<K, V> tableSource = new KTableSource<>(storeBuilder.name());
 
 
         final Deserializer<K> keyDeserializer = consumed.keySerde() == null ? null : consumed.keySerde().deserializer();
         final Deserializer<V> valueDeserializer = consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer();
 
-        internalTopologyBuilder.addGlobalStore(storeSupplier,
+        internalTopologyBuilder.addGlobalStore(storeBuilder,
                                                sourceName,
                                                consumed.timestampExtractor(),
                                                keyDeserializer,
@@ -161,9 +156,10 @@ public class InternalStreamsBuilder {
                                                topic,
                                                processorName,
                                                tableSource);
-        return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
+        return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()));
     }
 
+
     public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
         return KStreamImpl.merge(this, streams);
     }
@@ -172,35 +168,31 @@ public class InternalStreamsBuilder {
         return prefix + String.format("%010d", index.getAndIncrement());
     }
 
-    String newStoreName(final String prefix) {
+    public String newStoreName(final String prefix) {
         return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
     }
 
-    public synchronized void addStateStore(final StateStoreSupplier supplier,
-                                           final String... processorNames) {
-        internalTopologyBuilder.addStateStore(supplier, processorNames);
-    }
-
-    public synchronized void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
-                                            final String sourceName,
-                                            final Deserializer keyDeserializer,
-                                            final Deserializer valueDeserializer,
-                                            final String topic,
-                                            final String processorName,
-                                            final ProcessorSupplier stateUpdateSupplier) {
-        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer,
-            valueDeserializer, topic, processorName, stateUpdateSupplier);
+    public synchronized void addStateStore(final StoreBuilder builder) {
+        internalTopologyBuilder.addStateStore(builder);
     }
 
-    public synchronized void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+    public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
                                             final String sourceName,
-                                            final TimestampExtractor timestampExtractor,
-                                            final Deserializer keyDeserializer,
-                                            final Deserializer valueDeserializer,
                                             final String topic,
+                                            final ConsumedInternal consumed,
                                             final String processorName,
                                             final ProcessorSupplier stateUpdateSupplier) {
-        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
-            valueDeserializer, topic, processorName, stateUpdateSupplier);
+        // explicitly disable logging for global stores
+        storeBuilder.withLoggingDisabled();
+        final Deserializer keyDeserializer = consumed.keySerde() == null ? null : consumed.keySerde().deserializer();
+        final Deserializer valueDeserializer = consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer();
+        internalTopologyBuilder.addGlobalStore(storeBuilder,
+                                               sourceName,
+                                               consumed.timestampExtractor(),
+                                               keyDeserializer,
+                                               valueDeserializer,
+                                               topic,
+                                               processorName,
+                                               stateUpdateSupplier);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 067bcfc..a42db0b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -401,7 +402,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         return builder.table(topic,
                              new ConsumedInternal<>(keySerde, valSerde, new FailOnInvalidTimestamp(), null),
-                             internalStoreName);
+                             new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(internalStoreName)
+                                     .withKeySerde(keySerde)
+                                     .withValueSerde(valSerde),
+                                     queryableStoreName != null));
     }
 
     @Override
@@ -413,7 +417,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         to(keySerde, valSerde, partitioner, topic);
 
-        return builder.table(null, new FailOnInvalidTimestamp(), keySerde, valSerde, topic, storeSupplier);
+        final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(Consumed.with(keySerde, valSerde, new FailOnInvalidTimestamp(), null));
+        return builder.table(topic, consumed, storeSupplier);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index 0ee610f..9f186fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -30,8 +30,9 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
     public MaterializedInternal(final Materialized<K, V, S> materialized) {
         this(materialized, true);
     }
-
-    MaterializedInternal(final Materialized<K, V, S> materialized, final boolean queryable) {
+    
+    public MaterializedInternal(final Materialized<K, V, S> materialized,
+                                final boolean queryable) {
         super(materialized);
         this.queryable = queryable;
     }
@@ -67,7 +68,7 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
         return cachingEnabled;
     }
 
-    public boolean isQueryable() {
+    boolean isQueryable() {
         return queryable;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 193d0e1..d47af88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
 import org.slf4j.Logger;
@@ -557,7 +556,7 @@ public class InternalTopologyBuilder {
     }
 
 
-    public final void addGlobalStore(final KeyValueStoreBuilder storeBuilder,
+    public final void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
                                      final String sourceName,
                                      final TimestampExtractor timestampExtractor,
                                      final Deserializer keyDeserializer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 73c5484..f1ae6da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -505,7 +505,8 @@ public class KafkaStreamsTest {
         CLUSTER.createTopic(topic);
         final StreamsBuilder builder = new StreamsBuilder();
 
-        builder.table(Serdes.String(), Serdes.String(), topic, topic);
+        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+        builder.table(topic, consumed);
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         final CountDownLatch latch = new CountDownLatch(1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index e50f4d0..6c7b2b4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -32,11 +32,11 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -583,13 +583,11 @@ public class EosIntegrationTest {
         String[] storeNames = null;
         if (withState) {
             storeNames = new String[] {storeName};
-            final StateStoreSupplier storeSupplier = Stores.create(storeName)
-                .withLongKeys()
-                .withLongValues()
-                .persistent()
-                .build();
+            final StoreBuilder<KeyValueStore<Long, Long>> storeBuilder
+                    = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.Long(), Serdes.Long())
+                    .withCachingEnabled();
 
-            builder.addStateStore(storeSupplier);
+            builder.addStateStore(storeBuilder);
         }
 
         final KStream<Long, Long> input = builder.stream(MULTI_PARTITION_INPUT_TOPIC);