You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/03/19 21:29:56 UTC
[kafka] branch trunk updated: MINOR: Fix generic types in
StreamsBuilder and Topology (#8273)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 21cfd0b MINOR: Fix generic types in StreamsBuilder and Topology (#8273)
21cfd0b is described below
commit 21cfd0b453c878780236f623249b3b5ddec52e6e
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Thu Mar 19 14:29:15 2020 -0700
MINOR: Fix generic types in StreamsBuilder and Topology (#8273)
Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Guozhang Wang <gu...@confluent.io>, John Roesler <jo...@confluent.io>
---
.../examples/docs/DeveloperGuideTesting.java | 4 +-
.../org/apache/kafka/streams/StreamsBuilder.java | 48 ++++---
.../java/org/apache/kafka/streams/Topology.java | 83 ++++++-----
.../streams/kstream/SessionWindowedSerializer.java | 2 +-
.../internals/CogroupedStreamAggregateBuilder.java | 31 ++---
.../internals/GroupedStreamAggregateBuilder.java | 3 +-
.../kstream/internals/InternalStreamsBuilder.java | 111 ++++++++-------
.../kstream/internals/KStreamAggregate.java | 7 +-
.../internals/graph/BaseRepartitionNode.java | 4 +-
.../kstream/internals/graph/GlobalStoreNode.java | 19 +--
.../GroupedTableOperationRepartitionNode.java | 6 +-
.../internals/graph/KTableKTableJoinNode.java | 6 +-
.../graph/OptimizableRepartitionNode.java | 6 +-
.../internals/graph/ProcessorGraphNode.java | 2 +-
.../kstream/internals/graph/StateStoreNode.java | 8 +-
.../internals/graph/StatefulProcessorNode.java | 11 +-
.../internals/InternalTopologyBuilder.java | 154 +++++++++++----------
.../state/internals/MeteredSessionStore.java | 1 -
.../org/apache/kafka/streams/KafkaStreamsTest.java | 5 +-
.../org/apache/kafka/streams/TopologyTest.java | 44 +++---
.../integration/GlobalThreadShutDownOrderTest.java | 2 +-
.../integration/RegexSourceIntegrationTest.java | 3 +-
.../internals/graph/GraphGraceSearchUtilTest.java | 17 ++-
.../processor/internals/ProcessorTopologyTest.java | 2 +-
.../state/internals/CachingKeyValueStoreTest.java | 4 +-
.../internals/InMemoryKeyValueLoggedStoreTest.java | 7 +-
.../state/internals/InMemoryKeyValueStoreTest.java | 7 +-
.../state/internals/InMemoryLRUCacheStoreTest.java | 9 +-
.../InMemoryTimeOrderedKeyValueBufferTest.java | 10 +-
.../state/internals/RocksDBKeyValueStoreTest.java | 7 +-
.../kafka/streams/scala/StreamsBuilder.scala | 8 +-
.../kafka/streams/MockProcessorContextTest.java | 4 +-
.../kafka/streams/TopologyTestDriverTest.java | 3 +-
33 files changed, 325 insertions(+), 313 deletions(-)
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
index e380f20..28fccfa 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
@@ -145,14 +145,14 @@ public class DeveloperGuideTesting {
assertThat(outputTopic.isEmpty(), is(true));
}
- public class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
+ public static class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
@Override
public Processor<String, Long> get() {
return new CustomMaxAggregator();
}
}
- public class CustomMaxAggregator implements Processor<String, Long> {
+ public static class CustomMaxAggregator implements Processor<String, Long> {
ProcessorContext context;
private KeyValueStore<String, Long> store;
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 173ca67..997fe64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -474,7 +474,7 @@ public class StreamsBuilder {
* @return itself
* @throws TopologyException if state store supplier is already added
*/
- public synchronized StreamsBuilder addStateStore(final StoreBuilder builder) {
+ public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> builder) {
Objects.requireNonNull(builder, "builder can't be null");
internalStreamsBuilder.addStateStore(builder);
return this;
@@ -483,22 +483,23 @@ public class StreamsBuilder {
/**
* @deprecated use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier)} instead
*/
- @SuppressWarnings("unchecked")
@Deprecated
- public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
- final String topic,
- final String sourceName,
- final Consumed consumed,
- final String processorName,
- final ProcessorSupplier stateUpdateSupplier) {
+ public synchronized <K, V> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
+ final String topic,
+ final String sourceName,
+ final Consumed<K, V> consumed,
+ final String processorName,
+ final ProcessorSupplier<K, V> stateUpdateSupplier) {
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
- internalStreamsBuilder.addGlobalStore(storeBuilder,
- sourceName,
- topic,
- new ConsumedInternal<>(consumed),
- processorName,
- stateUpdateSupplier);
+ internalStreamsBuilder.addGlobalStore(
+ storeBuilder,
+ sourceName,
+ topic,
+ new ConsumedInternal<>(consumed),
+ processorName,
+ stateUpdateSupplier
+ );
return this;
}
@@ -527,17 +528,18 @@ public class StreamsBuilder {
* @return itself
* @throws TopologyException if the processor of state is already registered
*/
- @SuppressWarnings("unchecked")
- public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
- final String topic,
- final Consumed consumed,
- final ProcessorSupplier stateUpdateSupplier) {
+ public synchronized <K, V> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
+ final String topic,
+ final Consumed<K, V> consumed,
+ final ProcessorSupplier<K, V> stateUpdateSupplier) {
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
- internalStreamsBuilder.addGlobalStore(storeBuilder,
- topic,
- new ConsumedInternal<>(consumed),
- stateUpdateSupplier);
+ internalStreamsBuilder.addGlobalStore(
+ storeBuilder,
+ topic,
+ new ConsumedInternal<>(consumed),
+ stateUpdateSupplier
+ );
return this;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index d13e4a8..cf1d3fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -252,8 +252,8 @@ public class Topology {
* @throws TopologyException if processor is already added or if topics have already been registered by another source
*/
public synchronized Topology addSource(final String name,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
+ final Deserializer<?> keyDeserializer,
+ final Deserializer<?> valueDeserializer,
final String... topics) {
internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valueDeserializer, topics);
return this;
@@ -278,8 +278,8 @@ public class Topology {
* @throws TopologyException if processor is already added or if topics have already been registered by name
*/
public synchronized Topology addSource(final String name,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
+ final Deserializer<?> keyDeserializer,
+ final Deserializer<?> valueDeserializer,
final Pattern topicPattern) {
internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valueDeserializer, topicPattern);
return this;
@@ -307,8 +307,8 @@ public class Topology {
@SuppressWarnings("overloads")
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
+ final Deserializer<?> keyDeserializer,
+ final Deserializer<?> valueDeserializer,
final String... topics) {
internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valueDeserializer, topics);
return this;
@@ -335,8 +335,8 @@ public class Topology {
*/
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
+ final Deserializer<?> keyDeserializer,
+ final Deserializer<?> valueDeserializer,
final Pattern topicPattern) {
internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valueDeserializer, topicPattern);
return this;
@@ -364,8 +364,8 @@ public class Topology {
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
+ final Deserializer<?> keyDeserializer,
+ final Deserializer<?> valueDeserializer,
final String... topics) {
internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topics);
return this;
@@ -396,8 +396,8 @@ public class Topology {
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
+ final Deserializer<?> keyDeserializer,
+ final Deserializer<?> valueDeserializer,
final Pattern topicPattern) {
internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
return this;
@@ -651,6 +651,7 @@ public class Topology {
* @return itself
* @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
*/
+ @SuppressWarnings("rawtypes")
public synchronized Topology addProcessor(final String name,
final ProcessorSupplier supplier,
final String... parentNames) {
@@ -666,7 +667,7 @@ public class Topology {
* @return itself
* @throws TopologyException if state store supplier is already added
*/
- public synchronized Topology addStateStore(final StoreBuilder storeBuilder,
+ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
final String... processorNames) {
internalTopologyBuilder.addStateStore(storeBuilder, processorNames);
return this;
@@ -695,16 +696,23 @@ public class Topology {
* @return itself
* @throws TopologyException if the processor of state is already registered
*/
- @SuppressWarnings("unchecked")
- public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
- final String sourceName,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
- final String topic,
- final String processorName,
- final ProcessorSupplier stateUpdateSupplier) {
- internalTopologyBuilder.addGlobalStore(storeBuilder, sourceName, null, keyDeserializer,
- valueDeserializer, topic, processorName, stateUpdateSupplier);
+ public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
+ final String sourceName,
+ final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDeserializer,
+ final String topic,
+ final String processorName,
+ final ProcessorSupplier<K, V> stateUpdateSupplier) {
+ internalTopologyBuilder.addGlobalStore(
+ storeBuilder,
+ sourceName,
+ null,
+ keyDeserializer,
+ valueDeserializer,
+ topic,
+ processorName,
+ stateUpdateSupplier
+ );
return this;
}
@@ -732,17 +740,24 @@ public class Topology {
* @return itself
* @throws TopologyException if the processor of state is already registered
*/
- @SuppressWarnings("unchecked")
- public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
- final String sourceName,
- final TimestampExtractor timestampExtractor,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
- final String topic,
- final String processorName,
- final ProcessorSupplier stateUpdateSupplier) {
- internalTopologyBuilder.addGlobalStore(storeBuilder, sourceName, timestampExtractor, keyDeserializer,
- valueDeserializer, topic, processorName, stateUpdateSupplier);
+ public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
+ final String sourceName,
+ final TimestampExtractor timestampExtractor,
+ final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDeserializer,
+ final String topic,
+ final String processorName,
+ final ProcessorSupplier<K, V> stateUpdateSupplier) {
+ internalTopologyBuilder.addGlobalStore(
+ storeBuilder,
+ sourceName,
+ timestampExtractor,
+ keyDeserializer,
+ valueDeserializer,
+ topic,
+ processorName,
+ stateUpdateSupplier
+ );
return this;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
index 00491c7..a1f4876 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
@@ -50,7 +50,7 @@ public class SessionWindowedSerializer<T> implements WindowedSerializer<T> {
final String propertyName = isKey ? StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS : StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS;
final String value = (String) configs.get(propertyName);
try {
- inner = Serde.class.cast(Utils.newInstance(value, Serde.class)).serializer();
+ inner = (Utils.newInstance(value, Serde.class)).serializer();
inner.configure(configs, isKey);
} catch (final ClassNotFoundException e) {
throw new ConfigException(propertyName, value, "Serde class " + value + " could not be found.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
index f4e45467..81f2d64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -38,7 +38,6 @@ import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.StoreBuilder;
class CogroupedStreamAggregateBuilder<K, VOut> {
@@ -52,7 +51,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
<KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
- final StoreBuilder<? extends StateStore> storeBuilder,
+ final StoreBuilder<?> storeBuilder,
final Serde<KR> keySerde,
final Serde<VOut> valSerde,
final String queryableName,
@@ -89,8 +88,8 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Collection<StreamsGraphNode> processors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
- for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
- final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode(
+ for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+ final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
kGroupedStream.getValue(),
initializer,
named.suffixWithOrElseGet(
@@ -127,14 +126,14 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
builder);
}
- private <W extends Window> StatefulProcessorNode getStatefulProcessorNode(final Aggregator<? super K, ? super Object, VOut> aggregator,
- final Initializer<VOut> initializer,
- final String processorName,
- final boolean stateCreated,
- final StoreBuilder<? extends StateStore> storeBuilder,
- final Windows<W> windows,
- final SessionWindows sessionWindows,
- final Merger<? super K, VOut> sessionMerger) {
+ private <W extends Window> StatefulProcessorNode<K, ?> getStatefulProcessorNode(final Aggregator<? super K, Object, VOut> aggregator,
+ final Initializer<VOut> initializer,
+ final String processorName,
+ final boolean stateCreated,
+ final StoreBuilder<?> storeBuilder,
+ final Windows<W> windows,
+ final SessionWindows sessionWindows,
+ final Merger<? super K, VOut> sessionMerger) {
final ProcessorSupplier<K, ?> kStreamAggregate;
@@ -164,19 +163,17 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
new String[]{storeBuilder.name()}
);
}
+
return statefulProcessorNode;
}
- /**
- * @return the new sourceName of the repartitioned source
- */
@SuppressWarnings("unchecked")
- private <VIn> String createRepartitionSource(final String repartitionTopicNamePrefix,
+ private <VIn> void createRepartitionSource(final String repartitionTopicNamePrefix,
final OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder,
final Serde<K> keySerde,
final Serde<?> valueSerde) {
- return KStreamImpl.createRepartitionedSource(builder,
+ KStreamImpl.createRepartitionedSource(builder,
keySerde,
(Serde<VIn>) valueSerde,
repartitionTopicNamePrefix,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 13e7218..3f3c199 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Collections;
@@ -68,7 +67,7 @@ class GroupedStreamAggregateBuilder<K, V> {
}
<KR, VR> KTable<KR, VR> build(final NamedInternal functionName,
- final StoreBuilder<? extends StateStore> storeBuilder,
+ final StoreBuilder<?> storeBuilder,
final KStreamAggProcessorSupplier<K, KR, V, VR> aggregateSupplier,
final String queryableStoreName,
final Serde<KR> keySerde,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 9509431..a374c42 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -64,7 +64,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
private final AtomicInteger index = new AtomicInteger(0);
private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
- private final LinkedHashMap<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> keyChangingOperationsToOptimizableRepartitionNodes = new LinkedHashMap<>();
+ private final LinkedHashMap<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>> keyChangingOperationsToOptimizableRepartitionNodes = new LinkedHashMap<>();
private final LinkedHashSet<StreamsGraphNode> mergeNodes = new LinkedHashSet<>();
private final LinkedHashSet<StreamsGraphNode> tableSourceNodes = new LinkedHashSet<>();
@@ -197,41 +197,45 @@ public class InternalStreamsBuilder implements InternalNameProvider {
return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
}
- public synchronized void addStateStore(final StoreBuilder builder) {
- addGraphNode(root, new StateStoreNode(builder));
+ public synchronized void addStateStore(final StoreBuilder<?> builder) {
+ addGraphNode(root, new StateStoreNode<>(builder));
}
- public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
- final String sourceName,
- final String topic,
- final ConsumedInternal consumed,
- final String processorName,
- final ProcessorSupplier stateUpdateSupplier) {
-
- final StreamsGraphNode globalStoreNode = new GlobalStoreNode(storeBuilder,
- sourceName,
- topic,
- consumed,
- processorName,
- stateUpdateSupplier);
+ public synchronized <K, V> void addGlobalStore(final StoreBuilder<?> storeBuilder,
+ final String sourceName,
+ final String topic,
+ final ConsumedInternal<K, V> consumed,
+ final String processorName,
+ final ProcessorSupplier<K, V> stateUpdateSupplier) {
+
+ final StreamsGraphNode globalStoreNode = new GlobalStoreNode<>(
+ storeBuilder,
+ sourceName,
+ topic,
+ consumed,
+ processorName,
+ stateUpdateSupplier
+ );
addGraphNode(root, globalStoreNode);
}
- public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
- final String topic,
- final ConsumedInternal consumed,
- final ProcessorSupplier stateUpdateSupplier) {
+ public synchronized <K, V> void addGlobalStore(final StoreBuilder<?> storeBuilder,
+ final String topic,
+ final ConsumedInternal<K, V> consumed,
+ final ProcessorSupplier<K, V> stateUpdateSupplier) {
// explicitly disable logging for global stores
storeBuilder.withLoggingDisabled();
final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);
- addGlobalStore(storeBuilder,
- sourceName,
- topic,
- consumed,
- processorName,
- stateUpdateSupplier);
+ addGlobalStore(
+ storeBuilder,
+ sourceName,
+ topic,
+ consumed,
+ processorName,
+ stateUpdateSupplier
+ );
}
void addGraphNode(final StreamsGraphNode parent,
@@ -242,7 +246,6 @@ public class InternalStreamsBuilder implements InternalNameProvider {
maybeAddNodeForOptimizationMetadata(child);
}
-
void addGraphNode(final Collection<StreamsGraphNode> parents,
final StreamsGraphNode child) {
Objects.requireNonNull(parents, "parent node can't be null");
@@ -271,7 +274,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
} else if (node instanceof OptimizableRepartitionNode) {
final StreamsGraphNode parentNode = getKeyChangingParentNode(node);
if (parentNode != null) {
- keyChangingOperationsToOptimizableRepartitionNodes.get(parentNode).add((OptimizableRepartitionNode) node);
+ keyChangingOperationsToOptimizableRepartitionNodes.get(parentNode).add((OptimizableRepartitionNode<?, ?>) node);
}
} else if (node.isMergeNode()) {
mergeNodes.add(node);
@@ -322,16 +325,16 @@ public class InternalStreamsBuilder implements InternalNameProvider {
private void optimizeKTableSourceTopics() {
LOG.debug("Marking KTable source nodes to optimize using source topic for changelogs ");
- tableSourceNodes.forEach(node -> ((TableSourceNode) node).reuseSourceTopicForChangeLog(true));
+ tableSourceNodes.forEach(node -> ((TableSourceNode<?, ?>) node).reuseSourceTopicForChangeLog(true));
}
- @SuppressWarnings("unchecked")
private void maybeOptimizeRepartitionOperations() {
maybeUpdateKeyChangingRepartitionNodeMap();
- final Iterator<Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>>> entryIterator = keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();
+ final Iterator<Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>>> entryIterator =
+ keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();
while (entryIterator.hasNext()) {
- final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> entry = entryIterator.next();
+ final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>> entry = entryIterator.next();
final StreamsGraphNode keyChangingNode = entry.getKey();
@@ -339,7 +342,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
continue;
}
- final GroupedInternal groupedInternal = new GroupedInternal(getRepartitionSerdes(entry.getValue()));
+ final GroupedInternal<?, ?> groupedInternal = new GroupedInternal<>(getRepartitionSerdes(entry.getValue()));
final String repartitionTopicName = getFirstRepartitionTopicName(entry.getValue());
//passing in the name of the first repartition topic, re-used to create the optimized repartition topic
@@ -350,7 +353,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
// re-use parent buildPriority to make sure the single repartition graph node is evaluated before downstream nodes
optimizedSingleRepartition.setBuildPriority(keyChangingNode.buildPriority());
- for (final OptimizableRepartitionNode repartitionNodeToBeReplaced : entry.getValue()) {
+ for (final OptimizableRepartitionNode<?, ?> repartitionNodeToBeReplaced : entry.getValue()) {
final StreamsGraphNode keyChangingNodeChild = findParentNodeMatching(repartitionNodeToBeReplaced, gn -> gn.parentNodes().contains(keyChangingNode));
@@ -408,7 +411,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : mergeNodesToKeyChangers.entrySet()) {
final StreamsGraphNode mergeKey = entry.getKey();
final Collection<StreamsGraphNode> keyChangingParents = entry.getValue();
- final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes = new LinkedHashSet<>();
+ final LinkedHashSet<OptimizableRepartitionNode<?, ?>> repartitionNodes = new LinkedHashSet<>();
for (final StreamsGraphNode keyChangingParent : keyChangingParents) {
repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent));
mergeNodeKeyChangingParentsToRemove.add(keyChangingParent);
@@ -421,17 +424,19 @@ public class InternalStreamsBuilder implements InternalNameProvider {
}
}
- @SuppressWarnings("unchecked")
- private OptimizableRepartitionNode createRepartitionNode(final String repartitionTopicName,
- final Serde keySerde,
- final Serde valueSerde) {
+ private <K, V> OptimizableRepartitionNode<K, V> createRepartitionNode(final String repartitionTopicName,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde) {
- final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
- KStreamImpl.createRepartitionedSource(this,
- keySerde,
- valueSerde,
- repartitionTopicName,
- repartitionNodeBuilder);
+ final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder =
+ OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
+ KStreamImpl.createRepartitionedSource(
+ this,
+ keySerde,
+ valueSerde,
+ repartitionTopicName,
+ repartitionNodeBuilder
+ );
// ensures setting the repartition topic to the name of the
// first repartition topic to get merged
@@ -452,22 +457,22 @@ public class InternalStreamsBuilder implements InternalNameProvider {
return null;
}
- private String getFirstRepartitionTopicName(final Collection<OptimizableRepartitionNode> repartitionNodes) {
+ private String getFirstRepartitionTopicName(final Collection<OptimizableRepartitionNode<?, ?>> repartitionNodes) {
return repartitionNodes.iterator().next().repartitionTopic();
}
@SuppressWarnings("unchecked")
- private GroupedInternal getRepartitionSerdes(final Collection<OptimizableRepartitionNode> repartitionNodes) {
- Serde keySerde = null;
- Serde valueSerde = null;
+ private <K, V> GroupedInternal<K, V> getRepartitionSerdes(final Collection<OptimizableRepartitionNode<?, ?>> repartitionNodes) {
+ Serde<K> keySerde = null;
+ Serde<V> valueSerde = null;
- for (final OptimizableRepartitionNode repartitionNode : repartitionNodes) {
+ for (final OptimizableRepartitionNode<?, ?> repartitionNode : repartitionNodes) {
if (keySerde == null && repartitionNode.keySerde() != null) {
- keySerde = repartitionNode.keySerde();
+ keySerde = (Serde<K>) repartitionNode.keySerde();
}
if (valueSerde == null && repartitionNode.valueSerde() != null) {
- valueSerde = repartitionNode.valueSerde();
+ valueSerde = (Serde<V>) repartitionNode.valueSerde();
}
if (keySerde != null && valueSerde != null) {
@@ -475,7 +480,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
}
}
- return new GroupedInternal(Grouped.with(keySerde, valueSerde));
+ return new GroupedInternal<>(Grouped.with(keySerde, valueSerde));
}
private StreamsGraphNode findParentNodeMatching(final StreamsGraphNode startSeekingNode,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 252d6e8..3c3bdd0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -58,7 +58,6 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
private TimestampedKeyValueStore<K, T> store;
- private StreamsMetricsImpl metrics;
private Sensor droppedRecordsSensor;
private TimestampedTupleForwarder<K, T> tupleForwarder;
@@ -66,8 +65,10 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
@Override
public void init(final ProcessorContext context) {
super.init(context);
- metrics = (StreamsMetricsImpl) context.metrics();
- droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+ droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(
+ Thread.currentThread().getName(),
+ context.taskId().toString(),
+ (StreamsMetricsImpl) context.metrics());
store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
store,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
index 2cc1539..0fa61f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
@@ -29,12 +29,12 @@ public abstract class BaseRepartitionNode<K, V> extends StreamsGraphNode {
protected final String sinkName;
protected final String sourceName;
protected final String repartitionTopic;
- protected final ProcessorParameters processorParameters;
+ protected final ProcessorParameters<K, V> processorParameters;
protected final StreamPartitioner<K, V> partitioner;
BaseRepartitionNode(final String nodeName,
final String sourceName,
- final ProcessorParameters processorParameters,
+ final ProcessorParameters<K, V> processorParameters,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final String sinkName,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
index a844de6..3e077f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
@@ -14,31 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
-public class GlobalStoreNode extends StateStoreNode {
-
+public class GlobalStoreNode<K, V, S extends StateStore> extends StateStoreNode<S> {
private final String sourceName;
private final String topic;
- private final ConsumedInternal consumed;
+ private final ConsumedInternal<K, V> consumed;
private final String processorName;
- private final ProcessorSupplier stateUpdateSupplier;
+ private final ProcessorSupplier<K, V> stateUpdateSupplier;
- public GlobalStoreNode(final StoreBuilder<KeyValueStore> storeBuilder,
+ public GlobalStoreNode(final StoreBuilder<S> storeBuilder,
final String sourceName,
final String topic,
- final ConsumedInternal consumed,
+ final ConsumedInternal<K, V> consumed,
final String processorName,
- final ProcessorSupplier stateUpdateSupplier) {
+ final ProcessorSupplier<K, V> stateUpdateSupplier) {
super(storeBuilder);
this.sourceName = sourceName;
@@ -48,9 +46,7 @@ public class GlobalStoreNode extends StateStoreNode {
this.stateUpdateSupplier = stateUpdateSupplier;
}
-
@Override
- @SuppressWarnings("unchecked")
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
storeBuilder.withLoggingDisabled();
topologyBuilder.addGlobalStore(storeBuilder,
@@ -64,7 +60,6 @@ public class GlobalStoreNode extends StateStoreNode {
}
-
@Override
public String toString() {
return "GlobalStoreNode{" +
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
index a3f79c5..2617f46 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
@@ -34,7 +34,7 @@ public class GroupedTableOperationRepartitionNode<K, V> extends BaseRepartitionN
final String sinkName,
final String sourceName,
final String repartitionTopic,
- final ProcessorParameters processorParameters) {
+ final ProcessorParameters<K, V> processorParameters) {
super(
nodeName,
@@ -115,7 +115,7 @@ public class GroupedTableOperationRepartitionNode<K, V> extends BaseRepartitionN
private String nodeName;
private String sourceName;
private String repartitionTopic;
- private ProcessorParameters processorParameters;
+ private ProcessorParameters<K, V> processorParameters;
private GroupedTableOperationRepartitionNodeBuilder() {
}
@@ -150,7 +150,7 @@ public class GroupedTableOperationRepartitionNode<K, V> extends BaseRepartitionN
return this;
}
- public GroupedTableOperationRepartitionNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters processorParameters) {
+ public GroupedTableOperationRepartitionNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters<K, V> processorParameters) {
this.processorParameters = processorParameters;
return this;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index 542726b..1879bb7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -82,7 +82,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
}
public String queryableStoreName() {
- return ((KTableKTableJoinMerger) mergeProcessorParameters().processorSupplier()).getQueryableName();
+ return ((KTableKTableJoinMerger<K, VR>) mergeProcessorParameters().processorSupplier()).getQueryableName();
}
/**
@@ -212,8 +212,8 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
joinOtherProcessorParameters,
new ProcessorParameters<>(
KTableKTableJoinMerger.of(
- (KTableProcessorSupplier<K, V1, VR>) (joinThisProcessorParameters.processorSupplier()),
- (KTableProcessorSupplier<K, V2, VR>) (joinOtherProcessorParameters.processorSupplier()),
+ (KTableProcessorSupplier<K, V1, VR>) joinThisProcessorParameters.processorSupplier(),
+ (KTableProcessorSupplier<K, V2, VR>) joinOtherProcessorParameters.processorSupplier(),
queryableStoreName),
nodeName),
thisJoinSide,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
index e3cf2b8..727b6aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
@@ -29,7 +29,7 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode<K, V>
OptimizableRepartitionNode(final String nodeName,
final String sourceName,
- final ProcessorParameters processorParameters,
+ final ProcessorParameters<K, V> processorParameters,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final String sinkName,
@@ -116,7 +116,7 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode<K, V>
public static final class OptimizableRepartitionNodeBuilder<K, V> {
private String nodeName;
- private ProcessorParameters processorParameters;
+ private ProcessorParameters<K, V> processorParameters;
private Serde<K> keySerde;
private Serde<V> valueSerde;
private String sinkName;
@@ -127,7 +127,7 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode<K, V>
private OptimizableRepartitionNodeBuilder() {
}
- public OptimizableRepartitionNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters processorParameters) {
+ public OptimizableRepartitionNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters<K, V> processorParameters) {
this.processorParameters = processorParameters;
return this;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
index 5c75a09..6eea0c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
@@ -43,7 +43,7 @@ public class ProcessorGraphNode<K, V> extends StreamsGraphNode {
this.processorParameters = processorParameters;
}
- public ProcessorParameters processorParameters() {
+ public ProcessorParameters<K, V> processorParameters() {
return processorParameters;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
index ea42cec..239ed9f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
@@ -14,17 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.kstream.internals.graph;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
-public class StateStoreNode extends StreamsGraphNode {
+public class StateStoreNode<S extends StateStore> extends StreamsGraphNode {
- protected final StoreBuilder storeBuilder;
+ protected final StoreBuilder<S> storeBuilder;
- public StateStoreNode(final StoreBuilder storeBuilder) {
+ public StateStoreNode(final StoreBuilder<S> storeBuilder) {
super(storeBuilder.name());
this.storeBuilder = storeBuilder;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
index 6ed2917..6acd854 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
@@ -14,13 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.kstream.internals.graph;
-
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -31,13 +28,13 @@ import java.util.stream.Stream;
public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
private final String[] storeNames;
- private final StoreBuilder<? extends StateStore> storeBuilder;
+ private final StoreBuilder<?> storeBuilder;
/**
* Create a node representing a stateful processor, where the named stores have already been registered.
*/
public StatefulProcessorNode(final ProcessorParameters<K, V> processorParameters,
- final Set<StoreBuilder<? extends StateStore>> preRegisteredStores,
+ final Set<StoreBuilder<?>> preRegisteredStores,
final Set<KTableValueGetterSupplier<?, ?>> valueGetterSuppliers) {
super(processorParameters.processorName(), processorParameters);
final Stream<String> registeredStoreNames = preRegisteredStores.stream().map(StoreBuilder::name);
@@ -65,7 +62,7 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
*/
public StatefulProcessorNode(final String nodeName,
final ProcessorParameters<K, V> processorParameters,
- final StoreBuilder<? extends StateStore> materializedKTableStoreBuilder) {
+ final StoreBuilder<?> materializedKTableStoreBuilder) {
super(nodeName, processorParameters);
this.storeNames = null;
@@ -84,7 +81,7 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
final String processorName = processorParameters().processorName();
- final ProcessorSupplier processorSupplier = processorParameters().processorSupplier();
+ final ProcessorSupplier<K, V> processorSupplier = processorParameters().processorSupplier();
topologyBuilder.addProcessor(processorName, processorSupplier, parentNodeNames());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 92b9d33..95881ad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -59,9 +59,9 @@ public class InternalTopologyBuilder {
private static final String[] NO_PREDECESSORS = {};
// node factories in a topological order
- private final Map<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
+ private final Map<String, NodeFactory<?, ?>> nodeFactories = new LinkedHashMap<>();
- private final Map<String, StateStoreFactory> stateFactories = new HashMap<>();
+ private final Map<String, StateStoreFactory<?>> stateFactories = new HashMap<>();
private final Map<String, StoreBuilder<?>> globalStateBuilders = new LinkedHashMap<>();
@@ -124,15 +124,15 @@ public class InternalTopologyBuilder {
private Map<Integer, Set<String>> nodeGroups = null;
- public static class StateStoreFactory {
- private final StoreBuilder<?> builder;
+ public static class StateStoreFactory<S extends StateStore> {
+ private final StoreBuilder<S> builder;
private final Set<String> users = new HashSet<>();
- private StateStoreFactory(final StoreBuilder<?> builder) {
+ private StateStoreFactory(final StoreBuilder<S> builder) {
this.builder = builder;
}
- public StateStore build() {
+ public S build() {
return builder.build();
}
@@ -174,7 +174,7 @@ public class InternalTopologyBuilder {
}
}
- private static abstract class NodeFactory {
+ private static abstract class NodeFactory<K, V> {
final String name;
final String[] predecessors;
@@ -184,18 +184,18 @@ public class InternalTopologyBuilder {
this.predecessors = predecessors;
}
- public abstract ProcessorNode<?, ?> build();
+ public abstract ProcessorNode<K, V> build();
abstract AbstractNode describe();
}
- private static class ProcessorNodeFactory extends NodeFactory {
- private final ProcessorSupplier<?, ?> supplier;
+ private static class ProcessorNodeFactory<K, V> extends NodeFactory<K, V> {
+ private final ProcessorSupplier<K, V> supplier;
private final Set<String> stateStoreNames = new HashSet<>();
ProcessorNodeFactory(final String name,
final String[] predecessors,
- final ProcessorSupplier<?, ?> supplier) {
+ final ProcessorSupplier<K, V> supplier) {
super(name, predecessors.clone());
this.supplier = supplier;
}
@@ -205,7 +205,7 @@ public class InternalTopologyBuilder {
}
@Override
- public ProcessorNode<?, ?> build() {
+ public ProcessorNode<K, V> build() {
return new ProcessorNode<>(name, supplier.get(), stateStoreNames);
}
@@ -219,19 +219,19 @@ public class InternalTopologyBuilder {
// even if it can be matched by multiple regex patterns. Only used by SourceNodeFactory
private final Map<String, Pattern> topicToPatterns = new HashMap<>();
- private class SourceNodeFactory extends NodeFactory {
+ private class SourceNodeFactory<K, V> extends NodeFactory<K, V> {
private final List<String> topics;
private final Pattern pattern;
- private final Deserializer<?> keyDeserializer;
- private final Deserializer<?> valDeserializer;
+ private final Deserializer<K> keyDeserializer;
+ private final Deserializer<V> valDeserializer;
private final TimestampExtractor timestampExtractor;
private SourceNodeFactory(final String name,
final String[] topics,
final Pattern pattern,
final TimestampExtractor timestampExtractor,
- final Deserializer<?> keyDeserializer,
- final Deserializer<?> valDeserializer) {
+ final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valDeserializer) {
super(name, NO_PREDECESSORS);
this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<>();
this.pattern = pattern;
@@ -267,7 +267,7 @@ public class InternalTopologyBuilder {
}
@Override
- public ProcessorNode<?, ?> build() {
+ public ProcessorNode<K, V> build() {
final List<String> sourceTopics = nodeToSourceTopics.get(name);
// if it is subscribed via patterns, it is possible that the topic metadata has not been updated
@@ -290,7 +290,7 @@ public class InternalTopologyBuilder {
}
}
- private class SinkNodeFactory<K, V> extends NodeFactory {
+ private class SinkNodeFactory<K, V> extends NodeFactory<K, V> {
private final Serializer<K> keySerializer;
private final Serializer<V> valSerializer;
private final StreamPartitioner<? super K, ? super V> partitioner;
@@ -310,9 +310,9 @@ public class InternalTopologyBuilder {
}
@Override
- public ProcessorNode<?, ?> build() {
+ public ProcessorNode<K, V> build() {
if (topicExtractor instanceof StaticTopicNameExtractor) {
- final String topic = ((StaticTopicNameExtractor<?, ?>) topicExtractor).topicName;
+ final String topic = ((StaticTopicNameExtractor<K, V>) topicExtractor).topicName;
if (internalTopicNames.contains(topic)) {
// prefix the internal topic name with the application id
return new SinkNode<>(name, new StaticTopicNameExtractor<>(decorateTopic(topic)), keySerializer, valSerializer, partitioner);
@@ -325,8 +325,8 @@ public class InternalTopologyBuilder {
}
@Override
- Sink describe() {
- return new Sink(name, topicExtractor);
+ Sink<K, V> describe() {
+ return new Sink<>(name, topicExtractor);
}
}
@@ -346,7 +346,7 @@ public class InternalTopologyBuilder {
// maybe strip out caching layers
if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) == 0L) {
- for (final StateStoreFactory storeFactory : stateFactories.values()) {
+ for (final StateStoreFactory<?> storeFactory : stateFactories.values()) {
storeFactory.builder.withCachingDisabled();
}
@@ -384,7 +384,7 @@ public class InternalTopologyBuilder {
sourceTopicNames.add(topic);
}
- nodeFactories.put(name, new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer));
+ nodeFactories.put(name, new SourceNodeFactory<>(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer));
nodeToSourceTopics.put(name, Arrays.asList(topics));
nodeGrouper.add(name);
nodeGroups = null;
@@ -423,7 +423,7 @@ public class InternalTopologyBuilder {
maybeAddToResetList(earliestResetPatterns, latestResetPatterns, offsetReset, topicPattern);
- nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
+ nodeFactories.put(name, new SourceNodeFactory<>(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
nodeToSourcePatterns.put(name, topicPattern);
nodeGrouper.add(name);
nodeGroups = null;
@@ -505,7 +505,7 @@ public class InternalTopologyBuilder {
}
}
- nodeFactories.put(name, new ProcessorNodeFactory(name, predecessorNames, supplier));
+ nodeFactories.put(name, new ProcessorNodeFactory<>(name, predecessorNames, supplier));
nodeGrouper.add(name);
nodeGrouper.unite(name, predecessorNames);
nodeGroups = null;
@@ -524,7 +524,7 @@ public class InternalTopologyBuilder {
throw new TopologyException("StateStore " + storeBuilder.name() + " is already added.");
}
- stateFactories.put(storeBuilder.name(), new StateStoreFactory(storeBuilder));
+ stateFactories.put(storeBuilder.name(), new StateStoreFactory<>(storeBuilder));
if (processorNames != null) {
for (final String processorName : processorNames) {
@@ -535,14 +535,14 @@ public class InternalTopologyBuilder {
nodeGroups = null;
}
- public final void addGlobalStore(final StoreBuilder<?> storeBuilder,
- final String sourceName,
- final TimestampExtractor timestampExtractor,
- final Deserializer<?> keyDeserializer,
- final Deserializer<?> valueDeserializer,
- final String topic,
- final String processorName,
- final ProcessorSupplier<?, ?> stateUpdateSupplier) {
+ public final <K, V> void addGlobalStore(final StoreBuilder<?> storeBuilder,
+ final String sourceName,
+ final TimestampExtractor timestampExtractor,
+ final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDeserializer,
+ final String topic,
+ final String processorName,
+ final ProcessorSupplier<K, V> stateUpdateSupplier) {
Objects.requireNonNull(storeBuilder, "store builder must not be null");
validateGlobalStoreArguments(sourceName,
topic,
@@ -555,17 +555,21 @@ public class InternalTopologyBuilder {
final String[] topics = {topic};
final String[] predecessors = {sourceName};
- final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName,
+ final ProcessorNodeFactory<K, V> nodeFactory = new ProcessorNodeFactory<>(
+ processorName,
predecessors,
- stateUpdateSupplier);
+ stateUpdateSupplier
+ );
globalTopics.add(topic);
- nodeFactories.put(sourceName, new SourceNodeFactory(sourceName,
+ nodeFactories.put(sourceName, new SourceNodeFactory<>(
+ sourceName,
topics,
null,
timestampExtractor,
keyDeserializer,
- valueDeserializer));
+ valueDeserializer)
+ );
nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
nodeGrouper.add(sourceName);
nodeFactory.addStateStore(storeBuilder.name());
@@ -665,7 +669,7 @@ public class InternalTopologyBuilder {
throw new TopologyException("Processor " + processorName + " is not added yet.");
}
- final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
+ final StateStoreFactory<?> stateStoreFactory = stateFactories.get(stateStoreName);
final Iterator<String> iter = stateStoreFactory.users().iterator();
if (iter.hasNext()) {
final String user = iter.next();
@@ -673,9 +677,9 @@ public class InternalTopologyBuilder {
}
stateStoreFactory.users().add(processorName);
- final NodeFactory nodeFactory = nodeFactories.get(processorName);
+ final NodeFactory<?, ?> nodeFactory = nodeFactories.get(processorName);
if (nodeFactory instanceof ProcessorNodeFactory) {
- final ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory;
+ final ProcessorNodeFactory<?, ?> processorNodeFactory = (ProcessorNodeFactory<?, ?>) nodeFactory;
processorNodeFactory.addStateStore(stateStoreName);
connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
} else {
@@ -683,21 +687,21 @@ public class InternalTopologyBuilder {
}
}
- private Set<SourceNodeFactory> findSourcesForProcessorPredecessors(final String[] predecessors) {
- final Set<SourceNodeFactory> sourceNodes = new HashSet<>();
+ private Set<SourceNodeFactory<?, ?>> findSourcesForProcessorPredecessors(final String[] predecessors) {
+ final Set<SourceNodeFactory<?, ?>> sourceNodes = new HashSet<>();
for (final String predecessor : predecessors) {
- final NodeFactory nodeFactory = nodeFactories.get(predecessor);
+ final NodeFactory<?, ?> nodeFactory = nodeFactories.get(predecessor);
if (nodeFactory instanceof SourceNodeFactory) {
- sourceNodes.add((SourceNodeFactory) nodeFactory);
+ sourceNodes.add((SourceNodeFactory<?, ?>) nodeFactory);
} else if (nodeFactory instanceof ProcessorNodeFactory) {
- sourceNodes.addAll(findSourcesForProcessorPredecessors(((ProcessorNodeFactory) nodeFactory).predecessors));
+ sourceNodes.addAll(findSourcesForProcessorPredecessors(((ProcessorNodeFactory<?, ?>) nodeFactory).predecessors));
}
}
return sourceNodes;
}
- private void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName,
- final ProcessorNodeFactory processorNodeFactory) {
+ private <K, V> void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName,
+ final ProcessorNodeFactory<K, V> processorNodeFactory) {
// we should never update the mapping from state store names to source topics if the store name already exists
// in the map; this scenario is possible, for example, that a state store underlying a source KTable is
// connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic.
@@ -709,10 +713,10 @@ public class InternalTopologyBuilder {
final Set<String> sourceTopics = new HashSet<>();
final Set<Pattern> sourcePatterns = new HashSet<>();
- final Set<SourceNodeFactory> sourceNodesForPredecessor =
+ final Set<SourceNodeFactory<?, ?>> sourceNodesForPredecessor =
findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);
- for (final SourceNodeFactory sourceNodeFactory : sourceNodesForPredecessor) {
+ for (final SourceNodeFactory<?, ?> sourceNodeFactory : sourceNodesForPredecessor) {
if (sourceNodeFactory.pattern != null) {
sourcePatterns.add(sourceNodeFactory.pattern);
} else {
@@ -849,7 +853,7 @@ public class InternalTopologyBuilder {
// create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
// also make sure the state store map values following the insertion ordering
- for (final NodeFactory factory : nodeFactories.values()) {
+ for (final NodeFactory<?, ?> factory : nodeFactories.values()) {
if (nodeGroup == null || nodeGroup.contains(factory.name)) {
final ProcessorNode<?, ?> node = factory.build();
processorMap.put(node.name(), node);
@@ -857,13 +861,13 @@ public class InternalTopologyBuilder {
if (factory instanceof ProcessorNodeFactory) {
buildProcessorNode(processorMap,
stateStoreMap,
- (ProcessorNodeFactory) factory,
+ (ProcessorNodeFactory<?, ?>) factory,
node);
} else if (factory instanceof SourceNodeFactory) {
buildSourceNode(topicSourceMap,
repartitionTopics,
- (SourceNodeFactory) factory,
+ (SourceNodeFactory<?, ?>) factory,
(SourceNode<?, ?>) node);
} else if (factory instanceof SinkNodeFactory) {
@@ -913,7 +917,7 @@ public class InternalTopologyBuilder {
private void buildSourceNode(final Map<String, SourceNode<?, ?>> topicSourceMap,
final Set<String> repartitionTopics,
- final SourceNodeFactory sourceNodeFactory,
+ final SourceNodeFactory<?, ?> sourceNodeFactory,
final SourceNode<?, ?> node) {
final List<String> topics = (sourceNodeFactory.pattern != null) ?
@@ -934,7 +938,7 @@ public class InternalTopologyBuilder {
private void buildProcessorNode(final Map<String, ProcessorNode<?, ?>> processorMap,
final Map<String, StateStore> stateStoreMap,
- final ProcessorNodeFactory factory,
+ final ProcessorNodeFactory<?, ?> factory,
final ProcessorNode<?, ?> node) {
for (final String predecessor : factory.predecessors) {
@@ -944,7 +948,7 @@ public class InternalTopologyBuilder {
for (final String stateStoreName : factory.stateStoreNames) {
if (!stateStoreMap.containsKey(stateStoreName)) {
if (stateFactories.containsKey(stateStoreName)) {
- final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
+ final StateStoreFactory<?> stateStoreFactory = stateFactories.get(stateStoreName);
// remember the changelog topic if this state store is change-logging enabled
if (stateStoreFactory.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
@@ -1033,7 +1037,7 @@ public class InternalTopologyBuilder {
// if the node is connected to a state store whose changelog topics are not predefined,
// add to the changelog topics
- for (final StateStoreFactory stateFactory : stateFactories.values()) {
+ for (final StateStoreFactory<?> stateFactory : stateFactories.values()) {
if (stateFactory.users().contains(node) && storeToChangelogTopic.containsKey(stateFactory.name())) {
final String topicName = storeToChangelogTopic.get(stateFactory.name());
if (!stateChangelogTopics.containsKey(topicName)) {
@@ -1059,7 +1063,7 @@ public class InternalTopologyBuilder {
private void setRegexMatchedTopicsToSourceNodes() {
if (hasSubscriptionUpdates()) {
for (final String nodeName : nodeToSourcePatterns.keySet()) {
- final SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(nodeName);
+ final SourceNodeFactory<?, ?> sourceNode = (SourceNodeFactory<?, ?>) nodeFactories.get(nodeName);
final List<String> sourceTopics = sourceNode.getTopics(subscriptionUpdates);
//need to update nodeToSourceTopics and sourceTopicNames with topics matched from given regex
nodeToSourceTopics.put(nodeName, sourceTopics);
@@ -1093,8 +1097,8 @@ public class InternalTopologyBuilder {
}
}
- private InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory factory,
- final String name) {
+ private <S extends StateStore> InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory<S> factory,
+ final String name) {
if (factory.isWindowStore()) {
final WindowedChangelogTopicConfig config = new WindowedChangelogTopicConfig(name, factory.logConfig());
config.setRetentionMs(factory.retentionPeriod());
@@ -1238,10 +1242,10 @@ public class InternalTopologyBuilder {
}
private boolean isGlobalSource(final String nodeName) {
- final NodeFactory nodeFactory = nodeFactories.get(nodeName);
+ final NodeFactory<?, ?> nodeFactory = nodeFactories.get(nodeName);
if (nodeFactory instanceof SourceNodeFactory) {
- final List<String> topics = ((SourceNodeFactory) nodeFactory).topics;
+ final List<String> topics = ((SourceNodeFactory<?, ?>) nodeFactory).topics;
return topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0));
}
return false;
@@ -1280,7 +1284,7 @@ public class InternalTopologyBuilder {
description.addGlobalStore(new GlobalStore(
node,
processorNode,
- ((ProcessorNodeFactory) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
+ ((ProcessorNodeFactory<?, ?>) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
nodeToSourceTopics.get(node).get(0),
id
));
@@ -1513,8 +1517,9 @@ public class InternalTopologyBuilder {
// omit successor to avoid infinite loops
return name.equals(source.name)
&& Objects.equals(topics, source.topics)
- && (topicPattern == null ? source.topicPattern == null :
- topicPattern.pattern().equals(source.topicPattern.pattern()));
+ && (topicPattern == null ?
+ source.topicPattern == null :
+ topicPattern.pattern().equals(source.topicPattern.pattern()));
}
@Override
@@ -1567,11 +1572,11 @@ public class InternalTopologyBuilder {
}
}
- public final static class Sink extends AbstractNode implements TopologyDescription.Sink {
- private final TopicNameExtractor<?, ?> topicNameExtractor;
+ public final static class Sink<K, V> extends AbstractNode implements TopologyDescription.Sink {
+ private final TopicNameExtractor<K, V> topicNameExtractor;
public Sink(final String name,
- final TopicNameExtractor<?, ?> topicNameExtractor) {
+ final TopicNameExtractor<K, V> topicNameExtractor) {
super(name);
this.topicNameExtractor = topicNameExtractor;
}
@@ -1585,14 +1590,14 @@ public class InternalTopologyBuilder {
@Override
public String topic() {
if (topicNameExtractor instanceof StaticTopicNameExtractor) {
- return ((StaticTopicNameExtractor<?, ?>) topicNameExtractor).topicName;
+ return ((StaticTopicNameExtractor<K, V>) topicNameExtractor).topicName;
} else {
return null;
}
}
@Override
- public TopicNameExtractor<?, ?> topicNameExtractor() {
+ public TopicNameExtractor<K, V> topicNameExtractor() {
if (topicNameExtractor instanceof StaticTopicNameExtractor) {
return null;
} else {
@@ -1614,6 +1619,7 @@ public class InternalTopologyBuilder {
+ nodeNames(predecessors);
}
+ @SuppressWarnings("unchecked")
@Override
public boolean equals(final Object o) {
if (this == o) {
@@ -1623,7 +1629,7 @@ public class InternalTopologyBuilder {
return false;
}
- final Sink sink = (Sink) o;
+ final Sink<K, V> sink = (Sink<K, V>) o;
return name.equals(sink.name)
&& topicNameExtractor.equals(sink.topicNameExtractor)
&& predecessors.equals(sink.predecessors);
@@ -1921,7 +1927,7 @@ public class InternalTopologyBuilder {
return sourceTopicNames;
}
- public synchronized Map<String, StateStoreFactory> stateStores() {
+ public synchronized Map<String, StateStoreFactory<?>> stateStores() {
return stateFactories;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 178704f..3e541e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -69,7 +69,6 @@ public class MeteredSessionStore<K, V>
@Override
public void init(final ProcessorContext context,
final StateStore root) {
- //noinspection unchecked
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 33cb6b9..7087085 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -915,13 +915,14 @@ public class KafkaStreamsTest {
final StoreBuilder<KeyValueStore<String, String>> globalStoreBuilder = Stores.keyValueStoreBuilder(
isPersistentStore ? Stores.persistentKeyValueStore(globalStoreName) : Stores.inMemoryKeyValueStore(globalStoreName),
Serdes.String(), Serdes.String()).withLoggingDisabled();
- topology.addGlobalStore(globalStoreBuilder,
+ topology.addGlobalStore(
+ globalStoreBuilder,
"global",
Serdes.String().deserializer(),
Serdes.String().deserializer(),
globalTopicName,
globalTopicName + "-processor",
- new MockProcessorSupplier<byte[], byte[]>());
+ new MockProcessorSupplier<>());
return topology;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 7ea9b2b..d0147e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -55,8 +55,8 @@ import static org.junit.Assert.fail;
public class TopologyTest {
- private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
- private final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
+ private final StoreBuilder<MockKeyValueStore> storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
+ private final KeyValueStoreBuilder<?, ?> globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
private final Topology topology = new Topology();
private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();
@@ -87,7 +87,7 @@ public class TopologyTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullNameWhenAddingProcessor() {
- topology.addProcessor(null, () -> new MockProcessorSupplier().get());
+ topology.addProcessor(null, () -> new MockProcessorSupplier<>().get());
}
@Test(expected = NullPointerException.class)
@@ -169,9 +169,9 @@ public class TopologyTest {
@Test
public void shouldNotAllowToAddProcessorWithSameName() {
topology.addSource("source", "topic-1");
- topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+ topology.addProcessor("processor", new MockProcessorSupplier<>(), "source");
try {
- topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+ topology.addProcessor("processor", new MockProcessorSupplier<>(), "source");
fail("Should throw TopologyException for duplicate processor name");
} catch (final TopologyException expected) { }
}
@@ -180,7 +180,7 @@ public class TopologyTest {
public void shouldNotAllowToAddProcessorWithEmptyParents() {
topology.addSource("source", "topic-1");
try {
- topology.addProcessor("processor", new MockProcessorSupplier());
+ topology.addProcessor("processor", new MockProcessorSupplier<>());
fail("Should throw TopologyException for processor without at least one parent node");
} catch (final TopologyException expected) { }
}
@@ -189,19 +189,19 @@ public class TopologyTest {
public void shouldNotAllowToAddProcessorWithNullParents() {
topology.addSource("source", "topic-1");
try {
- topology.addProcessor("processor", new MockProcessorSupplier(), (String) null);
+ topology.addProcessor("processor", new MockProcessorSupplier<>(), (String) null);
fail("Should throw NullPointerException for processor when null parent names are provided");
} catch (final NullPointerException expected) { }
}
@Test(expected = TopologyException.class)
public void shouldFailOnUnknownSource() {
- topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+ topology.addProcessor("processor", new MockProcessorSupplier<>(), "source");
}
@Test(expected = TopologyException.class)
public void shouldFailIfNodeIsItsOwnParent() {
- topology.addProcessor("processor", new MockProcessorSupplier(), "processor");
+ topology.addProcessor("processor", new MockProcessorSupplier<>(), "processor");
}
@Test
@@ -217,7 +217,7 @@ public class TopologyTest {
@Test
public void shouldNotAllowToAddSinkWithEmptyParents() {
topology.addSource("source", "topic-1");
- topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+ topology.addProcessor("processor", new MockProcessorSupplier<>(), "source");
try {
topology.addSink("sink", "topic-2");
fail("Should throw TopologyException for sink without at least one parent node");
@@ -227,7 +227,7 @@ public class TopologyTest {
@Test
public void shouldNotAllowToAddSinkWithNullParents() {
topology.addSource("source", "topic-1");
- topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+ topology.addProcessor("processor", new MockProcessorSupplier<>(), "source");
try {
topology.addSink("sink", "topic-2", (String) null);
fail("Should throw NullPointerException for sink when null parent names are provided");
@@ -333,12 +333,12 @@ public class TopologyTest {
}
}
- private static class LocalMockProcessorSupplier implements ProcessorSupplier {
+ private static class LocalMockProcessorSupplier implements ProcessorSupplier<Object, Object> {
final static String STORE_NAME = "store";
@Override
- public Processor get() {
- return new Processor() {
+ public Processor<Object, Object> get() {
+ return new Processor<Object, Object>() {
@Override
public void init(final ProcessorContext context) {
context.getStateStore(STORE_NAME);
@@ -364,7 +364,7 @@ public class TopologyTest {
null,
"anyTopicName",
"sameName",
- new MockProcessorSupplier());
+ new MockProcessorSupplier<>());
}
@Test
@@ -375,16 +375,16 @@ public class TopologyTest {
@Test
public void sinkShouldReturnNullTopicWithDynamicRouting() {
final TopologyDescription.Sink expectedSinkNode =
- new InternalTopologyBuilder.Sink("sink", (key, value, record) -> record.topic() + "-" + key);
+ new InternalTopologyBuilder.Sink<>("sink", (key, value, record) -> record.topic() + "-" + key);
assertThat(expectedSinkNode.topic(), equalTo(null));
}
@Test
public void sinkShouldReturnTopicNameExtractorWithDynamicRouting() {
- final TopicNameExtractor topicNameExtractor = (key, value, record) -> record.topic() + "-" + key;
+ final TopicNameExtractor<?, ?> topicNameExtractor = (key, value, record) -> record.topic() + "-" + key;
final TopologyDescription.Sink expectedSinkNode =
- new InternalTopologyBuilder.Sink("sink", topicNameExtractor);
+ new InternalTopologyBuilder.Sink<>("sink", topicNameExtractor);
assertThat(expectedSinkNode.topicNameExtractor(), equalTo(topicNameExtractor));
}
@@ -1127,10 +1127,10 @@ public class TopologyTest {
parentNames[i] = parents[i].name();
}
- topology.addProcessor(processorName, new MockProcessorSupplier(), parentNames);
+ topology.addProcessor(processorName, new MockProcessorSupplier<>(), parentNames);
if (newStores) {
for (final String store : storeNames) {
- final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
+ final StoreBuilder<?> storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
EasyMock.expect(storeBuilder.name()).andReturn(store).anyTimes();
EasyMock.replay(storeBuilder);
topology.addStateStore(storeBuilder, processorName);
@@ -1174,7 +1174,7 @@ public class TopologyTest {
final String globalTopicName,
final String processorName,
final int id) {
- final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
+ final KeyValueStoreBuilder<?, ?> globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
EasyMock.expect(globalStoreBuilder.name()).andReturn(globalStoreName).anyTimes();
EasyMock.replay(globalStoreBuilder);
topology.addGlobalStore(
@@ -1185,7 +1185,7 @@ public class TopologyTest {
null,
globalTopicName,
processorName,
- new MockProcessorSupplier());
+ new MockProcessorSupplier<>());
final TopologyDescription.GlobalStore expectedGlobalStore = new InternalTopologyBuilder.GlobalStore(
sourceName,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
index 8a76164..272cfa8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -116,7 +116,7 @@ public class GlobalThreadShutDownOrderTest {
storeBuilder,
globalStoreTopic,
Consumed.with(Serdes.String(), Serdes.Long()),
- new MockProcessorSupplier());
+ new MockProcessorSupplier<>());
builder
.stream(streamTopic, stringLongConsumed)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index cddc531..1463739 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockProcessorSupplier;
@@ -227,7 +228,7 @@ public class RegexSourceIntegrationTest {
public void shouldAddStateStoreToRegexDefinedSource() throws InterruptedException {
final ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
- final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("testStateStore", false);
+ final StoreBuilder<KeyValueStore<Object, Object>> storeBuilder = new MockKeyValueStoreBuilder("testStateStore", false);
final long thirtySecondTimeout = 30 * 1000;
final TopologyWrapper topology = new TopologyWrapper();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 4280e9c..0d75452 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.junit.Test;
@@ -63,7 +62,7 @@ public class GraphGraceSearchUtilTest {
},
"dummy"
),
- (StoreBuilder<? extends StateStore>) null
+ (StoreBuilder<?>) null
);
final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
@@ -91,7 +90,7 @@ public class GraphGraceSearchUtilTest {
),
"asdf"
),
- (StoreBuilder<? extends StateStore>) null
+ (StoreBuilder<?>) null
);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
@@ -114,7 +113,7 @@ public class GraphGraceSearchUtilTest {
),
"asdf"
),
- (StoreBuilder<? extends StateStore>) null
+ (StoreBuilder<?>) null
);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
@@ -129,7 +128,7 @@ public class GraphGraceSearchUtilTest {
new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>(
windows, "asdf", null, null, null
), "asdf"),
- (StoreBuilder<? extends StateStore>) null
+ (StoreBuilder<?>) null
);
final StatefulProcessorNode<String, Long> statefulParent = new StatefulProcessorNode<>(
@@ -147,7 +146,7 @@ public class GraphGraceSearchUtilTest {
},
"dummy"
),
- (StoreBuilder<? extends StateStore>) null
+ (StoreBuilder<?>) null
);
graceGrandparent.addChild(statefulParent);
@@ -173,7 +172,7 @@ public class GraphGraceSearchUtilTest {
),
"asdf"
),
- (StoreBuilder<? extends StateStore>) null
+ (StoreBuilder<?>) null
);
final ProcessorGraphNode<String, Long> statelessParent = new ProcessorGraphNode<>("stateless", null);
@@ -200,7 +199,7 @@ public class GraphGraceSearchUtilTest {
),
"asdf"
),
- (StoreBuilder<? extends StateStore>) null
+ (StoreBuilder<?>) null
);
final StatefulProcessorNode<String, Long> rightParent = new StatefulProcessorNode<>(
@@ -214,7 +213,7 @@ public class GraphGraceSearchUtilTest {
),
"asdf"
),
- (StoreBuilder<? extends StateStore>) null
+ (StoreBuilder<?>) null
);
final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index e5d12e6..64c472f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -494,7 +494,7 @@ public class ProcessorTopologyTest {
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String());
topology.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, "topic")
- .addProcessor(processor, () -> new StatefulProcessor(storeSupplier.name()), "source")
+ .addProcessor(processor, (ProcessorSupplier<String, String>) () -> new StatefulProcessor(storeSupplier.name()), "source")
.addStateStore(storeBuilder, processor);
return topology.getInternalBuilder("anyAppId").buildTopology();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 71e15d4..f6f00b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -86,13 +86,13 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
- final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+ final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("cache-store"),
(Serde<K>) context.keySerde(),
(Serde<V>) context.valueSerde())
.withCachingEnabled();
- final KeyValueStore<K, V> store = (KeyValueStore<K, V>) storeBuilder.build();
+ final KeyValueStore<K, V> store = storeBuilder.build();
store.init(context, store);
return store;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
index 0848970..7c0d16c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@@ -30,15 +29,15 @@ public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
- final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+ final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("my-store"),
(Serde<K>) context.keySerde(),
(Serde<V>) context.valueSerde())
.withLoggingEnabled(Collections.singletonMap("retention.ms", "1000"));
- final StateStore store = storeBuilder.build();
+ final KeyValueStore<K, V> store = storeBuilder.build();
store.init(context, store);
- return (KeyValueStore<K, V>) store;
+ return store;
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index ef5d6dc..62f8949 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@@ -33,14 +32,14 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
- final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+ final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("my-store"),
(Serde<K>) context.keySerde(),
(Serde<V>) context.valueSerde());
- final StateStore store = storeBuilder.build();
+ final KeyValueStore<K, V> store = storeBuilder.build();
store.init(context, store);
- return (KeyValueStore<K, V>) store;
+ return store;
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index 1bd4045..2a86cdd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -19,13 +19,11 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.junit.Test;
-
import java.util.Arrays;
import java.util.List;
@@ -39,16 +37,15 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
-
- final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+ final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.lruMap("my-store", 10),
(Serde<K>) context.keySerde(),
(Serde<V>) context.valueSerde());
- final StateStore store = storeBuilder.build();
+ final KeyValueStore<K, V> store = storeBuilder.build();
store.init(context, store);
- return (KeyValueStore<K, V>) store;
+ return store;
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java
index 3f0b347..90f4850 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java
@@ -44,8 +44,9 @@ public class InMemoryTimeOrderedKeyValueBufferTest {
final String expect = "3";
final Map<String, String> logConfig = new HashMap<>();
logConfig.put("min.insync.replicas", expect);
- final StoreBuilder builder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null)
- .withLoggingEnabled(logConfig);
+ final StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<Object, Object>> builder =
+ new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null)
+ .withLoggingEnabled(logConfig);
assertThat(builder.logConfig(), is(singletonMap("min.insync.replicas", expect)));
assertThat(builder.loggingEnabled(), is(true));
@@ -53,8 +54,9 @@ public class InMemoryTimeOrderedKeyValueBufferTest {
@Test
public void bufferShouldAllowLoggingDisablement() {
- final StoreBuilder builder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null)
- .withLoggingDisabled();
+ final StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<Object, Object>> builder
+ = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null)
+ .withLoggingDisabled();
assertThat(builder.logConfig(), is(emptyMap()));
assertThat(builder.loggingEnabled(), is(false));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 8db040e..504aa9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -40,14 +39,14 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
- final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+ final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"),
(Serde<K>) context.keySerde(),
(Serde<V>) context.valueSerde());
- final StateStore store = storeBuilder.build();
+ final KeyValueStore<K, V> store = storeBuilder.build();
store.init(context, store);
- return (KeyValueStore<K, V>) store;
+ return store;
}
public static class TheRocksDbConfigSetter implements RocksDBConfigSetter {
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index 55cd5fa..0c05d8f 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -176,10 +176,10 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
*
* @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
*/
- def addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore],
- topic: String,
- consumed: Consumed[_, _],
- stateUpdateSupplier: ProcessorSupplier[_, _]): StreamsBuilderJ =
+ def addGlobalStore[K, V](storeBuilder: StoreBuilder[_ <: StateStore],
+ topic: String,
+ consumed: Consumed[K, V],
+ stateUpdateSupplier: ProcessorSupplier[K, V]): StreamsBuilderJ =
inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier)
def build(): Topology = inner.build()
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 32c479c..fe901ed 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -247,12 +247,12 @@ public class MockProcessorContextTest {
final MockProcessorContext context = new MockProcessorContext();
- final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+ final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("my-state"),
Serdes.String(),
Serdes.Long()).withLoggingDisabled();
- final KeyValueStore<String, Long> store = (KeyValueStore<String, Long>) storeBuilder.build();
+ final KeyValueStore<String, Long> store = storeBuilder.build();
store.init(context, store);
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 60ab516..fb5585f 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
@@ -371,7 +370,7 @@ public class TopologyTestDriverTest {
for (final String sourceTopicName : sourceTopicNames) {
topology.addGlobalStore(
- Stores.<Bytes, byte[]>keyValueStoreBuilder(
+ Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore(
sourceTopicName + "-globalStore"),
null,