You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/10/05 00:38:32 UTC
[kafka] branch trunk updated: MINOR: Fix generic type of
ProcessorParameters (#5741)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d76805f MINOR: Fix generic type of ProcessorParameters (#5741)
d76805f is described below
commit d76805f0fd2d65e9fb5582e4e0d30c1bd3fccd89
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Oct 4 19:37:53 2018 -0500
MINOR: Fix generic type of ProcessorParameters (#5741)
In unrelated recent work, I noticed some warnings about the missing type parameters on ProcessorParameters.
While investigating it, it seems like there was a bug in the creation of repartition topics.
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../internals/GroupedStreamAggregateBuilder.java | 7 +-
.../kstream/internals/KGroupedTableImpl.java | 22 +--
.../streams/kstream/internals/KStreamImpl.java | 184 +++++++++++----------
.../internals/graph/ProcessorParameters.java | 12 +-
.../internals/graph/StatefulProcessorNode.java | 6 +-
.../internals/graph/GraphGraceSearchUtilTest.java | 8 +-
6 files changed, 125 insertions(+), 114 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 3439cf5..8e6f990 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -84,16 +84,17 @@ class GroupedStreamAggregateBuilder<K, V> {
builder.addGraphNode(parentNode, repartitionNode);
parentNode = repartitionNode;
}
- final StatefulProcessorNode.StatefulProcessorNodeBuilder<K, T> statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder();
+ final StatefulProcessorNode.StatefulProcessorNodeBuilder<K, V> statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder();
+
+ final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName);
- final ProcessorParameters processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName);
statefulProcessorNodeBuilder
.withProcessorParameters(processorParameters)
.withNodeName(aggFunctionName)
.withRepartitionRequired(repartitionRequired)
.withStoreBuilder(storeBuilder);
- final StatefulProcessorNode<K, T> statefulProcessorNode = statefulProcessorNodeBuilder.build();
+ final StatefulProcessorNode<K, V> statefulProcessorNode = statefulProcessorNodeBuilder.build();
builder.addGraphNode(parentNode, statefulProcessorNode);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index c97576b..013028d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -47,7 +47,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
private static final String REDUCE_NAME = "KTABLE-REDUCE-";
- protected final String userSpecifiedName;
+ private final String userSpecifiedName;
private final Initializer<Long> countInitializer = () -> 0L;
@@ -72,7 +72,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME);
final String funcName = builder.newProcessorName(functionName);
final String repartitionTopic = (userSpecifiedName != null ? userSpecifiedName : materialized.storeName())
- + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
+ + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName, sourceName, repartitionTopic);
@@ -98,16 +98,18 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
builder);
}
- private <T> StatefulProcessorNode getStatefulProcessorNode(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized,
- final String functionName,
- final ProcessorSupplier<K, Change<V>> aggregateSupplier) {
+ private <T> StatefulProcessorNode<K, Change<V>> getStatefulProcessorNode(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized,
+ final String functionName,
+ final ProcessorSupplier<K, Change<V>> aggregateSupplier) {
final ProcessorParameters<K, Change<V>> aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, functionName);
- return StatefulProcessorNode.statefulProcessorNodeBuilder()
+ return StatefulProcessorNode
+ .<K, Change<V>>statefulProcessorNodeBuilder()
.withNodeName(functionName)
.withProcessorParameters(aggregateFunctionProcessorParams)
- .withStoreBuilder(new KeyValueStoreMaterializer<>(materialized).materialize()).build();
+ .withStoreBuilder(new KeyValueStoreMaterializer<>(materialized).materialize())
+ .build();
}
private GroupedTableOperationRepartitionNode<K, V> createRepartitionNode(final String sinkName,
@@ -164,9 +166,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
}
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(materializedInternal.storeName(),
- countInitializer,
- countAdder,
- countSubtractor);
+ countInitializer,
+ countAdder,
+ countSubtractor);
return doAggregate(aggregateSupplier, AGGREGATE_NAME, materializedInternal);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 96fa8b9..49dbbd1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
+import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
@@ -165,7 +166,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
- public <K1> KStream<K1, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
+ public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper) {
Objects.requireNonNull(mapper, "mapper can't be null");
final ProcessorGraphNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper);
@@ -178,10 +179,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
- private <K1> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
+ private <KR> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper) {
final String name = builder.newProcessorName(KEY_SELECT_NAME);
- final KStreamMap<K, V, K1, V> kStreamMap = new KStreamMap<>((key, value) -> new KeyValue<>(mapper.apply(key, value), value));
+ final KStreamMap<K, V, KR, V> kStreamMap = new KStreamMap<>((key, value) -> new KeyValue<>(mapper.apply(key, value), value));
final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(kStreamMap, name);
@@ -189,7 +190,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
- public <K1, V1> KStream<K1, V1> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) {
+ public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
Objects.requireNonNull(mapper, "mapper can't be null");
final String name = builder.newProcessorName(MAP_NAME);
@@ -212,7 +213,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
- public <V1> KStream<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper) {
+ public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper) {
return mapValues(withKey(mapper));
}
@@ -250,7 +251,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
- public <K1, V1> KStream<K1, V1> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
+ public <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper) {
Objects.requireNonNull(mapper, "mapper can't be null");
final String name = builder.newProcessorName(FLATMAP_NAME);
@@ -271,7 +272,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
- public <V1> KStream<K, V1> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
+ public <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper) {
return flatMapValues(withKey(mapper));
}
@@ -446,19 +447,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
- public <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
+ public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
final String name = builder.newProcessorName(TRANSFORM_NAME);
- final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamTransform<>(transformerSupplier), name);
-
+ final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
+ name,
+ new ProcessorParameters<>(new KStreamTransform<>(transformerSupplier), name),
+ stateStoreNames,
+ null,
+ true
+ );
- final StatefulProcessorNode<K1, V1> transformNode = new StatefulProcessorNode<>(name,
- processorParameters,
- stateStoreNames,
- null,
- true);
transformNode.keyChangingOperation(true);
builder.addGraphNode(this.streamsGraphNode, transformNode);
@@ -467,7 +468,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
- public <V1> KStream<K, V1> transformValues(final ValueTransformerSupplier<? super V, ? extends V1> valueTransformerSupplier,
+ public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
@@ -486,14 +487,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final String... stateStoreNames) {
final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
+ final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
+ name,
+ new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name),
+ stateStoreNames,
+ null,
+ repartitionRequired
+ );
- final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name);
-
- final StatefulProcessorNode<K, VR> transformNode = new StatefulProcessorNode<>(name,
- processorParameters,
- stateStoreNames,
- null,
- repartitionRequired);
transformNode.setValueChangingOperation(true);
builder.addGraphNode(this.streamsGraphNode, transformNode);
@@ -508,19 +509,21 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
Objects.requireNonNull(processorSupplier, "ProcessSupplier cant' be null");
final String name = builder.newProcessorName(PROCESSOR_NAME);
- final ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name);
- final StatefulProcessorNode<K, V> processNode = new StatefulProcessorNode<>(name,
- processorParameters,
- stateStoreNames,
- null,
- repartitionRequired);
+ final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
+ name,
+ new ProcessorParameters<>(processorSupplier, name),
+ stateStoreNames,
+ null,
+ repartitionRequired
+ );
+
builder.addGraphNode(this.streamsGraphNode, processNode);
}
@Override
- public <V1, R> KStream<K, R> join(final KStream<K, V1> other,
- final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
- final JoinWindows windows) {
+ public <VO, VR> KStream<K, VR> join(final KStream<K, VO> other,
+ final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+ final JoinWindows windows) {
return join(other, joiner, windows, Joined.with(null, null, null));
}
@@ -539,9 +542,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
- public <V1, R> KStream<K, R> outerJoin(final KStream<K, V1> other,
- final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
- final JoinWindows windows) {
+ public <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> other,
+ final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+ final JoinWindows windows) {
return outerJoin(other, joiner, windows, Joined.with(null, null, null));
}
@@ -553,28 +556,27 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
return doJoin(other, joiner, windows, joined, new KStreamImplJoin(true, true));
}
- private <V1, R> KStream<K, R> doJoin(final KStream<K, V1> other,
- final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
- final JoinWindows windows,
- final Joined<K, V, V1> joined,
- final KStreamImplJoin join) {
+ private <VO, VR> KStream<K, VR> doJoin(final KStream<K, VO> other,
+ final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+ final JoinWindows windows,
+ final Joined<K, V, VO> joined,
+ final KStreamImplJoin join) {
Objects.requireNonNull(other, "other KStream can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(windows, "windows can't be null");
Objects.requireNonNull(joined, "joined can't be null");
KStreamImpl<K, V> joinThis = this;
- KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other;
+ KStreamImpl<K, VO> joinOther = (KStreamImpl<K, VO>) other;
if (joinThis.repartitionRequired) {
final String leftJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-left" : joinThis.name;
- joinThis = joinThis.repartitionForJoin(Joined.with(joined.keySerde(), joined.valueSerde(), joined.otherValueSerde(), leftJoinRepartitionTopicName));
+ joinThis = joinThis.repartitionForJoin(leftJoinRepartitionTopicName, joined.keySerde(), joined.valueSerde());
}
if (joinOther.repartitionRequired) {
final String rightJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-right" : joinOther.name;
- final Joined newJoined = Joined.with(joined.keySerde(), joined.valueSerde(), joined.otherValueSerde(), rightJoinRepartitionTopicName);
- joinOther = joinOther.repartitionForJoin(newJoined);
+ joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, joined.keySerde(), joined.otherValueSerde());
}
joinThis.ensureJoinableWith(joinOther);
@@ -591,18 +593,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
/**
* Repartition a stream. This is required on join operations occurring after
* an operation that changes the key, i.e, selectKey, map(..), flatMap(..).
- *
- * @param joined joined control object
- * @return a new {@link KStreamImpl}
*/
- private KStreamImpl<K, V> repartitionForJoin(final Joined<K, V, ?> joined) {
- final Serde<K> repartitionKeySerde = joined.keySerde() != null ? joined.keySerde() : keySerde;
- final Serde<V> repartitionValueSerde = joined.valueSerde() != null ? joined.valueSerde() : valSerde;
- final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
+ private KStreamImpl<K, V> repartitionForJoin(final String repartitionName,
+ final Serde<K> keySerdeOverride,
+ final Serde<V> valueSerdeOverride) {
+ final Serde<K> repartitionKeySerde = keySerdeOverride != null ? keySerdeOverride : keySerde;
+ final Serde<V> repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valSerde;
+ final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder =
+ OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
final String repartitionedSourceName = createRepartitionedSource(builder,
repartitionKeySerde,
repartitionValueSerde,
- joined.name(),
+ repartitionName,
optimizableRepartitionNodeBuilder);
final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
@@ -615,7 +617,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final Serde<K1> keySerde,
final Serde<V1> valSerde,
final String repartitionTopicNamePrefix,
- final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K1, V1> optimizableRepartitionNodeBuilder) {
+ final OptimizableRepartitionNodeBuilder<K1, V1> optimizableRepartitionNodeBuilder) {
final String repartitionTopic = repartitionTopicNamePrefix + REPARTITION_TOPIC_SUFFIX;
@@ -644,9 +646,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
- public <V1, R> KStream<K, R> leftJoin(final KStream<K, V1> other,
- final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
- final JoinWindows windows) {
+ public <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> other,
+ final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+ final JoinWindows windows) {
return leftJoin(other, joiner, windows, Joined.with(null, null, null));
}
@@ -667,21 +669,24 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
- public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
- final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+ public <VO, VR> KStream<K, VR> join(final KTable<K, VO> other,
+ final ValueJoiner<? super V, ? super VO, ? extends VR> joiner) {
return join(other, joiner, Joined.with(null, null, null));
}
@Override
- public <VT, VR> KStream<K, VR> join(final KTable<K, VT> other,
- final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
- final Joined<K, V, VT> joined) {
+ public <VO, VR> KStream<K, VR> join(final KTable<K, VO> other,
+ final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+ final Joined<K, V, VO> joined) {
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");
if (repartitionRequired) {
- final Joined<K, V, ?> updatedJoined = joined.name() != null ? joined : joined.withName(name);
- final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(updatedJoined);
+ final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(
+ joined.name() != null ? joined.name() : name,
+ joined.keySerde(),
+ joined.valueSerde()
+ );
return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, false);
} else {
return doStreamTableJoin(other, joiner, joined, false);
@@ -689,20 +694,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
- public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+ public <VO, VR> KStream<K, VR> leftJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner) {
return leftJoin(other, joiner, Joined.with(null, null, null));
}
@Override
- public <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other,
- final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
- final Joined<K, V, VT> joined) {
+ public <VO, VR> KStream<K, VR> leftJoin(final KTable<K, VO> other,
+ final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+ final Joined<K, V, VO> joined) {
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");
if (repartitionRequired) {
- final Joined<K, V, ?> updatedJoined = joined.name() != null ? joined : joined.withName(name);
- final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(updatedJoined);
+ final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(
+ joined.name() != null ? joined.name() : name,
+ joined.keySerde(),
+ joined.valueSerde()
+ );
return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, true);
} else {
return doStreamTableJoin(other, joiner, joined, true);
@@ -710,28 +718,28 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
- public <K1, V1, V2> KStream<K, V2> join(final GlobalKTable<K1, V1> globalTable,
- final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper,
- final ValueJoiner<? super V, ? super V1, ? extends V2> joiner) {
+ public <KG, VG, VR> KStream<K, VR> join(final GlobalKTable<KG, VG> globalTable,
+ final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper,
+ final ValueJoiner<? super V, ? super VG, ? extends VR> joiner) {
return globalTableJoin(globalTable, keyMapper, joiner, false);
}
@Override
- public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable,
- final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper,
- final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+ public <KG, VG, VR> KStream<K, VR> leftJoin(final GlobalKTable<KG, VG> globalTable,
+ final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper,
+ final ValueJoiner<? super V, ? super VG, ? extends VR> joiner) {
return globalTableJoin(globalTable, keyMapper, joiner, true);
}
- private <K1, V1, V2> KStream<K, V2> globalTableJoin(final GlobalKTable<K1, V1> globalTable,
- final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper,
- final ValueJoiner<? super V, ? super V1, ? extends V2> joiner,
+ private <KG, VG, VR> KStream<K, VR> globalTableJoin(final GlobalKTable<KG, VG> globalTable,
+ final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper,
+ final ValueJoiner<? super V, ? super VG, ? extends VR> joiner,
final boolean leftJoin) {
Objects.requireNonNull(globalTable, "globalTable can't be null");
Objects.requireNonNull(keyMapper, "keyMapper can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
- final KTableValueGetterSupplier<K1, V1> valueGetterSupplier = ((GlobalKTableImpl<K1, V1>) globalTable).valueGetterSupplier();
+ final KTableValueGetterSupplier<KG, VG> valueGetterSupplier = ((GlobalKTableImpl<KG, VG>) globalTable).valueGetterSupplier();
final String name = builder.newProcessorName(LEFTJOIN_NAME);
final ProcessorSupplier<K, V> processorSupplier = new KStreamGlobalKTableJoin<>(
@@ -753,18 +761,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@SuppressWarnings("unchecked")
- private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
- final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
- final Joined<K, V, V1> joined,
- final boolean leftJoin) {
+ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> other,
+ final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+ final Joined<K, V, VO> joined,
+ final boolean leftJoin) {
Objects.requireNonNull(other, "other KTable can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
- final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, V1>) other);
+ final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, VO>) other);
final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
final ProcessorSupplier<K, V> processorSupplier = new KStreamKTableJoin<>(
- ((KTableImpl<K, ?, V1>) other).valueGetterSupplier(),
+ ((KTableImpl<K, ?, VO>) other).valueGetterSupplier(),
joiner,
leftJoin
);
@@ -785,7 +793,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
- public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super K, ? super V, K1> selector) {
+ public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector) {
return groupBy(selector, Grouped.with(null, valSerde));
}
@@ -906,7 +914,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamName, otherWindowStreamProcessorParams);
builder.addGraphNode(otherStreamsGraphNode, otherWindowedStreamsNode);
- final KStreamKStreamJoin<K1, R, ? super V1, ? super V2> joinThis = new KStreamKStreamJoin<>(
+ final KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(
otherWindowStore.name(),
windows.beforeMs,
windows.afterMs,
@@ -914,7 +922,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
leftOuter
);
- final KStreamKStreamJoin<K1, R, ? super V2, ? super V1> joinOther = new KStreamKStreamJoin<>(
+ final KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>(
thisWindowStore.name(),
windows.afterMs,
windows.beforeMs,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
index 4b767b4..4251dfa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
@@ -28,17 +28,17 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
*/
public class ProcessorParameters<K, V> {
- private final ProcessorSupplier<? super K, ? super V> processorSupplier;
+ private final ProcessorSupplier<K, V> processorSupplier;
private final String processorName;
- public ProcessorParameters(final ProcessorSupplier<? super K, ? super V> processorSupplier,
+ public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier,
final String processorName) {
this.processorSupplier = processorSupplier;
this.processorName = processorName;
}
- public ProcessorSupplier<? super K, ? super V> processorSupplier() {
+ public ProcessorSupplier<K, V> processorSupplier() {
return processorSupplier;
}
@@ -49,8 +49,8 @@ public class ProcessorParameters<K, V> {
@Override
public String toString() {
return "ProcessorParameters{" +
- "processor class=" + processorSupplier.get().getClass() +
- ", processor name='" + processorName + '\'' +
- '}';
+ "processor class=" + processorSupplier.get().getClass() +
+ ", processor name='" + processorName + '\'' +
+ '}';
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
index c2b445e..2dc6aad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
@@ -32,7 +32,7 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
public StatefulProcessorNode(final String nodeName,
- final ProcessorParameters processorParameters,
+ final ProcessorParameters<K, V> processorParameters,
final String[] storeNames,
final StoreBuilder<? extends StateStore> materializedKTableStoreBuilder,
final boolean repartitionRequired) {
@@ -75,7 +75,7 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
public static final class StatefulProcessorNodeBuilder<K, V> {
- private ProcessorParameters processorSupplier;
+ private ProcessorParameters<K, V> processorSupplier;
private String nodeName;
private boolean repartitionRequired;
private String[] storeNames;
@@ -84,7 +84,7 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
private StatefulProcessorNodeBuilder() {
}
- public StatefulProcessorNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters processorParameters) {
+ public StatefulProcessorNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters<K, V> processorParameters) {
this.processorSupplier = processorParameters;
return this;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 37265fa..20ce3ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -49,12 +49,12 @@ public class GraphGraceSearchUtilTest {
final StatefulProcessorNode<String, Long> gracelessAncestor = new StatefulProcessorNode<>(
"stateful",
new ProcessorParameters<>(
- () -> new Processor<Object, Object>() {
+ () -> new Processor<String, Long>() {
@Override
public void init(final ProcessorContext context) {}
@Override
- public void process(final Object key, final Object value) {}
+ public void process(final String key, final Long value) {}
@Override
public void close() {}
@@ -141,12 +141,12 @@ public class GraphGraceSearchUtilTest {
final StatefulProcessorNode<String, Long> statefulParent = new StatefulProcessorNode<>(
"stateful",
new ProcessorParameters<>(
- () -> new Processor<Object, Object>() {
+ () -> new Processor<String, Long>() {
@Override
public void init(final ProcessorContext context) {}
@Override
- public void process(final Object key, final Object value) {}
+ public void process(final String key, final Long value) {}
@Override
public void close() {}