You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/05/21 21:02:58 UTC

[kafka] 03/05: fix processorSupplier

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch poc-478-ktable-1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 06640c7b4b91cb4799664452aeb05d7ee71d8dc7
Author: John Roesler <vv...@apache.org>
AuthorDate: Fri May 21 14:28:13 2021 -0500

    fix processorSupplier
---
 .../org/apache/kafka/streams/kstream/internals/KTableImpl.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index a2b8702..9733511 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -124,8 +124,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
     private static final String TOPIC_SUFFIX = "-topic";
     private static final String SINK_NAME = "KTABLE-SINK-";
 
-    private final ProcessorSupplier<?, ?> processorSupplier;
-    private final org.apache.kafka.streams.processor.api.ProcessorSupplier<?, ?, ?, ?> newProcessorSupplier;
+    // Temporarily setting the processorSupplier to type Object so that we can transition from the
+    // old ProcessorSupplier to the new api.ProcessorSupplier. This works because all accesses to
+    // this field are guarded by typechecks anyway.
+    private final Object processorSupplier;
 
     private final String queryableStoreName;
 
@@ -141,7 +143,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                       final InternalStreamsBuilder builder) {
         super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
         this.processorSupplier = processorSupplier;
-        this.newProcessorSupplier = null;
         this.queryableStoreName = queryableStoreName;
     }
 
@@ -154,8 +155,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                       final GraphNode graphNode,
                       final InternalStreamsBuilder builder) {
         super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
-        this.processorSupplier = null;
-        this.newProcessorSupplier = newProcessorSupplier;
+        this.processorSupplier = newProcessorSupplier;
         this.queryableStoreName = queryableStoreName;
     }