You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/10/05 00:40:38 UTC

[kafka] branch 2.1 updated: MINOR: Fix generic type of ProcessorParameters (#5741)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new fa3fe3e  MINOR: Fix generic type of ProcessorParameters (#5741)
fa3fe3e is described below

commit fa3fe3e68059068ac894552078155b16c21404aa
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Oct 4 19:37:53 2018 -0500

    MINOR: Fix generic type of ProcessorParameters (#5741)
    
    In unrelated recent work, I noticed some warnings about the missing type parameters on ProcessorParameters.
    
    While investigating them, I found what seems to be a bug in the creation of repartition topics.
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../internals/GroupedStreamAggregateBuilder.java   |   7 +-
 .../kstream/internals/KGroupedTableImpl.java       |  22 +--
 .../streams/kstream/internals/KStreamImpl.java     | 184 +++++++++++----------
 .../internals/graph/ProcessorParameters.java       |  12 +-
 .../internals/graph/StatefulProcessorNode.java     |   6 +-
 .../internals/graph/GraphGraceSearchUtilTest.java  |   8 +-
 6 files changed, 125 insertions(+), 114 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 3439cf5..8e6f990 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -84,16 +84,17 @@ class GroupedStreamAggregateBuilder<K, V> {
             builder.addGraphNode(parentNode, repartitionNode);
             parentNode = repartitionNode;
         }
-        final StatefulProcessorNode.StatefulProcessorNodeBuilder<K, T> statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder();
+        final StatefulProcessorNode.StatefulProcessorNodeBuilder<K, V> statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder();
+
+        final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName);
 
-        final ProcessorParameters processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName);
         statefulProcessorNodeBuilder
             .withProcessorParameters(processorParameters)
             .withNodeName(aggFunctionName)
             .withRepartitionRequired(repartitionRequired)
             .withStoreBuilder(storeBuilder);
 
-        final StatefulProcessorNode<K, T> statefulProcessorNode = statefulProcessorNodeBuilder.build();
+        final StatefulProcessorNode<K, V> statefulProcessorNode = statefulProcessorNodeBuilder.build();
 
         builder.addGraphNode(parentNode, statefulProcessorNode);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index c97576b..013028d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -47,7 +47,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
 
     private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
-    protected final String userSpecifiedName;
+    private final String userSpecifiedName;
 
     private final Initializer<Long> countInitializer = () -> 0L;
 
@@ -72,7 +72,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
         final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME);
         final String funcName = builder.newProcessorName(functionName);
         final String repartitionTopic = (userSpecifiedName != null ? userSpecifiedName : materialized.storeName())
-                + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
+            + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
         final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName, sourceName, repartitionTopic);
 
@@ -98,16 +98,18 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
                                 builder);
     }
 
-    private <T> StatefulProcessorNode getStatefulProcessorNode(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized,
-                                                               final String functionName,
-                                                               final ProcessorSupplier<K, Change<V>> aggregateSupplier) {
+    private <T> StatefulProcessorNode<K, Change<V>> getStatefulProcessorNode(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized,
+                                                                             final String functionName,
+                                                                             final ProcessorSupplier<K, Change<V>> aggregateSupplier) {
 
         final ProcessorParameters<K, Change<V>> aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, functionName);
 
-        return StatefulProcessorNode.statefulProcessorNodeBuilder()
+        return StatefulProcessorNode
+            .<K, Change<V>>statefulProcessorNodeBuilder()
             .withNodeName(functionName)
             .withProcessorParameters(aggregateFunctionProcessorParams)
-            .withStoreBuilder(new KeyValueStoreMaterializer<>(materialized).materialize()).build();
+            .withStoreBuilder(new KeyValueStoreMaterializer<>(materialized).materialize())
+            .build();
     }
 
     private GroupedTableOperationRepartitionNode<K, V> createRepartitionNode(final String sinkName,
@@ -164,9 +166,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
         }
 
         final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(materializedInternal.storeName(),
-                countInitializer,
-                countAdder,
-                countSubtractor);
+                                                                                        countInitializer,
+                                                                                        countAdder,
+                                                                                        countSubtractor);
 
         return doAggregate(aggregateSupplier, AGGREGATE_NAME, materializedInternal);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 96fa8b9..49dbbd1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
+import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
@@ -165,7 +166,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
     @Override
-    public <K1> KStream<K1, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
+    public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
 
         final ProcessorGraphNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper);
@@ -178,10 +179,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
 
-    private <K1> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
+    private <KR> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper) {
         final String name = builder.newProcessorName(KEY_SELECT_NAME);
 
-        final KStreamMap<K, V, K1, V> kStreamMap = new KStreamMap<>((key, value) -> new KeyValue<>(mapper.apply(key, value), value));
+        final KStreamMap<K, V, KR, V> kStreamMap = new KStreamMap<>((key, value) -> new KeyValue<>(mapper.apply(key, value), value));
 
         final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(kStreamMap, name);
 
@@ -189,7 +190,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) {
+    public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         final String name = builder.newProcessorName(MAP_NAME);
 
@@ -212,7 +213,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
 
     @Override
-    public <V1> KStream<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper) {
+    public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper) {
         return mapValues(withKey(mapper));
     }
 
@@ -250,7 +251,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
+    public <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         final String name = builder.newProcessorName(FLATMAP_NAME);
 
@@ -271,7 +272,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
     @Override
-    public <V1> KStream<K, V1> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
+    public <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper) {
         return flatMapValues(withKey(mapper));
     }
 
@@ -446,19 +447,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
+    public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
                                               final String... stateStoreNames) {
         Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
         final String name = builder.newProcessorName(TRANSFORM_NAME);
 
-        final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamTransform<>(transformerSupplier), name);
-
+        final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
+            name,
+            new ProcessorParameters<>(new KStreamTransform<>(transformerSupplier), name),
+            stateStoreNames,
+            null,
+            true
+        );
 
-        final StatefulProcessorNode<K1, V1> transformNode = new StatefulProcessorNode<>(name,
-                                                                                        processorParameters,
-                                                                                        stateStoreNames,
-                                                                                        null,
-                                                                                        true);
         transformNode.keyChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, transformNode);
 
@@ -467,7 +468,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
     @Override
-    public <V1> KStream<K, V1> transformValues(final ValueTransformerSupplier<? super V, ? extends V1> valueTransformerSupplier,
+    public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
                                                final String... stateStoreNames) {
         Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
 
@@ -486,14 +487,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
                                                   final String... stateStoreNames) {
         final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
 
+        final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
+            name,
+            new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name),
+            stateStoreNames,
+            null,
+            repartitionRequired
+        );
 
-        final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name);
-
-        final StatefulProcessorNode<K, VR> transformNode = new StatefulProcessorNode<>(name,
-                                                                                       processorParameters,
-                                                                                       stateStoreNames,
-                                                                                       null,
-                                                                                       repartitionRequired);
         transformNode.setValueChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, transformNode);
 
@@ -508,19 +509,21 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         Objects.requireNonNull(processorSupplier, "ProcessSupplier cant' be null");
         final String name = builder.newProcessorName(PROCESSOR_NAME);
 
-        final ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name);
-        final StatefulProcessorNode<K, V> processNode = new StatefulProcessorNode<>(name,
-                                                                                    processorParameters,
-                                                                                    stateStoreNames,
-                                                                                    null,
-                                                                                    repartitionRequired);
+        final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
+            name,
+            new ProcessorParameters<>(processorSupplier, name),
+            stateStoreNames,
+            null,
+            repartitionRequired
+        );
+
         builder.addGraphNode(this.streamsGraphNode, processNode);
     }
 
     @Override
-    public <V1, R> KStream<K, R> join(final KStream<K, V1> other,
-                                      final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-                                      final JoinWindows windows) {
+    public <VO, VR> KStream<K, VR> join(final KStream<K, VO> other,
+                                        final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+                                        final JoinWindows windows) {
         return join(other, joiner, windows, Joined.with(null, null, null));
     }
 
@@ -539,9 +542,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
     @Override
-    public <V1, R> KStream<K, R> outerJoin(final KStream<K, V1> other,
-                                           final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-                                           final JoinWindows windows) {
+    public <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> other,
+                                             final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+                                             final JoinWindows windows) {
         return outerJoin(other, joiner, windows, Joined.with(null, null, null));
     }
 
@@ -553,28 +556,27 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         return doJoin(other, joiner, windows, joined, new KStreamImplJoin(true, true));
     }
 
-    private <V1, R> KStream<K, R> doJoin(final KStream<K, V1> other,
-                                         final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-                                         final JoinWindows windows,
-                                         final Joined<K, V, V1> joined,
-                                         final KStreamImplJoin join) {
+    private <VO, VR> KStream<K, VR> doJoin(final KStream<K, VO> other,
+                                           final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+                                           final JoinWindows windows,
+                                           final Joined<K, V, VO> joined,
+                                           final KStreamImplJoin join) {
         Objects.requireNonNull(other, "other KStream can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(windows, "windows can't be null");
         Objects.requireNonNull(joined, "joined can't be null");
 
         KStreamImpl<K, V> joinThis = this;
-        KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other;
+        KStreamImpl<K, VO> joinOther = (KStreamImpl<K, VO>) other;
 
         if (joinThis.repartitionRequired) {
             final String leftJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-left" : joinThis.name;
-            joinThis = joinThis.repartitionForJoin(Joined.with(joined.keySerde(), joined.valueSerde(), joined.otherValueSerde(), leftJoinRepartitionTopicName));
+            joinThis = joinThis.repartitionForJoin(leftJoinRepartitionTopicName, joined.keySerde(), joined.valueSerde());
         }
 
         if (joinOther.repartitionRequired) {
             final String rightJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-right" : joinOther.name;
-            final Joined newJoined = Joined.with(joined.keySerde(), joined.valueSerde(), joined.otherValueSerde(), rightJoinRepartitionTopicName);
-            joinOther = joinOther.repartitionForJoin(newJoined);
+            joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, joined.keySerde(), joined.otherValueSerde());
         }
 
         joinThis.ensureJoinableWith(joinOther);
@@ -591,18 +593,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     /**
      * Repartition a stream. This is required on join operations occurring after
      * an operation that changes the key, i.e, selectKey, map(..), flatMap(..).
-     *
-     * @param joined joined control object
-     * @return a new {@link KStreamImpl}
      */
-    private KStreamImpl<K, V> repartitionForJoin(final Joined<K, V, ?> joined) {
-        final Serde<K> repartitionKeySerde = joined.keySerde() != null ? joined.keySerde() : keySerde;
-        final Serde<V> repartitionValueSerde = joined.valueSerde() != null ? joined.valueSerde() : valSerde;
-        final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
+    private KStreamImpl<K, V> repartitionForJoin(final String repartitionName,
+                                                 final Serde<K> keySerdeOverride,
+                                                 final Serde<V> valueSerdeOverride) {
+        final Serde<K> repartitionKeySerde = keySerdeOverride != null ? keySerdeOverride : keySerde;
+        final Serde<V> repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valSerde;
+        final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder =
+            OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
         final String repartitionedSourceName = createRepartitionedSource(builder,
                                                                          repartitionKeySerde,
                                                                          repartitionValueSerde,
-                                                                         joined.name(),
+                                                                         repartitionName,
                                                                          optimizableRepartitionNodeBuilder);
 
         final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
@@ -615,7 +617,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
                                                      final Serde<K1> keySerde,
                                                      final Serde<V1> valSerde,
                                                      final String repartitionTopicNamePrefix,
-                                                     final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K1, V1> optimizableRepartitionNodeBuilder) {
+                                                     final OptimizableRepartitionNodeBuilder<K1, V1> optimizableRepartitionNodeBuilder) {
 
 
         final String repartitionTopic = repartitionTopicNamePrefix + REPARTITION_TOPIC_SUFFIX;
@@ -644,9 +646,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
     @Override
-    public <V1, R> KStream<K, R> leftJoin(final KStream<K, V1> other,
-                                          final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-                                          final JoinWindows windows) {
+    public <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> other,
+                                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+                                            final JoinWindows windows) {
         return leftJoin(other, joiner, windows, Joined.with(null, null, null));
     }
 
@@ -667,21 +669,24 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
     @Override
-    public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
-                                      final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+    public <VO, VR> KStream<K, VR> join(final KTable<K, VO> other,
+                                        final ValueJoiner<? super V, ? super VO, ? extends VR> joiner) {
         return join(other, joiner, Joined.with(null, null, null));
     }
 
     @Override
-    public <VT, VR> KStream<K, VR> join(final KTable<K, VT> other,
-                                        final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
-                                        final Joined<K, V, VT> joined) {
+    public <VO, VR> KStream<K, VR> join(final KTable<K, VO> other,
+                                        final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+                                        final Joined<K, V, VO> joined) {
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(joined, "joined can't be null");
         if (repartitionRequired) {
-            final Joined<K, V, ?> updatedJoined = joined.name() != null ? joined : joined.withName(name);
-            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(updatedJoined);
+            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(
+                joined.name() != null ? joined.name() : name,
+                joined.keySerde(),
+                joined.valueSerde()
+            );
             return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, false);
         } else {
             return doStreamTableJoin(other, joiner, joined, false);
@@ -689,20 +694,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
     @Override
-    public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+    public <VO, VR> KStream<K, VR> leftJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner) {
         return leftJoin(other, joiner, Joined.with(null, null, null));
     }
 
     @Override
-    public <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other,
-                                            final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
-                                            final Joined<K, V, VT> joined) {
+    public <VO, VR> KStream<K, VR> leftJoin(final KTable<K, VO> other,
+                                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+                                            final Joined<K, V, VO> joined) {
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(joined, "joined can't be null");
         if (repartitionRequired) {
-            final Joined<K, V, ?> updatedJoined = joined.name() != null ? joined : joined.withName(name);
-            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(updatedJoined);
+            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(
+                joined.name() != null ? joined.name() : name,
+                joined.keySerde(),
+                joined.valueSerde()
+            );
             return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, true);
         } else {
             return doStreamTableJoin(other, joiner, joined, true);
@@ -710,28 +718,28 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
     @Override
-    public <K1, V1, V2> KStream<K, V2> join(final GlobalKTable<K1, V1> globalTable,
-                                            final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper,
-                                            final ValueJoiner<? super V, ? super V1, ? extends V2> joiner) {
+    public <KG, VG, VR> KStream<K, VR> join(final GlobalKTable<KG, VG> globalTable,
+                                            final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper,
+                                            final ValueJoiner<? super V, ? super VG, ? extends VR> joiner) {
         return globalTableJoin(globalTable, keyMapper, joiner, false);
     }
 
     @Override
-    public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable,
-                                              final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper,
-                                              final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+    public <KG, VG, VR> KStream<K, VR> leftJoin(final GlobalKTable<KG, VG> globalTable,
+                                                final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper,
+                                                final ValueJoiner<? super V, ? super VG, ? extends VR> joiner) {
         return globalTableJoin(globalTable, keyMapper, joiner, true);
     }
 
-    private <K1, V1, V2> KStream<K, V2> globalTableJoin(final GlobalKTable<K1, V1> globalTable,
-                                                        final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper,
-                                                        final ValueJoiner<? super V, ? super V1, ? extends V2> joiner,
+    private <KG, VG, VR> KStream<K, VR> globalTableJoin(final GlobalKTable<KG, VG> globalTable,
+                                                        final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper,
+                                                        final ValueJoiner<? super V, ? super VG, ? extends VR> joiner,
                                                         final boolean leftJoin) {
         Objects.requireNonNull(globalTable, "globalTable can't be null");
         Objects.requireNonNull(keyMapper, "keyMapper can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
 
-        final KTableValueGetterSupplier<K1, V1> valueGetterSupplier = ((GlobalKTableImpl<K1, V1>) globalTable).valueGetterSupplier();
+        final KTableValueGetterSupplier<KG, VG> valueGetterSupplier = ((GlobalKTableImpl<KG, VG>) globalTable).valueGetterSupplier();
         final String name = builder.newProcessorName(LEFTJOIN_NAME);
 
         final ProcessorSupplier<K, V> processorSupplier = new KStreamGlobalKTableJoin<>(
@@ -753,18 +761,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
     @SuppressWarnings("unchecked")
-    private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
-                                                    final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-                                                    final Joined<K, V, V1> joined,
-                                                    final boolean leftJoin) {
+    private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> other,
+                                                      final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+                                                      final Joined<K, V, VO> joined,
+                                                      final boolean leftJoin) {
         Objects.requireNonNull(other, "other KTable can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
 
-        final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, V1>) other);
+        final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, VO>) other);
 
         final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
         final ProcessorSupplier<K, V> processorSupplier = new KStreamKTableJoin<>(
-            ((KTableImpl<K, ?, V1>) other).valueGetterSupplier(),
+            ((KTableImpl<K, ?, VO>) other).valueGetterSupplier(),
             joiner,
             leftJoin
         );
@@ -785,7 +793,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
     @Override
-    public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super K, ? super V, K1> selector) {
+    public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector) {
         return groupBy(selector, Grouped.with(null, valSerde));
     }
 
@@ -906,7 +914,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
             final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamName, otherWindowStreamProcessorParams);
             builder.addGraphNode(otherStreamsGraphNode, otherWindowedStreamsNode);
 
-            final KStreamKStreamJoin<K1, R, ? super V1, ? super V2> joinThis = new KStreamKStreamJoin<>(
+            final KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(
                 otherWindowStore.name(),
                 windows.beforeMs,
                 windows.afterMs,
@@ -914,7 +922,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
                 leftOuter
             );
 
-            final KStreamKStreamJoin<K1, R, ? super V2, ? super V1> joinOther = new KStreamKStreamJoin<>(
+            final KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>(
                 thisWindowStore.name(),
                 windows.afterMs,
                 windows.beforeMs,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
index 4b767b4..4251dfa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
@@ -28,17 +28,17 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
  */
 public class ProcessorParameters<K, V> {
 
-    private final ProcessorSupplier<? super K, ? super V> processorSupplier;
+    private final ProcessorSupplier<K, V> processorSupplier;
     private final String processorName;
 
-    public ProcessorParameters(final ProcessorSupplier<? super K, ? super V> processorSupplier,
+    public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier,
                                final String processorName) {
 
         this.processorSupplier = processorSupplier;
         this.processorName = processorName;
     }
 
-    public ProcessorSupplier<? super K, ? super V> processorSupplier() {
+    public ProcessorSupplier<K, V> processorSupplier() {
         return processorSupplier;
     }
 
@@ -49,8 +49,8 @@ public class ProcessorParameters<K, V> {
     @Override
     public String toString() {
         return "ProcessorParameters{" +
-               "processor class=" + processorSupplier.get().getClass() +
-               ", processor name='" + processorName + '\'' +
-               '}';
+            "processor class=" + processorSupplier.get().getClass() +
+            ", processor name='" + processorName + '\'' +
+            '}';
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
index c2b445e..2dc6aad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
@@ -32,7 +32,7 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
 
 
     public StatefulProcessorNode(final String nodeName,
-                                 final ProcessorParameters processorParameters,
+                                 final ProcessorParameters<K, V> processorParameters,
                                  final String[] storeNames,
                                  final StoreBuilder<? extends StateStore> materializedKTableStoreBuilder,
                                  final boolean repartitionRequired) {
@@ -75,7 +75,7 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
 
     public static final class StatefulProcessorNodeBuilder<K, V> {
 
-        private ProcessorParameters processorSupplier;
+        private ProcessorParameters<K, V> processorSupplier;
         private String nodeName;
         private boolean repartitionRequired;
         private String[] storeNames;
@@ -84,7 +84,7 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
         private StatefulProcessorNodeBuilder() {
         }
 
-        public StatefulProcessorNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters processorParameters) {
+        public StatefulProcessorNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters<K, V> processorParameters) {
             this.processorSupplier = processorParameters;
             return this;
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 37265fa..20ce3ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -49,12 +49,12 @@ public class GraphGraceSearchUtilTest {
         final StatefulProcessorNode<String, Long> gracelessAncestor = new StatefulProcessorNode<>(
             "stateful",
             new ProcessorParameters<>(
-                () -> new Processor<Object, Object>() {
+                () -> new Processor<String, Long>() {
                     @Override
                     public void init(final ProcessorContext context) {}
 
                     @Override
-                    public void process(final Object key, final Object value) {}
+                    public void process(final String key, final Long value) {}
 
                     @Override
                     public void close() {}
@@ -141,12 +141,12 @@ public class GraphGraceSearchUtilTest {
         final StatefulProcessorNode<String, Long> statefulParent = new StatefulProcessorNode<>(
             "stateful",
             new ProcessorParameters<>(
-                () -> new Processor<Object, Object>() {
+                () -> new Processor<String, Long>() {
                     @Override
                     public void init(final ProcessorContext context) {}
 
                     @Override
-                    public void process(final Object key, final Object value) {}
+                    public void process(final String key, final Long value) {}
 
                     @Override
                     public void close() {}