You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/03/19 21:29:56 UTC

[kafka] branch trunk updated: MINOR: Fix generic types in StreamsBuilder and Topology (#8273)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 21cfd0b  MINOR: Fix generic types in StreamsBuilder and Topology (#8273)
21cfd0b is described below

commit 21cfd0b453c878780236f623249b3b5ddec52e6e
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Thu Mar 19 14:29:15 2020 -0700

    MINOR: Fix generic types in StreamsBuilder and Topology (#8273)
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Guozhang Wang <gu...@confluent.io>, John Roesler <jo...@confluent.io>
---
 .../examples/docs/DeveloperGuideTesting.java       |   4 +-
 .../org/apache/kafka/streams/StreamsBuilder.java   |  48 ++++---
 .../java/org/apache/kafka/streams/Topology.java    |  83 ++++++-----
 .../streams/kstream/SessionWindowedSerializer.java |   2 +-
 .../internals/CogroupedStreamAggregateBuilder.java |  31 ++---
 .../internals/GroupedStreamAggregateBuilder.java   |   3 +-
 .../kstream/internals/InternalStreamsBuilder.java  | 111 ++++++++-------
 .../kstream/internals/KStreamAggregate.java        |   7 +-
 .../internals/graph/BaseRepartitionNode.java       |   4 +-
 .../kstream/internals/graph/GlobalStoreNode.java   |  19 +--
 .../GroupedTableOperationRepartitionNode.java      |   6 +-
 .../internals/graph/KTableKTableJoinNode.java      |   6 +-
 .../graph/OptimizableRepartitionNode.java          |   6 +-
 .../internals/graph/ProcessorGraphNode.java        |   2 +-
 .../kstream/internals/graph/StateStoreNode.java    |   8 +-
 .../internals/graph/StatefulProcessorNode.java     |  11 +-
 .../internals/InternalTopologyBuilder.java         | 154 +++++++++++----------
 .../state/internals/MeteredSessionStore.java       |   1 -
 .../org/apache/kafka/streams/KafkaStreamsTest.java |   5 +-
 .../org/apache/kafka/streams/TopologyTest.java     |  44 +++---
 .../integration/GlobalThreadShutDownOrderTest.java |   2 +-
 .../integration/RegexSourceIntegrationTest.java    |   3 +-
 .../internals/graph/GraphGraceSearchUtilTest.java  |  17 ++-
 .../processor/internals/ProcessorTopologyTest.java |   2 +-
 .../state/internals/CachingKeyValueStoreTest.java  |   4 +-
 .../internals/InMemoryKeyValueLoggedStoreTest.java |   7 +-
 .../state/internals/InMemoryKeyValueStoreTest.java |   7 +-
 .../state/internals/InMemoryLRUCacheStoreTest.java |   9 +-
 .../InMemoryTimeOrderedKeyValueBufferTest.java     |  10 +-
 .../state/internals/RocksDBKeyValueStoreTest.java  |   7 +-
 .../kafka/streams/scala/StreamsBuilder.scala       |   8 +-
 .../kafka/streams/MockProcessorContextTest.java    |   4 +-
 .../kafka/streams/TopologyTestDriverTest.java      |   3 +-
 33 files changed, 325 insertions(+), 313 deletions(-)

diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
index e380f20..28fccfa 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
@@ -145,14 +145,14 @@ public class DeveloperGuideTesting {
         assertThat(outputTopic.isEmpty(), is(true));
     }
 
-    public class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
+    public static class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
         @Override
         public Processor<String, Long> get() {
             return new CustomMaxAggregator();
         }
     }
 
-    public class CustomMaxAggregator implements Processor<String, Long> {
+    public static class CustomMaxAggregator implements Processor<String, Long> {
         ProcessorContext context;
         private KeyValueStore<String, Long> store;
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 173ca67..997fe64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -474,7 +474,7 @@ public class StreamsBuilder {
      * @return itself
      * @throws TopologyException if state store supplier is already added
      */
-    public synchronized StreamsBuilder addStateStore(final StoreBuilder builder) {
+    public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> builder) {
         Objects.requireNonNull(builder, "builder can't be null");
         internalStreamsBuilder.addStateStore(builder);
         return this;
@@ -483,22 +483,23 @@ public class StreamsBuilder {
     /**
      * @deprecated use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier)} instead
      */
-    @SuppressWarnings("unchecked")
     @Deprecated
-    public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
-                                                      final String topic,
-                                                      final String sourceName,
-                                                      final Consumed consumed,
-                                                      final String processorName,
-                                                      final ProcessorSupplier stateUpdateSupplier) {
+    public synchronized <K, V>  StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                              final String topic,
+                                                              final String sourceName,
+                                                              final Consumed<K, V> consumed,
+                                                              final String processorName,
+                                                              final ProcessorSupplier<K, V> stateUpdateSupplier) {
         Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
-        internalStreamsBuilder.addGlobalStore(storeBuilder,
-                                              sourceName,
-                                              topic,
-                                              new ConsumedInternal<>(consumed),
-                                              processorName,
-                                              stateUpdateSupplier);
+        internalStreamsBuilder.addGlobalStore(
+            storeBuilder,
+            sourceName,
+            topic,
+            new ConsumedInternal<>(consumed),
+            processorName,
+            stateUpdateSupplier
+        );
         return this;
     }
 
@@ -527,17 +528,18 @@ public class StreamsBuilder {
      * @return itself
      * @throws TopologyException if the processor of state is already registered
      */
-    @SuppressWarnings("unchecked")
-    public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
-                                                      final String topic,
-                                                      final Consumed consumed,
-                                                      final ProcessorSupplier stateUpdateSupplier) {
+    public synchronized <K, V> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                             final String topic,
+                                                             final Consumed<K, V> consumed,
+                                                             final ProcessorSupplier<K, V> stateUpdateSupplier) {
         Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
-        internalStreamsBuilder.addGlobalStore(storeBuilder,
-                topic,
-                new ConsumedInternal<>(consumed),
-                stateUpdateSupplier);
+        internalStreamsBuilder.addGlobalStore(
+            storeBuilder,
+            topic,
+            new ConsumedInternal<>(consumed),
+            stateUpdateSupplier
+        );
         return this;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index d13e4a8..cf1d3fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -252,8 +252,8 @@ public class Topology {
      * @throws TopologyException if processor is already added or if topics have already been registered by another source
      */
     public synchronized Topology addSource(final String name,
-                                           final Deserializer keyDeserializer,
-                                           final Deserializer valueDeserializer,
+                                           final Deserializer<?> keyDeserializer,
+                                           final Deserializer<?> valueDeserializer,
                                            final String... topics) {
         internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valueDeserializer, topics);
         return this;
@@ -278,8 +278,8 @@ public class Topology {
      * @throws TopologyException if processor is already added or if topics have already been registered by name
      */
     public synchronized Topology addSource(final String name,
-                                           final Deserializer keyDeserializer,
-                                           final Deserializer valueDeserializer,
+                                           final Deserializer<?> keyDeserializer,
+                                           final Deserializer<?> valueDeserializer,
                                            final Pattern topicPattern) {
         internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valueDeserializer, topicPattern);
         return this;
@@ -307,8 +307,8 @@ public class Topology {
     @SuppressWarnings("overloads")
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final String name,
-                                           final Deserializer keyDeserializer,
-                                           final Deserializer valueDeserializer,
+                                           final Deserializer<?> keyDeserializer,
+                                           final Deserializer<?> valueDeserializer,
                                            final String... topics) {
         internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valueDeserializer, topics);
         return this;
@@ -335,8 +335,8 @@ public class Topology {
      */
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final String name,
-                                           final Deserializer keyDeserializer,
-                                           final Deserializer valueDeserializer,
+                                           final Deserializer<?> keyDeserializer,
+                                           final Deserializer<?> valueDeserializer,
                                            final Pattern topicPattern) {
         internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valueDeserializer, topicPattern);
         return this;
@@ -364,8 +364,8 @@ public class Topology {
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final String name,
                                            final TimestampExtractor timestampExtractor,
-                                           final Deserializer keyDeserializer,
-                                           final Deserializer valueDeserializer,
+                                           final Deserializer<?> keyDeserializer,
+                                           final Deserializer<?> valueDeserializer,
                                            final String... topics) {
         internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topics);
         return this;
@@ -396,8 +396,8 @@ public class Topology {
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final String name,
                                            final TimestampExtractor timestampExtractor,
-                                           final Deserializer keyDeserializer,
-                                           final Deserializer valueDeserializer,
+                                           final Deserializer<?> keyDeserializer,
+                                           final Deserializer<?> valueDeserializer,
                                            final Pattern topicPattern) {
         internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
         return this;
@@ -651,6 +651,7 @@ public class Topology {
      * @return itself
      * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      */
+    @SuppressWarnings("rawtypes")
     public synchronized Topology addProcessor(final String name,
                                               final ProcessorSupplier supplier,
                                               final String... parentNames) {
@@ -666,7 +667,7 @@ public class Topology {
      * @return itself
      * @throws TopologyException if state store supplier is already added
      */
-    public synchronized Topology addStateStore(final StoreBuilder storeBuilder,
+    public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
                                                final String... processorNames) {
         internalTopologyBuilder.addStateStore(storeBuilder, processorNames);
         return this;
@@ -695,16 +696,23 @@ public class Topology {
      * @return itself
      * @throws TopologyException if the processor of state is already registered
      */
-    @SuppressWarnings("unchecked")
-    public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
-                                                final String sourceName,
-                                                final Deserializer keyDeserializer,
-                                                final Deserializer valueDeserializer,
-                                                final String topic,
-                                                final String processorName,
-                                                final ProcessorSupplier stateUpdateSupplier) {
-        internalTopologyBuilder.addGlobalStore(storeBuilder, sourceName, null, keyDeserializer,
-            valueDeserializer, topic, processorName, stateUpdateSupplier);
+    public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                       final String sourceName,
+                                                       final Deserializer<K> keyDeserializer,
+                                                       final Deserializer<V> valueDeserializer,
+                                                       final String topic,
+                                                       final String processorName,
+                                                       final ProcessorSupplier<K, V> stateUpdateSupplier) {
+        internalTopologyBuilder.addGlobalStore(
+            storeBuilder,
+            sourceName,
+            null,
+            keyDeserializer,
+            valueDeserializer,
+            topic,
+            processorName,
+            stateUpdateSupplier
+        );
         return this;
     }
 
@@ -732,17 +740,24 @@ public class Topology {
      * @return itself
      * @throws TopologyException if the processor of state is already registered
      */
-    @SuppressWarnings("unchecked")
-    public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
-                                                final String sourceName,
-                                                final TimestampExtractor timestampExtractor,
-                                                final Deserializer keyDeserializer,
-                                                final Deserializer valueDeserializer,
-                                                final String topic,
-                                                final String processorName,
-                                                final ProcessorSupplier stateUpdateSupplier) {
-        internalTopologyBuilder.addGlobalStore(storeBuilder, sourceName, timestampExtractor, keyDeserializer,
-            valueDeserializer, topic, processorName, stateUpdateSupplier);
+    public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                       final String sourceName,
+                                                       final TimestampExtractor timestampExtractor,
+                                                       final Deserializer<K> keyDeserializer,
+                                                       final Deserializer<V> valueDeserializer,
+                                                       final String topic,
+                                                       final String processorName,
+                                                       final ProcessorSupplier<K, V> stateUpdateSupplier) {
+        internalTopologyBuilder.addGlobalStore(
+            storeBuilder,
+            sourceName,
+            timestampExtractor,
+            keyDeserializer,
+            valueDeserializer,
+            topic,
+            processorName,
+            stateUpdateSupplier
+        );
         return this;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
index 00491c7..a1f4876 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
@@ -50,7 +50,7 @@ public class SessionWindowedSerializer<T> implements WindowedSerializer<T> {
             final String propertyName = isKey ? StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS : StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS;
             final String value = (String) configs.get(propertyName);
             try {
-                inner = Serde.class.cast(Utils.newInstance(value, Serde.class)).serializer();
+                inner = (Utils.newInstance(value, Serde.class)).serializer();
                 inner.configure(configs, isKey);
             } catch (final ClassNotFoundException e) {
                 throw new ConfigException(propertyName, value, "Serde class " + value + " could not be found.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
index f4e45467..81f2d64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -38,7 +38,6 @@ import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 
 class CogroupedStreamAggregateBuilder<K, VOut> {
@@ -52,7 +51,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
     <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
                                                        final Initializer<VOut> initializer,
                                                        final NamedInternal named,
-                                                       final StoreBuilder<? extends StateStore> storeBuilder,
+                                                       final StoreBuilder<?> storeBuilder,
                                                        final Serde<KR> keySerde,
                                                        final Serde<VOut> valSerde,
                                                        final String queryableName,
@@ -89,8 +88,8 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
         final Collection<StreamsGraphNode> processors = new ArrayList<>();
         boolean stateCreated = false;
         int counter = 0;
-        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
-            final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode(
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
                 kGroupedStream.getValue(),
                 initializer,
                 named.suffixWithOrElseGet(
@@ -127,14 +126,14 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
             builder);
     }
 
-    private <W extends Window> StatefulProcessorNode getStatefulProcessorNode(final Aggregator<? super K, ? super Object, VOut> aggregator,
-                                                                              final Initializer<VOut> initializer,
-                                                                              final String processorName,
-                                                                              final boolean stateCreated,
-                                                                              final StoreBuilder<? extends StateStore> storeBuilder,
-                                                                              final Windows<W> windows,
-                                                                              final SessionWindows sessionWindows,
-                                                                              final Merger<? super K, VOut> sessionMerger) {
+    private <W extends Window> StatefulProcessorNode<K, ?> getStatefulProcessorNode(final Aggregator<? super K, Object, VOut> aggregator,
+                                                                                    final Initializer<VOut> initializer,
+                                                                                    final String processorName,
+                                                                                    final boolean stateCreated,
+                                                                                    final StoreBuilder<?> storeBuilder,
+                                                                                    final Windows<W> windows,
+                                                                                    final SessionWindows sessionWindows,
+                                                                                    final Merger<? super K, VOut> sessionMerger) {
 
         final ProcessorSupplier<K, ?> kStreamAggregate;
 
@@ -164,19 +163,17 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                             new String[]{storeBuilder.name()}
                     );
         }
+
         return statefulProcessorNode;
     }
 
-    /**
-     * @return the new sourceName of the repartitioned source
-     */
     @SuppressWarnings("unchecked")
-    private <VIn> String createRepartitionSource(final String repartitionTopicNamePrefix,
+    private <VIn> void createRepartitionSource(final String repartitionTopicNamePrefix,
                                                  final OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder,
                                                  final Serde<K> keySerde,
                                                  final Serde<?> valueSerde) {
 
-        return KStreamImpl.createRepartitionedSource(builder,
+        KStreamImpl.createRepartitionedSource(builder,
                 keySerde,
                 (Serde<VIn>) valueSerde,
                 repartitionTopicNamePrefix,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 13e7218..3f3c199 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Collections;
@@ -68,7 +67,7 @@ class GroupedStreamAggregateBuilder<K, V> {
     }
 
     <KR, VR> KTable<KR, VR> build(final NamedInternal functionName,
-                                  final StoreBuilder<? extends StateStore> storeBuilder,
+                                  final StoreBuilder<?> storeBuilder,
                                   final KStreamAggProcessorSupplier<K, KR, V, VR> aggregateSupplier,
                                   final String queryableStoreName,
                                   final Serde<KR> keySerde,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 9509431..a374c42 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -64,7 +64,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     private final AtomicInteger index = new AtomicInteger(0);
 
     private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
-    private final LinkedHashMap<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> keyChangingOperationsToOptimizableRepartitionNodes = new LinkedHashMap<>();
+    private final LinkedHashMap<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>> keyChangingOperationsToOptimizableRepartitionNodes = new LinkedHashMap<>();
     private final LinkedHashSet<StreamsGraphNode> mergeNodes = new LinkedHashSet<>();
     private final LinkedHashSet<StreamsGraphNode> tableSourceNodes = new LinkedHashSet<>();
 
@@ -197,41 +197,45 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
     }
 
-    public synchronized void addStateStore(final StoreBuilder builder) {
-        addGraphNode(root, new StateStoreNode(builder));
+    public synchronized void addStateStore(final StoreBuilder<?> builder) {
+        addGraphNode(root, new StateStoreNode<>(builder));
     }
 
-    public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
-                                            final String sourceName,
-                                            final String topic,
-                                            final ConsumedInternal consumed,
-                                            final String processorName,
-                                            final ProcessorSupplier stateUpdateSupplier) {
-
-        final StreamsGraphNode globalStoreNode = new GlobalStoreNode(storeBuilder,
-                                                                     sourceName,
-                                                                     topic,
-                                                                     consumed,
-                                                                     processorName,
-                                                                     stateUpdateSupplier);
+    public synchronized <K, V> void addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                   final String sourceName,
+                                                   final String topic,
+                                                   final ConsumedInternal<K, V> consumed,
+                                                   final String processorName,
+                                                   final ProcessorSupplier<K, V> stateUpdateSupplier) {
+
+        final StreamsGraphNode globalStoreNode = new GlobalStoreNode<>(
+            storeBuilder,
+            sourceName,
+            topic,
+            consumed,
+            processorName,
+            stateUpdateSupplier
+        );
 
         addGraphNode(root, globalStoreNode);
     }
 
-    public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
-                                            final String topic,
-                                            final ConsumedInternal consumed,
-                                            final ProcessorSupplier stateUpdateSupplier) {
+    public synchronized <K, V> void addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                   final String topic,
+                                                   final ConsumedInternal<K, V> consumed,
+                                                   final ProcessorSupplier<K, V> stateUpdateSupplier) {
         // explicitly disable logging for global stores
         storeBuilder.withLoggingDisabled();
         final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
         final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);
-        addGlobalStore(storeBuilder,
-                       sourceName,
-                       topic,
-                       consumed,
-                       processorName,
-                       stateUpdateSupplier);
+        addGlobalStore(
+            storeBuilder,
+            sourceName,
+            topic,
+            consumed,
+            processorName,
+            stateUpdateSupplier
+        );
     }
 
     void addGraphNode(final StreamsGraphNode parent,
@@ -242,7 +246,6 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         maybeAddNodeForOptimizationMetadata(child);
     }
 
-
     void addGraphNode(final Collection<StreamsGraphNode> parents,
                       final StreamsGraphNode child) {
         Objects.requireNonNull(parents, "parent node can't be null");
@@ -271,7 +274,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         } else if (node instanceof OptimizableRepartitionNode) {
             final StreamsGraphNode parentNode = getKeyChangingParentNode(node);
             if (parentNode != null) {
-                keyChangingOperationsToOptimizableRepartitionNodes.get(parentNode).add((OptimizableRepartitionNode) node);
+                keyChangingOperationsToOptimizableRepartitionNodes.get(parentNode).add((OptimizableRepartitionNode<?, ?>) node);
             }
         } else if (node.isMergeNode()) {
             mergeNodes.add(node);
@@ -322,16 +325,16 @@ public class InternalStreamsBuilder implements InternalNameProvider {
 
     private void optimizeKTableSourceTopics() {
         LOG.debug("Marking KTable source nodes to optimize using source topic for changelogs ");
-        tableSourceNodes.forEach(node -> ((TableSourceNode) node).reuseSourceTopicForChangeLog(true));
+        tableSourceNodes.forEach(node -> ((TableSourceNode<?, ?>) node).reuseSourceTopicForChangeLog(true));
     }
 
-    @SuppressWarnings("unchecked")
     private void maybeOptimizeRepartitionOperations() {
         maybeUpdateKeyChangingRepartitionNodeMap();
-        final Iterator<Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>>> entryIterator =  keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();
+        final Iterator<Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>>> entryIterator =
+            keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();
 
         while (entryIterator.hasNext()) {
-            final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> entry = entryIterator.next();
+            final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>> entry = entryIterator.next();
 
             final StreamsGraphNode keyChangingNode = entry.getKey();
 
@@ -339,7 +342,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                 continue;
             }
 
-            final GroupedInternal groupedInternal = new GroupedInternal(getRepartitionSerdes(entry.getValue()));
+            final GroupedInternal<?, ?> groupedInternal = new GroupedInternal<>(getRepartitionSerdes(entry.getValue()));
 
             final String repartitionTopicName = getFirstRepartitionTopicName(entry.getValue());
             //passing in the name of the first repartition topic, re-used to create the optimized repartition topic
@@ -350,7 +353,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
             // re-use parent buildPriority to make sure the single repartition graph node is evaluated before downstream nodes
             optimizedSingleRepartition.setBuildPriority(keyChangingNode.buildPriority());
 
-            for (final OptimizableRepartitionNode repartitionNodeToBeReplaced : entry.getValue()) {
+            for (final OptimizableRepartitionNode<?, ?> repartitionNodeToBeReplaced : entry.getValue()) {
 
                 final StreamsGraphNode keyChangingNodeChild = findParentNodeMatching(repartitionNodeToBeReplaced, gn -> gn.parentNodes().contains(keyChangingNode));
 
@@ -408,7 +411,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : mergeNodesToKeyChangers.entrySet()) {
             final StreamsGraphNode mergeKey = entry.getKey();
             final Collection<StreamsGraphNode> keyChangingParents = entry.getValue();
-            final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes = new LinkedHashSet<>();
+            final LinkedHashSet<OptimizableRepartitionNode<?, ?>> repartitionNodes = new LinkedHashSet<>();
             for (final StreamsGraphNode keyChangingParent : keyChangingParents) {
                 repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent));
                 mergeNodeKeyChangingParentsToRemove.add(keyChangingParent);
@@ -421,17 +424,19 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private OptimizableRepartitionNode createRepartitionNode(final String repartitionTopicName,
-                                                             final Serde keySerde,
-                                                             final Serde valueSerde) {
+    private <K, V> OptimizableRepartitionNode<K, V> createRepartitionNode(final String repartitionTopicName,
+                                                                          final Serde<K> keySerde,
+                                                                          final Serde<V> valueSerde) {
 
-        final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
-        KStreamImpl.createRepartitionedSource(this,
-                                              keySerde,
-                                              valueSerde,
-                                              repartitionTopicName,
-                                              repartitionNodeBuilder);
+        final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder =
+            OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
+        KStreamImpl.createRepartitionedSource(
+                this,
+                keySerde,
+                valueSerde,
+                repartitionTopicName,
+                repartitionNodeBuilder
+        );
 
         // ensures setting the repartition topic to the name of the
         // first repartition topic to get merged
@@ -452,22 +457,22 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         return null;
     }
 
-    private String getFirstRepartitionTopicName(final Collection<OptimizableRepartitionNode> repartitionNodes) {
+    private String getFirstRepartitionTopicName(final Collection<OptimizableRepartitionNode<?, ?>> repartitionNodes) {
         return repartitionNodes.iterator().next().repartitionTopic();
     }
 
     @SuppressWarnings("unchecked")
-    private GroupedInternal getRepartitionSerdes(final Collection<OptimizableRepartitionNode> repartitionNodes) {
-        Serde keySerde = null;
-        Serde valueSerde = null;
+    private <K, V> GroupedInternal<K, V> getRepartitionSerdes(final Collection<OptimizableRepartitionNode<?, ?>> repartitionNodes) {
+        Serde<K> keySerde = null;
+        Serde<V> valueSerde = null;
 
-        for (final OptimizableRepartitionNode repartitionNode : repartitionNodes) {
+        for (final OptimizableRepartitionNode<?, ?> repartitionNode : repartitionNodes) {
             if (keySerde == null && repartitionNode.keySerde() != null) {
-                keySerde = repartitionNode.keySerde();
+                keySerde = (Serde<K>) repartitionNode.keySerde();
             }
 
             if (valueSerde == null && repartitionNode.valueSerde() != null) {
-                valueSerde = repartitionNode.valueSerde();
+                valueSerde = (Serde<V>) repartitionNode.valueSerde();
             }
 
             if (keySerde != null && valueSerde != null) {
@@ -475,7 +480,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
             }
         }
 
-        return new GroupedInternal(Grouped.with(keySerde, valueSerde));
+        return new GroupedInternal<>(Grouped.with(keySerde, valueSerde));
     }
 
     private StreamsGraphNode findParentNodeMatching(final StreamsGraphNode startSeekingNode,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 252d6e8..3c3bdd0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -58,7 +58,6 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
 
     private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
         private TimestampedKeyValueStore<K, T> store;
-        private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
         private TimestampedTupleForwarder<K, T> tupleForwarder;
 
@@ -66,8 +65,10 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-            metrics = (StreamsMetricsImpl) context.metrics();
-            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(
+                Thread.currentThread().getName(),
+                context.taskId().toString(),
+                (StreamsMetricsImpl) context.metrics());
             store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
             tupleForwarder = new TimestampedTupleForwarder<>(
                 store,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
index 2cc1539..0fa61f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
@@ -29,12 +29,12 @@ public abstract class BaseRepartitionNode<K, V> extends StreamsGraphNode {
     protected final String sinkName;
     protected final String sourceName;
     protected final String repartitionTopic;
-    protected final ProcessorParameters processorParameters;
+    protected final ProcessorParameters<K, V> processorParameters;
     protected final StreamPartitioner<K, V> partitioner;
 
     BaseRepartitionNode(final String nodeName,
                         final String sourceName,
-                        final ProcessorParameters processorParameters,
+                        final ProcessorParameters<K, V> processorParameters,
                         final Serde<K> keySerde,
                         final Serde<V> valueSerde,
                         final String sinkName,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
index a844de6..3e077f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
@@ -14,31 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.kstream.internals.graph;
 
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 
-public class GlobalStoreNode extends StateStoreNode {
-
+public class GlobalStoreNode<K, V, S extends StateStore> extends StateStoreNode<S> {
 
     private final String sourceName;
     private final String topic;
-    private final ConsumedInternal consumed;
+    private final ConsumedInternal<K, V> consumed;
     private final String processorName;
-    private final ProcessorSupplier stateUpdateSupplier;
+    private final ProcessorSupplier<K, V> stateUpdateSupplier;
 
 
-    public GlobalStoreNode(final StoreBuilder<KeyValueStore> storeBuilder,
+    public GlobalStoreNode(final StoreBuilder<S> storeBuilder,
                            final String sourceName,
                            final String topic,
-                           final ConsumedInternal consumed,
+                           final ConsumedInternal<K, V> consumed,
                            final String processorName,
-                           final ProcessorSupplier stateUpdateSupplier) {
+                           final ProcessorSupplier<K, V> stateUpdateSupplier) {
 
         super(storeBuilder);
         this.sourceName = sourceName;
@@ -48,9 +46,7 @@ public class GlobalStoreNode extends StateStoreNode {
         this.stateUpdateSupplier = stateUpdateSupplier;
     }
 
-
     @Override
-    @SuppressWarnings("unchecked")
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         storeBuilder.withLoggingDisabled();
         topologyBuilder.addGlobalStore(storeBuilder,
@@ -64,7 +60,6 @@ public class GlobalStoreNode extends StateStoreNode {
 
     }
 
-
     @Override
     public String toString() {
         return "GlobalStoreNode{" +
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
index a3f79c5..2617f46 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
@@ -34,7 +34,7 @@ public class GroupedTableOperationRepartitionNode<K, V> extends BaseRepartitionN
                                                  final String sinkName,
                                                  final String sourceName,
                                                  final String repartitionTopic,
-                                                 final ProcessorParameters processorParameters) {
+                                                 final ProcessorParameters<K, V> processorParameters) {
 
         super(
             nodeName,
@@ -115,7 +115,7 @@ public class GroupedTableOperationRepartitionNode<K, V> extends BaseRepartitionN
         private String nodeName;
         private String sourceName;
         private String repartitionTopic;
-        private ProcessorParameters processorParameters;
+        private ProcessorParameters<K, V> processorParameters;
 
         private GroupedTableOperationRepartitionNodeBuilder() {
         }
@@ -150,7 +150,7 @@ public class GroupedTableOperationRepartitionNode<K, V> extends BaseRepartitionN
             return this;
         }
 
-        public GroupedTableOperationRepartitionNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters processorParameters) {
+        public GroupedTableOperationRepartitionNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters<K, V> processorParameters) {
             this.processorParameters = processorParameters;
             return this;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index 542726b..1879bb7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -82,7 +82,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
     }
 
     public String queryableStoreName() {
-        return ((KTableKTableJoinMerger) mergeProcessorParameters().processorSupplier()).getQueryableName();
+        return ((KTableKTableJoinMerger<K, VR>) mergeProcessorParameters().processorSupplier()).getQueryableName();
     }
 
     /**
@@ -212,8 +212,8 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
                 joinOtherProcessorParameters,
                 new ProcessorParameters<>(
                     KTableKTableJoinMerger.of(
-                        (KTableProcessorSupplier<K, V1, VR>) (joinThisProcessorParameters.processorSupplier()),
-                        (KTableProcessorSupplier<K, V2, VR>) (joinOtherProcessorParameters.processorSupplier()),
+                        (KTableProcessorSupplier<K, V1, VR>) joinThisProcessorParameters.processorSupplier(),
+                        (KTableProcessorSupplier<K, V2, VR>) joinOtherProcessorParameters.processorSupplier(),
                         queryableStoreName),
                     nodeName),
                 thisJoinSide,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
index e3cf2b8..727b6aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
@@ -29,7 +29,7 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode<K, V>
 
     OptimizableRepartitionNode(final String nodeName,
                                final String sourceName,
-                               final ProcessorParameters processorParameters,
+                               final ProcessorParameters<K, V> processorParameters,
                                final Serde<K> keySerde,
                                final Serde<V> valueSerde,
                                final String sinkName,
@@ -116,7 +116,7 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode<K, V>
     public static final class OptimizableRepartitionNodeBuilder<K, V> {
 
         private String nodeName;
-        private ProcessorParameters processorParameters;
+        private ProcessorParameters<K, V> processorParameters;
         private Serde<K> keySerde;
         private Serde<V> valueSerde;
         private String sinkName;
@@ -127,7 +127,7 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode<K, V>
         private OptimizableRepartitionNodeBuilder() {
         }
 
-        public OptimizableRepartitionNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters processorParameters) {
+        public OptimizableRepartitionNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters<K, V> processorParameters) {
             this.processorParameters = processorParameters;
             return this;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
index 5c75a09..6eea0c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
@@ -43,7 +43,7 @@ public class ProcessorGraphNode<K, V> extends StreamsGraphNode {
         this.processorParameters = processorParameters;
     }
 
-    public ProcessorParameters processorParameters() {
+    public ProcessorParameters<K, V> processorParameters() {
         return processorParameters;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
index ea42cec..239ed9f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
@@ -14,17 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.kstream.internals.graph;
 
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.StoreBuilder;
 
-public class StateStoreNode extends StreamsGraphNode {
+public class StateStoreNode<S extends StateStore> extends StreamsGraphNode {
 
-    protected final StoreBuilder storeBuilder;
+    protected final StoreBuilder<S> storeBuilder;
 
-    public StateStoreNode(final StoreBuilder storeBuilder) {
+    public StateStoreNode(final StoreBuilder<S> storeBuilder) {
         super(storeBuilder.name());
 
         this.storeBuilder = storeBuilder;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
index 6ed2917..6acd854 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
@@ -14,13 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.kstream.internals.graph;
 
-
 import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.StoreBuilder;
 
@@ -31,13 +28,13 @@ import java.util.stream.Stream;
 public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
 
     private final String[] storeNames;
-    private final StoreBuilder<? extends StateStore> storeBuilder;
+    private final StoreBuilder<?> storeBuilder;
 
     /**
      * Create a node representing a stateful processor, where the named stores have already been registered.
      */
     public StatefulProcessorNode(final ProcessorParameters<K, V> processorParameters,
-                                 final Set<StoreBuilder<? extends StateStore>> preRegisteredStores,
+                                 final Set<StoreBuilder<?>> preRegisteredStores,
                                  final Set<KTableValueGetterSupplier<?, ?>> valueGetterSuppliers) {
         super(processorParameters.processorName(), processorParameters);
         final Stream<String> registeredStoreNames = preRegisteredStores.stream().map(StoreBuilder::name);
@@ -65,7 +62,7 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
      */
     public StatefulProcessorNode(final String nodeName,
                                  final ProcessorParameters<K, V> processorParameters,
-                                 final StoreBuilder<? extends StateStore> materializedKTableStoreBuilder) {
+                                 final StoreBuilder<?> materializedKTableStoreBuilder) {
         super(nodeName, processorParameters);
 
         this.storeNames = null;
@@ -84,7 +81,7 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
 
         final String processorName = processorParameters().processorName();
-        final ProcessorSupplier processorSupplier = processorParameters().processorSupplier();
+        final ProcessorSupplier<K, V> processorSupplier = processorParameters().processorSupplier();
 
         topologyBuilder.addProcessor(processorName, processorSupplier, parentNodeNames());
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 92b9d33..95881ad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -59,9 +59,9 @@ public class InternalTopologyBuilder {
     private static final String[] NO_PREDECESSORS = {};
 
     // node factories in a topological order
-    private final Map<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
+    private final Map<String, NodeFactory<?, ?>> nodeFactories = new LinkedHashMap<>();
 
-    private final Map<String, StateStoreFactory> stateFactories = new HashMap<>();
+    private final Map<String, StateStoreFactory<?>> stateFactories = new HashMap<>();
 
     private final Map<String, StoreBuilder<?>> globalStateBuilders = new LinkedHashMap<>();
 
@@ -124,15 +124,15 @@ public class InternalTopologyBuilder {
 
     private Map<Integer, Set<String>> nodeGroups = null;
 
-    public static class StateStoreFactory {
-        private final StoreBuilder<?> builder;
+    public static class StateStoreFactory<S extends StateStore> {
+        private final StoreBuilder<S> builder;
         private final Set<String> users = new HashSet<>();
 
-        private StateStoreFactory(final StoreBuilder<?> builder) {
+        private StateStoreFactory(final StoreBuilder<S> builder) {
             this.builder = builder;
         }
 
-        public StateStore build() {
+        public S build() {
             return builder.build();
         }
 
@@ -174,7 +174,7 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private static abstract class NodeFactory {
+    private static abstract class NodeFactory<K, V> {
         final String name;
         final String[] predecessors;
 
@@ -184,18 +184,18 @@ public class InternalTopologyBuilder {
             this.predecessors = predecessors;
         }
 
-        public abstract ProcessorNode<?, ?> build();
+        public abstract ProcessorNode<K, V> build();
 
         abstract AbstractNode describe();
     }
 
-    private static class ProcessorNodeFactory extends NodeFactory {
-        private final ProcessorSupplier<?, ?> supplier;
+    private static class ProcessorNodeFactory<K, V> extends NodeFactory<K, V> {
+        private final ProcessorSupplier<K, V> supplier;
         private final Set<String> stateStoreNames = new HashSet<>();
 
         ProcessorNodeFactory(final String name,
                              final String[] predecessors,
-                             final ProcessorSupplier<?, ?> supplier) {
+                             final ProcessorSupplier<K, V> supplier) {
             super(name, predecessors.clone());
             this.supplier = supplier;
         }
@@ -205,7 +205,7 @@ public class InternalTopologyBuilder {
         }
 
         @Override
-        public ProcessorNode<?, ?> build() {
+        public ProcessorNode<K, V> build() {
             return new ProcessorNode<>(name, supplier.get(), stateStoreNames);
         }
 
@@ -219,19 +219,19 @@ public class InternalTopologyBuilder {
     // even if it can be matched by multiple regex patterns. Only used by SourceNodeFactory
     private final Map<String, Pattern> topicToPatterns = new HashMap<>();
 
-    private class SourceNodeFactory extends NodeFactory {
+    private class SourceNodeFactory<K, V> extends NodeFactory<K, V> {
         private final List<String> topics;
         private final Pattern pattern;
-        private final Deserializer<?> keyDeserializer;
-        private final Deserializer<?> valDeserializer;
+        private final Deserializer<K> keyDeserializer;
+        private final Deserializer<V> valDeserializer;
         private final TimestampExtractor timestampExtractor;
 
         private SourceNodeFactory(final String name,
                                   final String[] topics,
                                   final Pattern pattern,
                                   final TimestampExtractor timestampExtractor,
-                                  final Deserializer<?> keyDeserializer,
-                                  final Deserializer<?> valDeserializer) {
+                                  final Deserializer<K> keyDeserializer,
+                                  final Deserializer<V> valDeserializer) {
             super(name, NO_PREDECESSORS);
             this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<>();
             this.pattern = pattern;
@@ -267,7 +267,7 @@ public class InternalTopologyBuilder {
         }
 
         @Override
-        public ProcessorNode<?, ?> build() {
+        public ProcessorNode<K, V> build() {
             final List<String> sourceTopics = nodeToSourceTopics.get(name);
 
             // if it is subscribed via patterns, it is possible that the topic metadata has not been updated
@@ -290,7 +290,7 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private class SinkNodeFactory<K, V> extends NodeFactory {
+    private class SinkNodeFactory<K, V> extends NodeFactory<K, V> {
         private final Serializer<K> keySerializer;
         private final Serializer<V> valSerializer;
         private final StreamPartitioner<? super K, ? super V> partitioner;
@@ -310,9 +310,9 @@ public class InternalTopologyBuilder {
         }
 
         @Override
-        public ProcessorNode<?, ?> build() {
+        public ProcessorNode<K, V> build() {
             if (topicExtractor instanceof StaticTopicNameExtractor) {
-                final String topic = ((StaticTopicNameExtractor<?, ?>) topicExtractor).topicName;
+                final String topic = ((StaticTopicNameExtractor<K, V>) topicExtractor).topicName;
                 if (internalTopicNames.contains(topic)) {
                     // prefix the internal topic name with the application id
                     return new SinkNode<>(name, new StaticTopicNameExtractor<>(decorateTopic(topic)), keySerializer, valSerializer, partitioner);
@@ -325,8 +325,8 @@ public class InternalTopologyBuilder {
         }
 
         @Override
-        Sink describe() {
-            return new Sink(name, topicExtractor);
+        Sink<K, V> describe() {
+            return new Sink<>(name, topicExtractor);
         }
     }
 
@@ -346,7 +346,7 @@ public class InternalTopologyBuilder {
 
         // maybe strip out caching layers
         if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) == 0L) {
-            for (final StateStoreFactory storeFactory : stateFactories.values()) {
+            for (final StateStoreFactory<?> storeFactory : stateFactories.values()) {
                 storeFactory.builder.withCachingDisabled();
             }
 
@@ -384,7 +384,7 @@ public class InternalTopologyBuilder {
             sourceTopicNames.add(topic);
         }
 
-        nodeFactories.put(name, new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer));
+        nodeFactories.put(name, new SourceNodeFactory<>(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer));
         nodeToSourceTopics.put(name, Arrays.asList(topics));
         nodeGrouper.add(name);
         nodeGroups = null;
@@ -423,7 +423,7 @@ public class InternalTopologyBuilder {
 
         maybeAddToResetList(earliestResetPatterns, latestResetPatterns, offsetReset, topicPattern);
 
-        nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
+        nodeFactories.put(name, new SourceNodeFactory<>(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
         nodeToSourcePatterns.put(name, topicPattern);
         nodeGrouper.add(name);
         nodeGroups = null;
@@ -505,7 +505,7 @@ public class InternalTopologyBuilder {
             }
         }
 
-        nodeFactories.put(name, new ProcessorNodeFactory(name, predecessorNames, supplier));
+        nodeFactories.put(name, new ProcessorNodeFactory<>(name, predecessorNames, supplier));
         nodeGrouper.add(name);
         nodeGrouper.unite(name, predecessorNames);
         nodeGroups = null;
@@ -524,7 +524,7 @@ public class InternalTopologyBuilder {
             throw new TopologyException("StateStore " + storeBuilder.name() + " is already added.");
         }
 
-        stateFactories.put(storeBuilder.name(), new StateStoreFactory(storeBuilder));
+        stateFactories.put(storeBuilder.name(), new StateStoreFactory<>(storeBuilder));
 
         if (processorNames != null) {
             for (final String processorName : processorNames) {
@@ -535,14 +535,14 @@ public class InternalTopologyBuilder {
         nodeGroups = null;
     }
 
-    public final void addGlobalStore(final StoreBuilder<?> storeBuilder,
-                                     final String sourceName,
-                                     final TimestampExtractor timestampExtractor,
-                                     final Deserializer<?> keyDeserializer,
-                                     final Deserializer<?> valueDeserializer,
-                                     final String topic,
-                                     final String processorName,
-                                     final ProcessorSupplier<?, ?> stateUpdateSupplier) {
+    public final <K, V> void addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                            final String sourceName,
+                                            final TimestampExtractor timestampExtractor,
+                                            final Deserializer<K> keyDeserializer,
+                                            final Deserializer<V> valueDeserializer,
+                                            final String topic,
+                                            final String processorName,
+                                            final ProcessorSupplier<K, V> stateUpdateSupplier) {
         Objects.requireNonNull(storeBuilder, "store builder must not be null");
         validateGlobalStoreArguments(sourceName,
                                      topic,
@@ -555,17 +555,21 @@ public class InternalTopologyBuilder {
         final String[] topics = {topic};
         final String[] predecessors = {sourceName};
 
-        final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName,
+        final ProcessorNodeFactory<K, V> nodeFactory = new ProcessorNodeFactory<>(
+            processorName,
             predecessors,
-            stateUpdateSupplier);
+            stateUpdateSupplier
+        );
 
         globalTopics.add(topic);
-        nodeFactories.put(sourceName, new SourceNodeFactory(sourceName,
+        nodeFactories.put(sourceName, new SourceNodeFactory<>(
+            sourceName,
             topics,
             null,
             timestampExtractor,
             keyDeserializer,
-            valueDeserializer));
+            valueDeserializer)
+        );
         nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
         nodeGrouper.add(sourceName);
         nodeFactory.addStateStore(storeBuilder.name());
@@ -665,7 +669,7 @@ public class InternalTopologyBuilder {
             throw new TopologyException("Processor " + processorName + " is not added yet.");
         }
 
-        final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
+        final StateStoreFactory<?> stateStoreFactory = stateFactories.get(stateStoreName);
         final Iterator<String> iter = stateStoreFactory.users().iterator();
         if (iter.hasNext()) {
             final String user = iter.next();
@@ -673,9 +677,9 @@ public class InternalTopologyBuilder {
         }
         stateStoreFactory.users().add(processorName);
 
-        final NodeFactory nodeFactory = nodeFactories.get(processorName);
+        final NodeFactory<?, ?> nodeFactory = nodeFactories.get(processorName);
         if (nodeFactory instanceof ProcessorNodeFactory) {
-            final ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory;
+            final ProcessorNodeFactory<?, ?> processorNodeFactory = (ProcessorNodeFactory<?, ?>) nodeFactory;
             processorNodeFactory.addStateStore(stateStoreName);
             connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
         } else {
@@ -683,21 +687,21 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private Set<SourceNodeFactory> findSourcesForProcessorPredecessors(final String[] predecessors) {
-        final Set<SourceNodeFactory> sourceNodes = new HashSet<>();
+    private Set<SourceNodeFactory<?, ?>> findSourcesForProcessorPredecessors(final String[] predecessors) {
+        final Set<SourceNodeFactory<?, ?>> sourceNodes = new HashSet<>();
         for (final String predecessor : predecessors) {
-            final NodeFactory nodeFactory = nodeFactories.get(predecessor);
+            final NodeFactory<?, ?> nodeFactory = nodeFactories.get(predecessor);
             if (nodeFactory instanceof SourceNodeFactory) {
-                sourceNodes.add((SourceNodeFactory) nodeFactory);
+                sourceNodes.add((SourceNodeFactory<?, ?>) nodeFactory);
             } else if (nodeFactory instanceof ProcessorNodeFactory) {
-                sourceNodes.addAll(findSourcesForProcessorPredecessors(((ProcessorNodeFactory) nodeFactory).predecessors));
+                sourceNodes.addAll(findSourcesForProcessorPredecessors(((ProcessorNodeFactory<?, ?>) nodeFactory).predecessors));
             }
         }
         return sourceNodes;
     }
 
-    private void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName,
-                                                              final ProcessorNodeFactory processorNodeFactory) {
+    private <K, V> void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName,
+                                                                     final ProcessorNodeFactory<K, V> processorNodeFactory) {
         // we should never update the mapping from state store names to source topics if the store name already exists
         // in the map; this scenario is possible, for example, that a state store underlying a source KTable is
         // connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic.
@@ -709,10 +713,10 @@ public class InternalTopologyBuilder {
 
         final Set<String> sourceTopics = new HashSet<>();
         final Set<Pattern> sourcePatterns = new HashSet<>();
-        final Set<SourceNodeFactory> sourceNodesForPredecessor =
+        final Set<SourceNodeFactory<?, ?>> sourceNodesForPredecessor =
             findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);
 
-        for (final SourceNodeFactory sourceNodeFactory : sourceNodesForPredecessor) {
+        for (final SourceNodeFactory<?, ?> sourceNodeFactory : sourceNodesForPredecessor) {
             if (sourceNodeFactory.pattern != null) {
                 sourcePatterns.add(sourceNodeFactory.pattern);
             } else {
@@ -849,7 +853,7 @@ public class InternalTopologyBuilder {
 
         // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
         // also make sure the state store map values following the insertion ordering
-        for (final NodeFactory factory : nodeFactories.values()) {
+        for (final NodeFactory<?, ?> factory : nodeFactories.values()) {
             if (nodeGroup == null || nodeGroup.contains(factory.name)) {
                 final ProcessorNode<?, ?> node = factory.build();
                 processorMap.put(node.name(), node);
@@ -857,13 +861,13 @@ public class InternalTopologyBuilder {
                 if (factory instanceof ProcessorNodeFactory) {
                     buildProcessorNode(processorMap,
                                        stateStoreMap,
-                                       (ProcessorNodeFactory) factory,
+                                       (ProcessorNodeFactory<?, ?>) factory,
                                        node);
 
                 } else if (factory instanceof SourceNodeFactory) {
                     buildSourceNode(topicSourceMap,
                                     repartitionTopics,
-                                    (SourceNodeFactory) factory,
+                                    (SourceNodeFactory<?, ?>) factory,
                                     (SourceNode<?, ?>) node);
 
                 } else if (factory instanceof SinkNodeFactory) {
@@ -913,7 +917,7 @@ public class InternalTopologyBuilder {
 
     private void buildSourceNode(final Map<String, SourceNode<?, ?>> topicSourceMap,
                                  final Set<String> repartitionTopics,
-                                 final SourceNodeFactory sourceNodeFactory,
+                                 final SourceNodeFactory<?, ?> sourceNodeFactory,
                                  final SourceNode<?, ?> node) {
 
         final List<String> topics = (sourceNodeFactory.pattern != null) ?
@@ -934,7 +938,7 @@ public class InternalTopologyBuilder {
 
     private void buildProcessorNode(final Map<String, ProcessorNode<?, ?>> processorMap,
                                     final Map<String, StateStore> stateStoreMap,
-                                    final ProcessorNodeFactory factory,
+                                    final ProcessorNodeFactory<?, ?> factory,
                                     final ProcessorNode<?, ?> node) {
 
         for (final String predecessor : factory.predecessors) {
@@ -944,7 +948,7 @@ public class InternalTopologyBuilder {
         for (final String stateStoreName : factory.stateStoreNames) {
             if (!stateStoreMap.containsKey(stateStoreName)) {
                 if (stateFactories.containsKey(stateStoreName)) {
-                    final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
+                    final StateStoreFactory<?> stateStoreFactory = stateFactories.get(stateStoreName);
 
                     // remember the changelog topic if this state store is change-logging enabled
                     if (stateStoreFactory.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
@@ -1033,7 +1037,7 @@ public class InternalTopologyBuilder {
 
                 // if the node is connected to a state store whose changelog topics are not predefined,
                 // add to the changelog topics
-                for (final StateStoreFactory stateFactory : stateFactories.values()) {
+                for (final StateStoreFactory<?> stateFactory : stateFactories.values()) {
                     if (stateFactory.users().contains(node) && storeToChangelogTopic.containsKey(stateFactory.name())) {
                         final String topicName = storeToChangelogTopic.get(stateFactory.name());
                         if (!stateChangelogTopics.containsKey(topicName)) {
@@ -1059,7 +1063,7 @@ public class InternalTopologyBuilder {
     private void setRegexMatchedTopicsToSourceNodes() {
         if (hasSubscriptionUpdates()) {
             for (final String nodeName : nodeToSourcePatterns.keySet()) {
-                final SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(nodeName);
+                final SourceNodeFactory<?, ?> sourceNode = (SourceNodeFactory<?, ?>) nodeFactories.get(nodeName);
                 final List<String> sourceTopics = sourceNode.getTopics(subscriptionUpdates);
                 //need to update nodeToSourceTopics and sourceTopicNames with topics matched from given regex
                 nodeToSourceTopics.put(nodeName, sourceTopics);
@@ -1093,8 +1097,8 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory factory,
-                                                           final String name) {
+    private <S extends StateStore> InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory<S> factory,
+                                                                                  final String name) {
         if (factory.isWindowStore()) {
             final WindowedChangelogTopicConfig config = new WindowedChangelogTopicConfig(name, factory.logConfig());
             config.setRetentionMs(factory.retentionPeriod());
@@ -1238,10 +1242,10 @@ public class InternalTopologyBuilder {
     }
 
     private boolean isGlobalSource(final String nodeName) {
-        final NodeFactory nodeFactory = nodeFactories.get(nodeName);
+        final NodeFactory<?, ?> nodeFactory = nodeFactories.get(nodeName);
 
         if (nodeFactory instanceof SourceNodeFactory) {
-            final List<String> topics = ((SourceNodeFactory) nodeFactory).topics;
+            final List<String> topics = ((SourceNodeFactory<?, ?>) nodeFactory).topics;
             return topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0));
         }
         return false;
@@ -1280,7 +1284,7 @@ public class InternalTopologyBuilder {
                 description.addGlobalStore(new GlobalStore(
                     node,
                     processorNode,
-                    ((ProcessorNodeFactory) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
+                    ((ProcessorNodeFactory<?, ?>) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
                     nodeToSourceTopics.get(node).get(0),
                     id
                 ));
@@ -1513,8 +1517,9 @@ public class InternalTopologyBuilder {
             // omit successor to avoid infinite loops
             return name.equals(source.name)
                 && Objects.equals(topics, source.topics)
-                && (topicPattern == null ? source.topicPattern == null :
-                    topicPattern.pattern().equals(source.topicPattern.pattern()));
+                && (topicPattern == null ?
+                        source.topicPattern == null :
+                        topicPattern.pattern().equals(source.topicPattern.pattern()));
         }
 
         @Override
@@ -1567,11 +1572,11 @@ public class InternalTopologyBuilder {
         }
     }
 
-    public final static class Sink extends AbstractNode implements TopologyDescription.Sink {
-        private final TopicNameExtractor<?, ?> topicNameExtractor;
+    public final static class Sink<K, V> extends AbstractNode implements TopologyDescription.Sink {
+        private final TopicNameExtractor<K, V> topicNameExtractor;
 
         public Sink(final String name,
-                    final TopicNameExtractor<?, ?> topicNameExtractor) {
+                    final TopicNameExtractor<K, V> topicNameExtractor) {
             super(name);
             this.topicNameExtractor = topicNameExtractor;
         }
@@ -1585,14 +1590,14 @@ public class InternalTopologyBuilder {
         @Override
         public String topic() {
             if (topicNameExtractor instanceof StaticTopicNameExtractor) {
-                return ((StaticTopicNameExtractor<?, ?>) topicNameExtractor).topicName;
+                return ((StaticTopicNameExtractor<K, V>) topicNameExtractor).topicName;
             } else {
                 return null;
             }
         }
 
         @Override
-        public TopicNameExtractor<?, ?> topicNameExtractor() {
+        public TopicNameExtractor<K, V> topicNameExtractor() {
             if (topicNameExtractor instanceof StaticTopicNameExtractor) {
                 return null;
             } else {
@@ -1614,6 +1619,7 @@ public class InternalTopologyBuilder {
                 + nodeNames(predecessors);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
         public boolean equals(final Object o) {
             if (this == o) {
@@ -1623,7 +1629,7 @@ public class InternalTopologyBuilder {
                 return false;
             }
 
-            final Sink sink = (Sink) o;
+            final Sink<K, V> sink = (Sink<K, V>) o;
             return name.equals(sink.name)
                 && topicNameExtractor.equals(sink.topicNameExtractor)
                 && predecessors.equals(sink.predecessors);
@@ -1921,7 +1927,7 @@ public class InternalTopologyBuilder {
         return sourceTopicNames;
     }
 
-    public synchronized Map<String, StateStoreFactory> stateStores() {
+    public synchronized Map<String, StateStoreFactory<?>> stateStores() {
         return stateFactories;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 178704f..3e541e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -69,7 +69,6 @@ public class MeteredSessionStore<K, V>
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        //noinspection unchecked
         serdes = new StateSerdes<>(
             ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
             keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 33cb6b9..7087085 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -915,13 +915,14 @@ public class KafkaStreamsTest {
         final StoreBuilder<KeyValueStore<String, String>> globalStoreBuilder = Stores.keyValueStoreBuilder(
             isPersistentStore ? Stores.persistentKeyValueStore(globalStoreName) : Stores.inMemoryKeyValueStore(globalStoreName),
             Serdes.String(), Serdes.String()).withLoggingDisabled();
-        topology.addGlobalStore(globalStoreBuilder,
+        topology.addGlobalStore(
+            globalStoreBuilder,
             "global",
             Serdes.String().deserializer(),
             Serdes.String().deserializer(),
             globalTopicName,
             globalTopicName + "-processor",
-            new MockProcessorSupplier<byte[], byte[]>());
+            new MockProcessorSupplier<>());
         return topology;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 7ea9b2b..d0147e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -55,8 +55,8 @@ import static org.junit.Assert.fail;
 
 public class TopologyTest {
 
-    private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
-    private final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
+    private final StoreBuilder<MockKeyValueStore> storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
+    private final KeyValueStoreBuilder<?, ?> globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
     private final Topology topology = new Topology();
     private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();
 
@@ -87,7 +87,7 @@ public class TopologyTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullNameWhenAddingProcessor() {
-        topology.addProcessor(null, () -> new MockProcessorSupplier().get());
+        topology.addProcessor(null, () -> new MockProcessorSupplier<>().get());
     }
 
     @Test(expected = NullPointerException.class)
@@ -169,9 +169,9 @@ public class TopologyTest {
     @Test
     public void shouldNotAllowToAddProcessorWithSameName() {
         topology.addSource("source", "topic-1");
-        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+        topology.addProcessor("processor", new MockProcessorSupplier<>(), "source");
         try {
-            topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+            topology.addProcessor("processor", new MockProcessorSupplier<>(), "source");
             fail("Should throw TopologyException for duplicate processor name");
         } catch (final TopologyException expected) { }
     }
@@ -180,7 +180,7 @@ public class TopologyTest {
     public void shouldNotAllowToAddProcessorWithEmptyParents() {
         topology.addSource("source", "topic-1");
         try {
-            topology.addProcessor("processor", new MockProcessorSupplier());
+            topology.addProcessor("processor", new MockProcessorSupplier<>());
             fail("Should throw TopologyException for processor without at least one parent node");
         } catch (final TopologyException expected) { }
     }
@@ -189,19 +189,19 @@ public class TopologyTest {
     public void shouldNotAllowToAddProcessorWithNullParents() {
         topology.addSource("source", "topic-1");
         try {
-            topology.addProcessor("processor", new MockProcessorSupplier(), (String) null);
+            topology.addProcessor("processor", new MockProcessorSupplier<>(), (String) null);
             fail("Should throw NullPointerException for processor when null parent names are provided");
         } catch (final NullPointerException expected) { }
     }
 
     @Test(expected = TopologyException.class)
     public void shouldFailOnUnknownSource() {
-        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+        topology.addProcessor("processor", new MockProcessorSupplier<>(), "source");
     }
 
     @Test(expected = TopologyException.class)
     public void shouldFailIfNodeIsItsOwnParent() {
-        topology.addProcessor("processor", new MockProcessorSupplier(), "processor");
+        topology.addProcessor("processor", new MockProcessorSupplier<>(), "processor");
     }
 
     @Test
@@ -217,7 +217,7 @@ public class TopologyTest {
     @Test
     public void shouldNotAllowToAddSinkWithEmptyParents() {
         topology.addSource("source", "topic-1");
-        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+        topology.addProcessor("processor", new MockProcessorSupplier<>(), "source");
         try {
             topology.addSink("sink", "topic-2");
             fail("Should throw TopologyException for sink without at least one parent node");
@@ -227,7 +227,7 @@ public class TopologyTest {
     @Test
     public void shouldNotAllowToAddSinkWithNullParents() {
         topology.addSource("source", "topic-1");
-        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+        topology.addProcessor("processor", new MockProcessorSupplier<>(), "source");
         try {
             topology.addSink("sink", "topic-2", (String) null);
             fail("Should throw NullPointerException for sink when null parent names are provided");
@@ -333,12 +333,12 @@ public class TopologyTest {
         }
     }
 
-    private static class LocalMockProcessorSupplier implements ProcessorSupplier {
+    private static class LocalMockProcessorSupplier implements ProcessorSupplier<Object, Object> {
         final static String STORE_NAME = "store";
 
         @Override
-        public Processor get() {
-            return new Processor() {
+        public Processor<Object, Object> get() {
+            return new Processor<Object, Object>() {
                 @Override
                 public void init(final ProcessorContext context) {
                     context.getStateStore(STORE_NAME);
@@ -364,7 +364,7 @@ public class TopologyTest {
             null,
             "anyTopicName",
             "sameName",
-            new MockProcessorSupplier());
+            new MockProcessorSupplier<>());
     }
 
     @Test
@@ -375,16 +375,16 @@ public class TopologyTest {
     @Test
     public void sinkShouldReturnNullTopicWithDynamicRouting() {
         final TopologyDescription.Sink expectedSinkNode =
-            new InternalTopologyBuilder.Sink("sink", (key, value, record) -> record.topic() + "-" + key);
+            new InternalTopologyBuilder.Sink<>("sink", (key, value, record) -> record.topic() + "-" + key);
 
         assertThat(expectedSinkNode.topic(), equalTo(null));
     }
 
     @Test
     public void sinkShouldReturnTopicNameExtractorWithDynamicRouting() {
-        final TopicNameExtractor topicNameExtractor = (key, value, record) -> record.topic() + "-" + key;
+        final TopicNameExtractor<?, ?> topicNameExtractor = (key, value, record) -> record.topic() + "-" + key;
         final TopologyDescription.Sink expectedSinkNode =
-            new InternalTopologyBuilder.Sink("sink", topicNameExtractor);
+            new InternalTopologyBuilder.Sink<>("sink", topicNameExtractor);
 
         assertThat(expectedSinkNode.topicNameExtractor(), equalTo(topicNameExtractor));
     }
@@ -1127,10 +1127,10 @@ public class TopologyTest {
             parentNames[i] = parents[i].name();
         }
 
-        topology.addProcessor(processorName, new MockProcessorSupplier(), parentNames);
+        topology.addProcessor(processorName, new MockProcessorSupplier<>(), parentNames);
         if (newStores) {
             for (final String store : storeNames) {
-                final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
+                final StoreBuilder<?> storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
                 EasyMock.expect(storeBuilder.name()).andReturn(store).anyTimes();
                 EasyMock.replay(storeBuilder);
                 topology.addStateStore(storeBuilder, processorName);
@@ -1174,7 +1174,7 @@ public class TopologyTest {
                                                                 final String globalTopicName,
                                                                 final String processorName,
                                                                 final int id) {
-        final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
+        final KeyValueStoreBuilder<?, ?> globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
         EasyMock.expect(globalStoreBuilder.name()).andReturn(globalStoreName).anyTimes();
         EasyMock.replay(globalStoreBuilder);
         topology.addGlobalStore(
@@ -1185,7 +1185,7 @@ public class TopologyTest {
             null,
             globalTopicName,
             processorName,
-            new MockProcessorSupplier());
+            new MockProcessorSupplier<>());
 
         final TopologyDescription.GlobalStore expectedGlobalStore = new InternalTopologyBuilder.GlobalStore(
             sourceName,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
index 8a76164..272cfa8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -116,7 +116,7 @@ public class GlobalThreadShutDownOrderTest {
             storeBuilder,
             globalStoreTopic,
             Consumed.with(Serdes.String(), Serdes.Long()),
-            new MockProcessorSupplier());
+            new MockProcessorSupplier<>());
 
         builder
             .stream(streamTopic, stringLongConsumed)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index cddc531..1463739 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -227,7 +228,7 @@ public class RegexSourceIntegrationTest {
     public void shouldAddStateStoreToRegexDefinedSource() throws InterruptedException {
 
         final ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
-        final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("testStateStore", false);
+        final StoreBuilder<KeyValueStore<Object, Object>> storeBuilder = new MockKeyValueStoreBuilder("testStateStore", false);
         final long thirtySecondTimeout = 30 * 1000;
 
         final TopologyWrapper topology = new TopologyWrapper();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 4280e9c..0d75452 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.junit.Test;
 
@@ -63,7 +62,7 @@ public class GraphGraceSearchUtilTest {
                 },
                 "dummy"
             ),
-            (StoreBuilder<? extends StateStore>) null
+            (StoreBuilder<?>) null
         );
 
         final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
@@ -91,7 +90,7 @@ public class GraphGraceSearchUtilTest {
                 ),
                 "asdf"
             ),
-            (StoreBuilder<? extends StateStore>) null
+            (StoreBuilder<?>) null
         );
 
         final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
@@ -114,7 +113,7 @@ public class GraphGraceSearchUtilTest {
                 ),
                 "asdf"
             ),
-            (StoreBuilder<? extends StateStore>) null
+            (StoreBuilder<?>) null
         );
 
         final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
@@ -129,7 +128,7 @@ public class GraphGraceSearchUtilTest {
             new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>(
                 windows, "asdf", null, null, null
             ), "asdf"),
-            (StoreBuilder<? extends StateStore>) null
+            (StoreBuilder<?>) null
         );
 
         final StatefulProcessorNode<String, Long> statefulParent = new StatefulProcessorNode<>(
@@ -147,7 +146,7 @@ public class GraphGraceSearchUtilTest {
                 },
                 "dummy"
             ),
-            (StoreBuilder<? extends StateStore>) null
+            (StoreBuilder<?>) null
         );
         graceGrandparent.addChild(statefulParent);
 
@@ -173,7 +172,7 @@ public class GraphGraceSearchUtilTest {
                 ),
                 "asdf"
             ),
-            (StoreBuilder<? extends StateStore>) null
+            (StoreBuilder<?>) null
         );
 
         final ProcessorGraphNode<String, Long> statelessParent = new ProcessorGraphNode<>("stateless", null);
@@ -200,7 +199,7 @@ public class GraphGraceSearchUtilTest {
                 ),
                 "asdf"
             ),
-            (StoreBuilder<? extends StateStore>) null
+            (StoreBuilder<?>) null
         );
 
         final StatefulProcessorNode<String, Long> rightParent = new StatefulProcessorNode<>(
@@ -214,7 +213,7 @@ public class GraphGraceSearchUtilTest {
                 ),
                 "asdf"
             ),
-            (StoreBuilder<? extends StateStore>) null
+            (StoreBuilder<?>) null
         );
 
         final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index e5d12e6..64c472f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -494,7 +494,7 @@ public class ProcessorTopologyTest {
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                 Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String());
         topology.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, "topic")
-                .addProcessor(processor, () -> new StatefulProcessor(storeSupplier.name()), "source")
+                .addProcessor(processor, (ProcessorSupplier<String, String>) () -> new StatefulProcessor(storeSupplier.name()), "source")
                 .addStateStore(storeBuilder, processor);
         return topology.getInternalBuilder("anyAppId").buildTopology();
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 71e15d4..f6f00b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -86,13 +86,13 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
     @SuppressWarnings("unchecked")
     @Override
     protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
-        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+        final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
                 Stores.persistentKeyValueStore("cache-store"),
                 (Serde<K>) context.keySerde(),
                 (Serde<V>) context.valueSerde())
                 .withCachingEnabled();
 
-        final KeyValueStore<K, V> store = (KeyValueStore<K, V>) storeBuilder.build();
+        final KeyValueStore<K, V> store = storeBuilder.build();
         store.init(context, store);
         return store;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
index 0848970..7c0d16c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
@@ -30,15 +29,15 @@ public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest {
     @SuppressWarnings("unchecked")
     @Override
     protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
-        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+        final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
             Stores.inMemoryKeyValueStore("my-store"),
             (Serde<K>) context.keySerde(),
             (Serde<V>) context.valueSerde())
             .withLoggingEnabled(Collections.singletonMap("retention.ms", "1000"));
 
-        final StateStore store = storeBuilder.build();
+        final KeyValueStore<K, V> store = storeBuilder.build();
         store.init(context, store);
 
-        return (KeyValueStore<K, V>) store;
+        return store;
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index ef5d6dc..62f8949 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
@@ -33,14 +32,14 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
     @SuppressWarnings("unchecked")
     @Override
     protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
-        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+        final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
                 Stores.inMemoryKeyValueStore("my-store"),
                 (Serde<K>) context.keySerde(),
                 (Serde<V>) context.valueSerde());
 
-        final StateStore store = storeBuilder.build();
+        final KeyValueStore<K, V> store = storeBuilder.build();
         store.init(context, store);
-        return (KeyValueStore<K, V>) store;
+        return store;
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index 1bd4045..2a86cdd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -19,13 +19,11 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.Test;
 
-
 import java.util.Arrays;
 import java.util.List;
 
@@ -39,16 +37,15 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
     @SuppressWarnings("unchecked")
     @Override
     protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
-
-        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+        final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
                 Stores.lruMap("my-store", 10),
                 (Serde<K>) context.keySerde(),
                 (Serde<V>) context.valueSerde());
 
-        final StateStore store = storeBuilder.build();
+        final KeyValueStore<K, V> store = storeBuilder.build();
         store.init(context, store);
 
-        return (KeyValueStore<K, V>) store;
+        return store;
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java
index 3f0b347..90f4850 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java
@@ -44,8 +44,9 @@ public class InMemoryTimeOrderedKeyValueBufferTest {
         final String expect = "3";
         final Map<String, String> logConfig = new HashMap<>();
         logConfig.put("min.insync.replicas", expect);
-        final StoreBuilder builder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null)
-            .withLoggingEnabled(logConfig);
+        final StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<Object, Object>> builder =
+            new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null)
+                .withLoggingEnabled(logConfig);
 
         assertThat(builder.logConfig(), is(singletonMap("min.insync.replicas", expect)));
         assertThat(builder.loggingEnabled(), is(true));
@@ -53,8 +54,9 @@ public class InMemoryTimeOrderedKeyValueBufferTest {
 
     @Test
     public void bufferShouldAllowLoggingDisablement() {
-        final StoreBuilder builder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null)
-            .withLoggingDisabled();
+        final StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<Object, Object>> builder
+            = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null)
+                .withLoggingDisabled();
 
         assertThat(builder.logConfig(), is(emptyMap()));
         assertThat(builder.loggingEnabled(), is(false));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 8db040e..504aa9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -40,14 +39,14 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
     @SuppressWarnings("unchecked")
     @Override
     protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
-        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+        final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
                 Stores.persistentKeyValueStore("my-store"),
                 (Serde<K>) context.keySerde(),
                 (Serde<V>) context.valueSerde());
 
-        final StateStore store = storeBuilder.build();
+        final KeyValueStore<K, V> store = storeBuilder.build();
         store.init(context, store);
-        return (KeyValueStore<K, V>) store;
+        return store;
     }
 
     public static class TheRocksDbConfigSetter implements RocksDBConfigSetter {
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index 55cd5fa..0c05d8f 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -176,10 +176,10 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
    *
    * @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
    */
-  def addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore],
-                     topic: String,
-                     consumed: Consumed[_, _],
-                     stateUpdateSupplier: ProcessorSupplier[_, _]): StreamsBuilderJ =
+  def addGlobalStore[K, V](storeBuilder: StoreBuilder[_ <: StateStore],
+                           topic: String,
+                           consumed: Consumed[K, V],
+                           stateUpdateSupplier: ProcessorSupplier[K, V]): StreamsBuilderJ =
     inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier)
 
   def build(): Topology = inner.build()
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 32c479c..fe901ed 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -247,12 +247,12 @@ public class MockProcessorContextTest {
 
         final MockProcessorContext context = new MockProcessorContext();
 
-        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+        final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
                 Stores.inMemoryKeyValueStore("my-state"),
                 Serdes.String(),
                 Serdes.Long()).withLoggingDisabled();
 
-        final KeyValueStore<String, Long> store = (KeyValueStore<String, Long>) storeBuilder.build();
+        final KeyValueStore<String, Long> store = storeBuilder.build();
 
         store.init(context, store);
 
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 60ab516..fb5585f 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -371,7 +370,7 @@ public class TopologyTestDriverTest {
 
         for (final String sourceTopicName : sourceTopicNames) {
             topology.addGlobalStore(
-                Stores.<Bytes, byte[]>keyValueStoreBuilder(
+                Stores.keyValueStoreBuilder(
                     Stores.inMemoryKeyValueStore(
                         sourceTopicName + "-globalStore"),
                     null,