You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/05/21 21:02:58 UTC
[kafka] 03/05: fix processorSupplier
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch poc-478-ktable-1
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 06640c7b4b91cb4799664452aeb05d7ee71d8dc7
Author: John Roesler <vv...@apache.org>
AuthorDate: Fri May 21 14:28:13 2021 -0500
fix processorSupplier
---
.../org/apache/kafka/streams/kstream/internals/KTableImpl.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index a2b8702..9733511 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -124,8 +124,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
private static final String TOPIC_SUFFIX = "-topic";
private static final String SINK_NAME = "KTABLE-SINK-";
- private final ProcessorSupplier<?, ?> processorSupplier;
- private final org.apache.kafka.streams.processor.api.ProcessorSupplier<?, ?, ?, ?> newProcessorSupplier;
+ // Temporarily setting the processorSupplier to type Object so that we can transition from the
+ // old ProcessorSupplier to the new api.ProcessorSupplier. This works because all accesses to
+ // this field are guarded by typechecks anyway.
+ private final Object processorSupplier;
private final String queryableStoreName;
@@ -141,7 +143,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final InternalStreamsBuilder builder) {
super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
this.processorSupplier = processorSupplier;
- this.newProcessorSupplier = null;
this.queryableStoreName = queryableStoreName;
}
@@ -154,8 +155,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final GraphNode graphNode,
final InternalStreamsBuilder builder) {
super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
- this.processorSupplier = null;
- this.newProcessorSupplier = newProcessorSupplier;
+ this.processorSupplier = newProcessorSupplier;
this.queryableStoreName = queryableStoreName;
}