You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/06 22:38:27 UTC

[2/2] kafka git commit: KAFKA-5651; Follow-up: add with method to Materialized

KAFKA-5651; Follow-up: add with method to Materialized

Add a `with(Serde keySerde, Serde valSerde)` to `Materialized` for cases where people don't care about the state store name.

Author: Damian Guy <da...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Matthias J. Sax <ma...@confluent.io>

Closes #4009 from dguy/materialized


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/23a01405
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/23a01405
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/23a01405

Branch: refs/heads/trunk
Commit: 23a014052d39521a3af471b3f95809c2164820f7
Parents: 105ab47
Author: Damian Guy <da...@gmail.com>
Authored: Fri Oct 6 15:38:23 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 6 15:38:23 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/StreamsBuilder.java    | 31 ++++----
 .../kafka/streams/kstream/KGroupedStream.java   |  6 +-
 .../kafka/streams/kstream/KGroupedTable.java    |  2 +-
 .../kafka/streams/kstream/KStreamBuilder.java   |  2 +-
 .../kafka/streams/kstream/Materialized.java     | 18 +++++
 .../GroupedStreamAggregateBuilder.java          |  2 +-
 .../kstream/internals/InternalNameProvider.java | 23 ++++++
 .../internals/InternalStreamsBuilder.java       | 25 ++++---
 .../kstream/internals/KGroupedStreamImpl.java   | 14 ++--
 .../kstream/internals/KGroupedTableImpl.java    | 34 +++++----
 .../streams/kstream/internals/KStreamImpl.java  | 54 +++++++-------
 .../streams/kstream/internals/KTableImpl.java   | 50 +++++++------
 .../internals/KeyValueStoreMaterializer.java    |  6 +-
 .../kstream/internals/MaterializedInternal.java | 21 +++---
 .../internals/SessionWindowedKStreamImpl.java   | 11 ++-
 .../internals/TimeWindowedKStreamImpl.java      | 14 ++--
 .../KStreamAggregationIntegrationTest.java      |  6 +-
 .../kstream/internals/AbstractStreamTest.java   |  2 +-
 .../internals/InternalStreamsBuilderTest.java   | 40 ++++++-----
 .../kstream/internals/KTableForeachTest.java    |  6 +-
 .../internals/MaterializedInternalTest.java     | 76 ++++++++++++++++++++
 .../SessionWindowedKStreamImplTest.java         | 16 ++++-
 .../internals/TimeWindowedKStreamImplTest.java  |  2 +-
 .../KeyValueStoreMaterializerTest.java          | 23 ++++--
 24 files changed, 330 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 94d19ae..b5cc6d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -224,7 +224,7 @@ public class StreamsBuilder {
         materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
         return internalStreamsBuilder.table(topic,
                                             new ConsumedInternal<>(consumed),
-                                            new MaterializedInternal<>(materialized));
+                                            new MaterializedInternal<>(materialized, internalStreamsBuilder, topic));
     }
 
     /**
@@ -273,11 +273,9 @@ public class StreamsBuilder {
         return internalStreamsBuilder.table(topic,
                                             new ConsumedInternal<>(consumed),
                                             new MaterializedInternal<>(
-                                                    Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(
-                                                            internalStreamsBuilder.newStoreName(topic))
-                                                    .withKeySerde(consumed.keySerde)
-                                                    .withValueSerde(consumed.valueSerde),
-                                                    false));
+                                                    Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde, consumed.valueSerde),
+                                                    internalStreamsBuilder,
+                                                    topic));
     }
 
     /**
@@ -301,11 +299,12 @@ public class StreamsBuilder {
                                                   final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
+                = new MaterializedInternal<>(materialized, internalStreamsBuilder, topic);
         return internalStreamsBuilder.table(topic,
                                             new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
                                                                                  materializedInternal.valueSerde())),
-                                            new MaterializedInternal<>(materialized));
+                                            materializedInternal);
     }
 
     /**
@@ -329,11 +328,12 @@ public class StreamsBuilder {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized =
-                new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(
-                        internalStreamsBuilder.newStoreName(topic))
-                                                   .withKeySerde(consumed.keySerde)
-                                                   .withValueSerde(consumed.valueSerde),
-                                           false);
+                new MaterializedInternal<>(
+                        Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde, consumed.valueSerde),
+                        internalStreamsBuilder,
+                        topic);
+
+
         return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), materialized);
     }
 
@@ -399,7 +399,7 @@ public class StreamsBuilder {
         materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
         return internalStreamsBuilder.globalTable(topic,
                                                   new ConsumedInternal<>(consumed),
-                                                  new MaterializedInternal<>(materialized));
+                                                  new MaterializedInternal<>(materialized, internalStreamsBuilder, topic));
     }
 
     /**
@@ -431,7 +431,8 @@ public class StreamsBuilder {
                                                               final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(materialized, internalStreamsBuilder, topic);
         return internalStreamsBuilder.globalTable(topic,
                                                   new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
                                                                                        materializedInternal.valueSerde())),

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index b3945f7..6347960 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -1173,7 +1173,7 @@ public interface KGroupedStream<K, V> {
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.with(null, aggValueSerde))}
      */
     @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
@@ -1346,7 +1346,7 @@ public interface KGroupedStream<K, V> {
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
      * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
-     * {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized)} aggregate(initializer, aggregator, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
+     * {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized)} aggregate(initializer, aggregator, Materialized.with(null, aggValueSerde))}
      */
     @Deprecated
     <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
@@ -1509,7 +1509,7 @@ public interface KGroupedStream<K, V> {
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
      * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
-     * {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
+     * {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.with(null, aggValueSerde))}
      */
     @Deprecated
     <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index d0a38cc..4d2bb29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -855,7 +855,7 @@ public interface KGroupedTable<K, V> {
      * @param <VR>          the value type of the aggregated {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
-     * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.with(null, aggValueSerde))}
      */
     @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 6b51c86..77745d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -1251,7 +1251,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * @return a new unique name
      */
     public String newName(final String prefix) {
-        return internalStreamsBuilder.newName(prefix);
+        return internalStreamsBuilder.newProcessorName(prefix);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index dd7165c..48dd12e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -139,6 +139,24 @@ public class Materialized<K, V, S extends StateStore> {
     }
 
     /**
+     * Materialize a {@link StateStore} with the provided key and value {@link Serde}s.
+     * An internal name will be used for the store.
+     *
+     * @param keySerde      the key {@link Serde} to use. If the {@link Serde} is null, then the default key
+     *                      serde from configs will be used
+     * @param valueSerde    the value {@link Serde} to use. If the {@link Serde} is null, then the default value
+     *                      serde from configs will be used
+     * @param <K>           key type
+     * @param <V>           value type
+     * @param <S>           store type
+     * @return a new {@link Materialized} instance with the given key and value serdes
+     */
+    public static <K, V, S extends StateStore> Materialized<K, V, S> with(final Serde<K> keySerde,
+                                                                          final Serde<V> valueSerde) {
+        return new Materialized<K, V, S>((String) null).withKeySerde(keySerde).withValueSerde(valueSerde);
+    }
+
+    /**
      * Set the valueSerde the materialized {@link StateStore} will use.
      *
      * @param valueSerde the value {@link Serde} to use. If the {@link Serde} is null, then the default value

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 6fb7a35..e4429cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -65,7 +65,7 @@ class GroupedStreamAggregateBuilder<K, V> {
                            final String functionName,
                            final StoreBuilder storeBuilder,
                            final boolean isQueryable) {
-        final String aggFunctionName = builder.newName(functionName);
+        final String aggFunctionName = builder.newProcessorName(functionName);
         final String sourceName = repartitionIfRequired(storeBuilder.name());
         builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
         builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName);

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java
new file mode 100644
index 0000000..8d8ebfc
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+public interface InternalNameProvider {
+    String newProcessorName(String prefix);
+
+    String newStoreName(String prefix);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 0df1524..4308e5d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -32,7 +32,7 @@ import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
-public class InternalStreamsBuilder {
+public class InternalStreamsBuilder implements InternalNameProvider {
 
     final InternalTopologyBuilder internalTopologyBuilder;
 
@@ -44,7 +44,7 @@ public class InternalStreamsBuilder {
 
     public <K, V> KStream<K, V> stream(final Collection<String> topics,
                                        final ConsumedInternal<K, V> consumed) {
-        final String name = newName(KStreamImpl.SOURCE_NAME);
+        final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
 
         internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
                                           name,
@@ -57,7 +57,7 @@ public class InternalStreamsBuilder {
     }
 
     public <K, V> KStream<K, V> stream(final Pattern topicPattern, final ConsumedInternal<K, V> consumed) {
-        final String name = newName(KStreamImpl.SOURCE_NAME);
+        final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
 
         internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
                                           name,
@@ -74,8 +74,8 @@ public class InternalStreamsBuilder {
                                      final ConsumedInternal<K, V> consumed,
                                      final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
-        final String source = newName(KStreamImpl.SOURCE_NAME);
-        final String name = newName(KTableImpl.SOURCE_NAME);
+        final String source = newProcessorName(KStreamImpl.SOURCE_NAME);
+        final String name = newProcessorName(KTableImpl.SOURCE_NAME);
 
         final KTable<K, V> kTable = createKTable(consumed,
                                                  topic,
@@ -94,10 +94,11 @@ public class InternalStreamsBuilder {
     public <K, V> KTable<K, V> table(final String topic,
                                      final ConsumedInternal<K, V> consumed,
                                      final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
-        final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
+        final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized)
+                .materialize();
 
-        final String source = newName(KStreamImpl.SOURCE_NAME);
-        final String name = newName(KTableImpl.SOURCE_NAME);
+        final String source = newProcessorName(KStreamImpl.SOURCE_NAME);
+        final String name = newProcessorName(KTableImpl.SOURCE_NAME);
         final KTable<K, V> kTable = createKTable(consumed,
                                                  topic,
                                                  storeBuilder.name(),
@@ -141,8 +142,8 @@ public class InternalStreamsBuilder {
         // explicitly disable logging for global stores
         materialized.withLoggingDisabled();
         final StoreBuilder storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
-        final String sourceName = newName(KStreamImpl.SOURCE_NAME);
-        final String processorName = newName(KTableImpl.SOURCE_NAME);
+        final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
+        final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);
         final KTableSource<K, V> tableSource = new KTableSource<>(storeBuilder.name());
 
 
@@ -160,10 +161,12 @@ public class InternalStreamsBuilder {
         return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()));
     }
 
-    public String newName(final String prefix) {
+    @Override
+    public String newProcessorName(final String prefix) {
         return prefix + String.format("%010d", index.getAndIncrement());
     }
 
+    @Override
     public String newStoreName(final String prefix) {
         return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index dafaa62..45ae7da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -109,7 +109,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized);
+                = new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
         return doAggregate(
                 new KStreamReduce<K, V>(materializedInternal.storeName(), reducer),
                 REDUCE_NAME,
@@ -171,7 +171,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                                      final Aggregator<? super K, ? super V, VR> aggregator,
                                                      final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
         final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized);
+                = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
         return doAggregate(
                 new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator),
                 AGGREGATE_NAME,
@@ -183,10 +183,10 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                         final Aggregator<? super K, ? super V, VR> aggregator) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
-        final String storeName = builder.newStoreName(AGGREGATE_NAME);
-
         MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
-                new MaterializedInternal<>(Materialized.<K, VR, KeyValueStore<Bytes, byte[]>>as(storeName), false);
+                new MaterializedInternal<>(Materialized.<K, VR, KeyValueStore<Bytes, byte[]>>with(keySerde, null),
+                                           builder,
+                                           AGGREGATE_NAME);
         return doAggregate(new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator),
                            AGGREGATE_NAME,
                            materializedInternal);
@@ -277,7 +277,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized);
+                = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
         if (materializedInternal.valueSerde() == null) {
             materialized.withValueSerde(Serdes.Long());
         }
@@ -495,7 +495,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
             final String functionName,
             final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier) {
 
-        final String aggFunctionName = builder.newName(functionName);
+        final String aggFunctionName = builder.newProcessorName(functionName);
 
         final String sourceName = repartitionIfRequired(storeSupplier.name());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 507944a..d5a4e71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Collections;
@@ -46,8 +47,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
     private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
-    protected final Serde<? extends K> keySerde;
-    protected final Serde<? extends V> valSerde;
+    protected final Serde<K> keySerde;
+    protected final Serde<V> valSerde;
     private boolean isQueryable = true;
     private final Initializer<Long> countInitializer = new Initializer<Long>() {
         @Override
@@ -73,8 +74,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
     KGroupedTableImpl(final InternalStreamsBuilder builder,
                       final String name,
                       final String sourceName,
-                      final Serde<? extends K> keySerde,
-                      final Serde<? extends V> valSerde) {
+                      final Serde<K> keySerde,
+                      final Serde<V> valSerde) {
         super(builder, name, Collections.singleton(sourceName));
         this.keySerde = keySerde;
         this.valSerde = valSerde;
@@ -141,10 +142,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
     @SuppressWarnings("deprecation")
     private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
                                          final String functionName,
-                                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
-        final String sinkName = builder.newName(KStreamImpl.SINK_NAME);
-        final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME);
-        final String funcName = builder.newName(functionName);
+                                         final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        final String sinkName = builder.newProcessorName(KStreamImpl.SINK_NAME);
+        final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME);
+        final String funcName = builder.newProcessorName(functionName);
 
         buildAggregate(aggregateSupplier,
                        storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX,
@@ -184,15 +185,16 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
     private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
                                          final String functionName,
                                          final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
-        final String sinkName = builder.newName(KStreamImpl.SINK_NAME);
-        final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME);
-        final String funcName = builder.newName(functionName);
+        final String sinkName = builder.newProcessorName(KStreamImpl.SINK_NAME);
+        final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME);
+        final String funcName = builder.newProcessorName(functionName);
 
         buildAggregate(aggregateSupplier,
                        materialized.storeName() + KStreamImpl.REPARTITION_TOPIC_SUFFIX,
                        funcName,
                        sourceName, sinkName);
-        builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materialized).materialize(), funcName);
+        builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materialized)
+                                                              .materialize(), funcName);
 
         // return the KTable representation with the intermediate topic as the sources
         return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), materialized.storeName(), isQueryable);
@@ -215,7 +217,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         Objects.requireNonNull(subtractor, "subtractor can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized);
+                = new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
         final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(materializedInternal.storeName(),
                                                                                      adder,
                                                                                      subtractor);
@@ -255,6 +257,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
                          materialized);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                         final Aggregator<? super K, ? super V, VR> adder,
@@ -265,7 +268,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         Objects.requireNonNull(subtractor, "subtractor can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
-                new MaterializedInternal<>(materialized);
+                new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+        if (materializedInternal.keySerde() == null) {
+            materializedInternal.withKeySerde(keySerde);
+        }
         final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(materializedInternal.storeName(),
                                                                                         initializer,
                                                                                         adder,

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index cbbe848..8e80315 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -123,7 +123,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public KStream<K, V> filter(final Predicate<? super K, ? super V> predicate) {
         Objects.requireNonNull(predicate, "predicate can't be null");
-        String name = builder.newName(FILTER_NAME);
+        String name = builder.newProcessorName(FILTER_NAME);
 
         builder.internalTopologyBuilder.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);
 
@@ -133,7 +133,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate) {
         Objects.requireNonNull(predicate, "predicate can't be null");
-        String name = builder.newName(FILTER_NAME);
+        String name = builder.newProcessorName(FILTER_NAME);
 
         builder.internalTopologyBuilder.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
 
@@ -147,7 +147,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     private <K1> String internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
-        String name = builder.newName(KEY_SELECT_NAME);
+        String name = builder.newProcessorName(KEY_SELECT_NAME);
         builder.internalTopologyBuilder.addProcessor(
             name,
             new KStreamMap<>(
@@ -166,7 +166,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public <K1, V1> KStream<K1, V1> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
-        String name = builder.newName(MAP_NAME);
+        String name = builder.newProcessorName(MAP_NAME);
 
         builder.internalTopologyBuilder.addProcessor(name, new KStreamMap<>(mapper), this.name);
 
@@ -177,7 +177,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public <V1> KStream<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
-        String name = builder.newName(MAPVALUES_NAME);
+        String name = builder.newProcessorName(MAPVALUES_NAME);
 
         builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
 
@@ -247,7 +247,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public void print(final Printed<K, V> printed) {
         Objects.requireNonNull(printed, "printed can't be null");
         final PrintedInternal<K, V> printedInternal = new PrintedInternal<>(printed);
-        final String name = builder.newName(PRINTING_NAME);
+        final String name = builder.newProcessorName(PRINTING_NAME);
         builder.internalTopologyBuilder.addProcessor(name, printedInternal.build(this.name), this.name);
     }
 
@@ -320,7 +320,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public <K1, V1> KStream<K1, V1> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
-        String name = builder.newName(FLATMAP_NAME);
+        String name = builder.newProcessorName(FLATMAP_NAME);
 
         builder.internalTopologyBuilder.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
 
@@ -330,7 +330,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public <V1> KStream<K, V1> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
-        String name = builder.newName(FLATMAPVALUES_NAME);
+        String name = builder.newProcessorName(FLATMAPVALUES_NAME);
 
         builder.internalTopologyBuilder.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);
 
@@ -346,13 +346,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         for (final Predicate<? super K, ? super V> predicate : predicates) {
             Objects.requireNonNull(predicate, "predicates can't have null values");
         }
-        String branchName = builder.newName(BRANCH_NAME);
+        String branchName = builder.newProcessorName(BRANCH_NAME);
 
         builder.internalTopologyBuilder.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);
 
         KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
         for (int i = 0; i < predicates.length; i++) {
-            String childName = builder.newName(BRANCHCHILD_NAME);
+            String childName = builder.newProcessorName(BRANCHCHILD_NAME);
 
             builder.internalTopologyBuilder.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName);
 
@@ -371,7 +371,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     private KStream<K, V> merge(final InternalStreamsBuilder builder,
                                 final KStream<K, V> stream) {
         KStreamImpl<K, V> streamImpl = (KStreamImpl<K, V>) stream;
-        String name = builder.newName(MERGE_NAME);
+        String name = builder.newProcessorName(MERGE_NAME);
         String[] parentNames = {this.name, streamImpl.name};
         Set<String> allSourceNodes = new HashSet<>();
 
@@ -406,7 +406,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public void foreach(final ForeachAction<? super K, ? super V> action) {
         Objects.requireNonNull(action, "action can't be null");
-        String name = builder.newName(FOREACH_NAME);
+        String name = builder.newProcessorName(FOREACH_NAME);
 
         builder.internalTopologyBuilder.addProcessor(name, new KStreamPeek<>(action, false), this.name);
     }
@@ -414,7 +414,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action) {
         Objects.requireNonNull(action, "action can't be null");
-        final String name = builder.newName(PEEK_NAME);
+        final String name = builder.newProcessorName(PEEK_NAME);
 
         builder.internalTopologyBuilder.addProcessor(name, new KStreamPeek<>(action, true), this.name);
 
@@ -482,7 +482,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @SuppressWarnings("unchecked")
     private void to(final String topic, final ProducedInternal<K, V> produced) {
-        final String name = builder.newName(SINK_NAME);
+        final String name = builder.newProcessorName(SINK_NAME);
         final Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer();
         final Serializer<V> valSerializer = produced.valueSerde() == null ? null : produced.valueSerde().serializer();
         final StreamPartitioner<? super K, ? super V> partitioner = produced.streamPartitioner();
@@ -500,7 +500,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
                                               final String... stateStoreNames) {
         Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
-        String name = builder.newName(TRANSFORM_NAME);
+        String name = builder.newProcessorName(TRANSFORM_NAME);
 
         builder.internalTopologyBuilder.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
         if (stateStoreNames != null && stateStoreNames.length > 0) {
@@ -514,7 +514,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public <V1> KStream<K, V1> transformValues(final ValueTransformerSupplier<? super V, ? extends V1> valueTransformerSupplier,
                                                final String... stateStoreNames) {
         Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
-        String name = builder.newName(TRANSFORMVALUES_NAME);
+        String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
 
         builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
         if (stateStoreNames != null && stateStoreNames.length > 0) {
@@ -527,7 +527,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
                         final String... stateStoreNames) {
-        final String name = builder.newName(PROCESSOR_NAME);
+        final String name = builder.newProcessorName(PROCESSOR_NAME);
 
         builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
         if (stateStoreNames != null && stateStoreNames.length > 0) {
@@ -645,9 +645,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         String baseName = topicNamePrefix != null ? topicNamePrefix : name;
 
         String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
-        String sinkName = builder.newName(SINK_NAME);
-        String filterName = builder.newName(FILTER_NAME);
-        String sourceName = builder.newName(SOURCE_NAME);
+        String sinkName = builder.newProcessorName(SINK_NAME);
+        String filterName = builder.newProcessorName(FILTER_NAME);
+        String sourceName = builder.newProcessorName(SOURCE_NAME);
 
         builder.internalTopologyBuilder.addInternalTopic(repartitionTopic);
         builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
@@ -753,7 +753,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         Objects.requireNonNull(joiner, "joiner can't be null");
 
         final KTableValueGetterSupplier<K1, V1> valueGetterSupplier = ((GlobalKTableImpl<K1, V1>) globalTable).valueGetterSupplier();
-        final String name = builder.newName(LEFTJOIN_NAME);
+        final String name = builder.newProcessorName(LEFTJOIN_NAME);
         builder.internalTopologyBuilder.addProcessor(name, new KStreamGlobalKTableJoin<>(valueGetterSupplier, joiner, keyMapper, leftJoin), this.name);
         return new KStreamImpl<>(builder, name, sourceNodes, false);
     }
@@ -767,7 +767,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
-        final String name = builder.newName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
+        final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
         builder.internalTopologyBuilder.addProcessor(name, new KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(), joiner, leftJoin), this.name);
         builder.internalTopologyBuilder.connectProcessorAndStateStores(name, ((KTableImpl<K, ?, V1>) other).internalStoreName());
         builder.internalTopologyBuilder.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
@@ -886,11 +886,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                    final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
                                                    final JoinWindows windows,
                                                    final Joined<K1, V1, V2> joined) {
-            String thisWindowStreamName = builder.newName(WINDOWED_NAME);
-            String otherWindowStreamName = builder.newName(WINDOWED_NAME);
-            String joinThisName = rightOuter ? builder.newName(OUTERTHIS_NAME) : builder.newName(JOINTHIS_NAME);
-            String joinOtherName = leftOuter ? builder.newName(OUTEROTHER_NAME) : builder.newName(JOINOTHER_NAME);
-            String joinMergeName = builder.newName(MERGE_NAME);
+            String thisWindowStreamName = builder.newProcessorName(WINDOWED_NAME);
+            String otherWindowStreamName = builder.newProcessorName(WINDOWED_NAME);
+            String joinThisName = rightOuter ? builder.newProcessorName(OUTERTHIS_NAME) : builder.newProcessorName(JOINTHIS_NAME);
+            String joinOtherName = leftOuter ? builder.newProcessorName(OUTEROTHER_NAME) : builder.newProcessorName(JOINOTHER_NAME);
+            String joinMergeName = builder.newProcessorName(MERGE_NAME);
 
             final StoreBuilder<WindowStore<K1, V1>> thisWindow =
                 createWindowedStateStore(windows, joined.keySerde(), joined.valueSerde(), joinThisName + "-store");

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index db8de1a..8c79dec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -146,7 +146,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                   final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
                                   final boolean isFilterNot) {
         Objects.requireNonNull(predicate, "predicate can't be null");
-        String name = builder.newName(FILTER_NAME);
+        String name = builder.newProcessorName(FILTER_NAME);
         String internalStoreName = null;
         if (storeSupplier != null) {
             internalStoreName = storeSupplier.name();
@@ -162,7 +162,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
                                   final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized,
                                   final boolean filterNot) {
-        String name = builder.newName(FILTER_NAME);
+        String name = builder.newProcessorName(FILTER_NAME);
 
         KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this,
                                                                                 predicate,
@@ -193,7 +193,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(predicate, "predicate can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        return doFilter(predicate, new MaterializedInternal<>(materialized), false);
+        return doFilter(predicate, new MaterializedInternal<>(materialized, builder, FILTER_NAME), false);
     }
 
     @SuppressWarnings("deprecation")
@@ -225,7 +225,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                   final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(predicate, "predicate can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        return doFilter(predicate, new MaterializedInternal<>(materialized), true);
+        return doFilter(predicate, new MaterializedInternal<>(materialized, builder, FILTER_NAME), true);
     }
 
     @SuppressWarnings("deprecation")
@@ -252,7 +252,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                            final Serde<V1> valueSerde,
                                            final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(mapper);
-        String name = builder.newName(MAPVALUES_NAME);
+        String name = builder.newProcessorName(MAPVALUES_NAME);
         String internalStoreName = null;
         if (storeSupplier != null) {
             internalStoreName = storeSupplier.name();
@@ -278,13 +278,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         Objects.requireNonNull(mapper, "mapper can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized);
-        final String name = builder.newName(MAPVALUES_NAME);
+                = new MaterializedInternal<>(materialized, builder, MAPVALUES_NAME);
+        final String name = builder.newProcessorName(MAPVALUES_NAME);
         final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(this,
                                                                                           mapper,
                                                                                           materializedInternal.storeName());
         builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
-        builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
+        builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal)
+                                                              .materialize(),
                                                       name);
         return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, true);
     }
@@ -335,7 +336,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                       final Serde<V> valSerde,
                       final String label) {
         Objects.requireNonNull(label, "label can't be null");
-        final String name = builder.newName(PRINTING_NAME);
+        final String name = builder.newProcessorName(PRINTING_NAME);
         builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, defaultKeyValueMapper, label)), this.name);
     }
 
@@ -374,7 +375,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         if (filePath.trim().isEmpty()) {
             throw new TopologyException("filePath can't be an empty string");
         }
-        String name = builder.newName(PRINTING_NAME);
+        String name = builder.newProcessorName(PRINTING_NAME);
         try {
             PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name());
             builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, defaultKeyValueMapper, label)), this.name);
@@ -387,7 +388,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     @Override
     public void foreach(final ForeachAction<? super K, ? super V> action) {
         Objects.requireNonNull(action, "action can't be null");
-        String name = builder.newName(FOREACH_NAME);
+        String name = builder.newProcessorName(FOREACH_NAME);
         KStreamPeek<K, Change<V>> processorSupplier = new KStreamPeek<>(new ForeachAction<K, Change<V>>() {
             @Override
             public void apply(K key, Change<V> value) {
@@ -404,16 +405,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                 final StreamPartitioner<? super K, ? super V> partitioner,
                                 final String topic,
                                 final String queryableStoreName) {
-        final String internalStoreName = queryableStoreName != null ? queryableStoreName : builder.newStoreName(KTableImpl.TOSTREAM_NAME);
-
         to(keySerde, valSerde, partitioner, topic);
 
         return builder.table(topic,
                              new ConsumedInternal<>(keySerde, valSerde, new FailOnInvalidTimestamp(), null),
-                             new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(internalStoreName)
-                                     .withKeySerde(keySerde)
-                                     .withValueSerde(valSerde),
-                                     queryableStoreName != null));
+                             new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(keySerde, valSerde),
+                                     builder,
+                                     KTableImpl.TOSTREAM_NAME));
     }
 
     @SuppressWarnings("deprecation")
@@ -543,7 +541,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @Override
     public KStream<K, V> toStream() {
-        String name = builder.newName(TOSTREAM_NAME);
+        String name = builder.newProcessorName(TOSTREAM_NAME);
 
         builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<K, Change<V>, V>(new ValueMapper<Change<V>, V>() {
             @Override
@@ -573,7 +571,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        return doJoin(other, joiner, new MaterializedInternal<>(materialized), false, false);
+        return doJoin(other, joiner, new MaterializedInternal<>(materialized, builder, MERGE_NAME), false, false);
     }
 
     @SuppressWarnings("deprecation")
@@ -604,7 +602,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     public <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                             final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
-        return doJoin(other, joiner, new MaterializedInternal<>(materialized), true, true);
+        return doJoin(other, joiner, new MaterializedInternal<>(materialized, builder, MERGE_NAME), true, true);
     }
 
     @SuppressWarnings("deprecation")
@@ -637,7 +635,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
         return doJoin(other,
                       joiner,
-                      new MaterializedInternal<>(materialized),
+                      new MaterializedInternal<>(materialized, builder, MERGE_NAME),
                       true,
                       false);
     }
@@ -684,7 +682,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
-        final String joinMergeName = builder.newName(MERGE_NAME);
+        final String joinMergeName = builder.newProcessorName(MERGE_NAME);
         final String internalQueryableName = storeSupplier == null ? null : storeSupplier.name();
         final KTable<K, R> result = buildJoin((AbstractStream<K>) other,
                                               joiner,
@@ -709,7 +707,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         final String internalQueryableName = materialized == null ? null : materialized.storeName();
-        final String joinMergeName = builder.newName(MERGE_NAME);
+        final String joinMergeName = builder.newProcessorName(MERGE_NAME);
         final KTable<K, VR> result = buildJoin((AbstractStream<K>) other,
                                                joiner,
                                                leftOuter,
@@ -741,8 +739,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
             ((KTableImpl) other).enableSendingOldValues();
         }
 
-        final String joinThisName = builder.newName(JOINTHIS_NAME);
-        final String joinOtherName = builder.newName(JOINOTHER_NAME);
+        final String joinThisName = builder.newProcessorName(JOINTHIS_NAME);
+        final String joinOtherName = builder.newProcessorName(JOINOTHER_NAME);
 
 
         final KTableKTableAbstractJoin<K, R, V, V1> joinThis;
@@ -792,7 +790,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                                   final Serialized<K1, V1> serialized) {
         Objects.requireNonNull(selector, "selector can't be null");
         Objects.requireNonNull(serialized, "serialized can't be null");
-        String selectName = builder.newName(SELECT_NAME);
+        String selectName = builder.newProcessorName(SELECT_NAME);
 
         KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index 1d702f2..c8cd35d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -29,10 +29,14 @@ public class KeyValueStoreMaterializer<K, V> {
         this.materialized = materialized;
     }
 
+    /**
+     * @return  StoreBuilder
+     */
     public StoreBuilder<KeyValueStore<K, V>> materialize() {
         KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {
-            supplier = Stores.persistentKeyValueStore(materialized.storeName());
+            final String name = materialized.storeName();
+            supplier = Stores.persistentKeyValueStore(name);
         }
         final StoreBuilder<KeyValueStore<K, V>> builder = Stores.keyValueStoreBuilder(supplier,
                                                                                       materialized.keySerde(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index 9f186fd..c933b86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -27,21 +27,24 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
 
     private final boolean queryable;
 
-    public MaterializedInternal(final Materialized<K, V, S> materialized) {
-        this(materialized, true);
-    }
-    
+
     public MaterializedInternal(final Materialized<K, V, S> materialized,
-                                final boolean queryable) {
+                                final InternalNameProvider nameProvider,
+                                final String generatedStorePrefix) {
         super(materialized);
-        this.queryable = queryable;
+        if (storeName() == null) {
+            queryable = false;
+            storeName = nameProvider.newStoreName(generatedStorePrefix);
+        } else {
+            queryable = true;
+        }
     }
 
     public String storeName() {
-        if (storeName != null) {
-            return storeName;
+        if (storeSupplier != null) {
+            return storeSupplier.name();
         }
-        return storeSupplier.name();
+        return storeName;
     }
 
     public StoreSupplier<S> storeSupplier() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index 6644c92..34e5bd7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -83,7 +83,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
     public KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized);
+                = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
         if (materializedInternal.valueSerde() == null) {
             materialized.withValueSerde(Serdes.Long());
         }
@@ -93,6 +93,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
                          materialized);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                 final Aggregator<? super K, ? super V, T> aggregator,
@@ -100,7 +101,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
-        return doAggregate(initializer, aggregator, sessionMerger, null);
+        return doAggregate(initializer, aggregator, sessionMerger, (Serde<T>) valSerde);
     }
 
     @SuppressWarnings("unchecked")
@@ -113,7 +114,11 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal
+                = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+        if (materializedInternal.keySerde() == null) {
+            materializedInternal.withKeySerde(keySerde);
+        }
         return (KTable<Windowed<K>, VR>) aggregateBuilder.build(
                 new KStreamSessionWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator, sessionMerger),
                 AGGREGATE_NAME,

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index daba4c3..5e54770 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -73,7 +73,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
     public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized);
+                = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
         if (materializedInternal.valueSerde() == null) {
             materialized.withValueSerde(Serdes.Long());
         }
@@ -81,12 +81,13 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
     }
 
 
+    @SuppressWarnings("unchecked")
     @Override
     public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                   final Aggregator<? super K, ? super V, VR> aggregator) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
-        return doAggregate(initializer, aggregator, null);
+        return doAggregate(initializer, aggregator, (Serde<VR>) valSerde);
     }
 
     @SuppressWarnings("unchecked")
@@ -108,7 +109,11 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal
+                = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+        if (materializedInternal.keySerde() == null) {
+            materializedInternal.withKeySerde(keySerde);
+        }
         return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows,
                                                                                              materializedInternal.storeName(),
                                                                                              initializer,
@@ -135,7 +140,8 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal
+                = new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
 
         return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, materializedInternal.storeName(), reducer),
                                                                REDUCE_NAME,

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 350facf..4169eb2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -38,6 +39,7 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Serialized;
@@ -48,6 +50,7 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestUtils;
@@ -308,7 +311,8 @@ public class KStreamAggregationIntegrationTest {
         groupedStream.windowedBy(TimeWindows.of(500L))
                 .aggregate(
                         initializer,
-                        aggregator
+                        aggregator,
+                        Materialized.<String, Integer, WindowStore<Bytes, byte[]>>with(null, Serdes.Integer())
                 )
                 .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
                     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index 8e1a8a1..e16d8e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -64,7 +64,7 @@ public class AbstractStreamTest {
         }
 
         KStream<K, V> randomFilter() {
-            String name = builder.newName("RANDOM-FILTER-");
+            String name = builder.newProcessorName("RANDOM-FILTER-");
             builder.internalTopologyBuilder.addProcessor(name, new ExtendedKStreamDummy(), this.name);
             return new KStreamImpl<>(builder, name, sourceNodes, false);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 68d0e24..05a0214 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -61,8 +61,9 @@ public class InternalStreamsBuilderTest {
 
     private KStreamTestDriver driver = null;
     private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>();
+    private final String storePrefix = "prefix-";
     private MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
-            = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-store"), false);
+            = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-store"), builder, storePrefix);
 
     @Before
     public void setUp() {
@@ -79,15 +80,15 @@ public class InternalStreamsBuilderTest {
 
     @Test
     public void testNewName() {
-        assertEquals("X-0000000000", builder.newName("X-"));
-        assertEquals("Y-0000000001", builder.newName("Y-"));
-        assertEquals("Z-0000000002", builder.newName("Z-"));
+        assertEquals("X-0000000000", builder.newProcessorName("X-"));
+        assertEquals("Y-0000000001", builder.newProcessorName("Y-"));
+        assertEquals("Z-0000000002", builder.newProcessorName("Z-"));
 
         final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
 
-        assertEquals("X-0000000000", newBuilder.newName("X-"));
-        assertEquals("Y-0000000001", newBuilder.newName("Y-"));
-        assertEquals("Z-0000000002", newBuilder.newName("Z-"));
+        assertEquals("X-0000000000", newBuilder.newProcessorName("X-"));
+        assertEquals("Y-0000000001", newBuilder.newProcessorName("Y-"));
+        assertEquals("Z-0000000002", newBuilder.newProcessorName("Z-"));
     }
 
     @Test
@@ -141,16 +142,18 @@ public class InternalStreamsBuilderTest {
         KTable table1 = builder.table("topic2",
                                       consumed,
                                       new MaterializedInternal<>(
-                                              Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("topic2"),
-                                              false));
+                                              Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(null, null),
+                                              builder,
+                                              storePrefix));
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.build(null);
 
         assertEquals(1, topology.stateStores().size());
-        assertEquals("topic2", topology.stateStores().get(0).name());
+        final String storeName = "prefix-STATE-STORE-0000000000";
+        assertEquals(storeName, topology.stateStores().get(0).name());
 
         assertEquals(1, topology.storeToChangelogTopic().size());
-        assertEquals("topic2", topology.storeToChangelogTopic().get("topic2"));
+        assertEquals("topic2", topology.storeToChangelogTopic().get(storeName));
         assertNull(table1.queryableStoreName());
     }
 
@@ -160,7 +163,8 @@ public class InternalStreamsBuilderTest {
                             consumed,
                             new MaterializedInternal<>(
                                     Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalTable"),
-                                    false));
+                                    builder,
+                                    storePrefix));
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology();
         final List<StateStore> stateStores = topology.globalStateStores();
@@ -184,11 +188,11 @@ public class InternalStreamsBuilderTest {
         builder.globalTable("table",
                             consumed,
                             new MaterializedInternal<>(
-                                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global1")));
+                                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global1"), builder, storePrefix));
         builder.globalTable("table2",
                             consumed,
                             new MaterializedInternal<>(
-                                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global2")));
+                                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global2"), builder, storePrefix));
 
         doBuildGlobalTopologyWithAllGlobalTables();
     }
@@ -201,14 +205,14 @@ public class InternalStreamsBuilderTest {
         final GlobalKTable<String, String> globalTable = builder.globalTable("table",
                                                                              consumed,
                                                                              new MaterializedInternal<>(
-                                                                                     Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(one)));
+                                                                                     Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(one), builder, storePrefix));
         final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2",
                                                                               consumed,
                                                                               new MaterializedInternal<>(
-                                                                                      Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(two)));
+                                                                                      Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(two), builder, storePrefix));
 
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("not-global"), false);
+                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("not-global"), builder, storePrefix);
         builder.table("not-global", consumed, materialized);
 
         final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() {
@@ -243,7 +247,7 @@ public class InternalStreamsBuilderTest {
         final KStream<String, String> playEvents = builder.stream(Collections.singleton("events"), consumed);
 
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("table-store"), false);
+                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("table-store"), builder, storePrefix);
         final KTable<String, String> table = builder.table("table-topic", consumed, materialized);
         assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index 23e0b59..d8b3a5f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -85,9 +85,9 @@ public class KTableForeachTest {
         StreamsBuilder builder = new StreamsBuilder();
         KTable<Integer, String> table = builder.table(topicName,
                                                       Consumed.with(intSerde, stringSerde),
-                                                      new MaterializedInternal<>(Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(topicName)
-                                                                                         .withKeySerde(intSerde)
-                                                                                         .withValueSerde(stringSerde)));
+                                                      Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(topicName)
+                                                              .withKeySerde(intSerde)
+                                                              .withValueSerde(stringSerde));
         table.foreach(action);
 
         // Then

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
new file mode 100644
index 0000000..5fd76f3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@RunWith(EasyMockRunner.class)
+public class MaterializedInternalTest {
+
+    @Mock(type = MockType.NICE)
+    private InternalNameProvider nameProvider;
+
+    @Mock(type = MockType.NICE)
+    private KeyValueBytesStoreSupplier supplier;
+    private final String prefix = "prefix";
+
+    @Test
+    public void shouldGenerateStoreNameWithPrefixIfProvidedNameIsNull() {
+        final String generatedName = prefix + "-store";
+        EasyMock.expect(nameProvider.newStoreName(prefix)).andReturn(generatedName);
+
+        EasyMock.replay(nameProvider);
+
+        final MaterializedInternal<Object, Object, StateStore> materialized
+                = new MaterializedInternal<>(Materialized.with(null, null), nameProvider, prefix);
+
+        assertThat(materialized.storeName(), equalTo(generatedName));
+        EasyMock.verify(nameProvider);
+    }
+
+    @Test
+    public void shouldUseProvidedStoreNameWhenSet() {
+        final String storeName = "store-name";
+        final MaterializedInternal<Object, Object, StateStore> materialized
+                = new MaterializedInternal<>(Materialized.as(storeName), nameProvider, prefix);
+        assertThat(materialized.storeName(), equalTo(storeName));
+    }
+
+    @Test
+    public void shouldUseStoreNameOfSupplierWhenProvided() {
+        final String storeName = "other-store-name";
+        EasyMock.expect(supplier.name()).andReturn(storeName).anyTimes();
+        EasyMock.replay(supplier);
+        final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized
+                = new MaterializedInternal<>(Materialized.as(supplier), nameProvider, prefix);
+        assertThat(materialized.storeName(), equalTo(storeName));
+    }
+}
\ No newline at end of file