Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/26 01:25:59 UTC

[GitHub] [kafka] vvcephei opened a new pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

vvcephei opened a new pull request #9221:
URL: https://github.com/apache/kafka/pull/9221


   Converts `Topology#addProcessor` and `Topology#addGlobalStore` to the new KIP-478 processor API.
   Also converts some of the internals in support of `addProcessor`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9221:
URL: https://github.com/apache/kafka/pull/9221


   





[GitHub] [kafka] vvcephei commented on a change in pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r476953793



##########
File path: streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
##########
@@ -145,24 +145,24 @@ public void shouldPunctuateIfWallClockTimeAdvances() {
         assertThat(outputTopic.isEmpty(), is(true));
     }
 
-    public static class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
+    public static class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long, String, Long> {
         @Override
-        public Processor<String, Long> get() {
+        public Processor<String, Long, String, Long> get() {
             return new CustomMaxAggregator();
         }
     }
 
-    public static class CustomMaxAggregator implements Processor<String, Long> {
-        ProcessorContext context;
+    public static class CustomMaxAggregator implements Processor<String, Long, String, Long> {
+        ProcessorContext<String, Long> context;
         private KeyValueStore<String, Long> store;
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(final ProcessorContext context) {
+        public void init(final ProcessorContext<String, Long> context) {
             this.context = context;
             context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
             context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore());
-            store = (KeyValueStore<String, Long>) context.getStateStore("aggStore");
+            store = context.getStateStore("aggStore");

Review comment:
       This is a small improvement I noticed; I'll mention this on the KIP discussion if you like it. I've changed the ProcessorContext getStateStore method so that we don't have to cast the store type anymore. The generic parameters to the method take care of casting now.
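
For illustration, here is a minimal sketch of how a method-level type parameter can move the cast out of the call site. All types below (`StateStore`, `InMemoryKeyValueStore`, `SketchContext`) are hypothetical stand-ins written for this example, not the real Kafka Streams classes, and the actual signature in the PR may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the Kafka Streams types; not the real API.
interface StateStore {
    String name();
}

final class InMemoryKeyValueStore<K, V> implements StateStore {
    private final String name;
    private final Map<K, V> data = new HashMap<>();

    InMemoryKeyValueStore(final String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    void put(final K key, final V value) {
        data.put(key, value);
    }

    V get(final K key) {
        return data.get(key);
    }
}

final class SketchContext {
    private final Map<String, StateStore> stores = new HashMap<>();

    void register(final StateStore store) {
        stores.put(store.name(), store);
    }

    // S is inferred from the assignment target, so the unchecked cast
    // happens once here instead of at every call site.
    @SuppressWarnings("unchecked")
    <S extends StateStore> S getStateStore(final String name) {
        return (S) stores.get(name);
    }
}

final class GetStateStoreSketch {
    static Long roundTrip() {
        final SketchContext context = new SketchContext();
        context.register(new InMemoryKeyValueStore<String, Long>("aggStore"));

        // No cast needed: S is inferred as InMemoryKeyValueStore<String, Long>.
        final InMemoryKeyValueStore<String, Long> store = context.getStateStore("aggStore");
        store.put("a", 42L);
        return store.get("a");
    }
}
```

The trade-off is that the cast is still unchecked: asking for the wrong store type compiles and only fails with a `ClassCastException` when the result is assigned or used.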

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -658,8 +658,42 @@ public synchronized Topology addSink(final String name,
      */
     @SuppressWarnings("rawtypes")
     public synchronized Topology addProcessor(final String name,
-                                              final ProcessorSupplier supplier,
+                                              final org.apache.kafka.streams.processor.ProcessorSupplier supplier,
                                               final String... parentNames) {
+        return addProcessor(
+            name,
+            new ProcessorSupplier<Object, Object, Object, Object>() {
+                @Override
+                public Set<StoreBuilder<?>> stores() {
+                    return supplier.stores();
+                }
+
+                @Override
+                public org.apache.kafka.streams.processor.api.Processor<Object, Object, Object, Object> get() {
+                    return ProcessorAdapter.adaptRaw(supplier.get());
+                }
+            },
+            parentNames
+        );
+    }

Review comment:
       As in the previous changes, the old API delegates to the new one.
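
The delegation pattern can be sketched as follows. The interfaces here (`OldProcessor`, `OldSupplier`, `NewProcessor`, `NewSupplier`) are simplified, hypothetical stand-ins for the two API generations, and `adapt` only plays the role that `ProcessorAdapter` has in the diff; it is not the real implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the old and new interfaces.
interface OldProcessor<K, V> {
    void process(K key, V value);
}

interface OldSupplier<K, V> {
    OldProcessor<K, V> get();
}

interface NewProcessor<KIn, VIn, KOut, VOut> {
    void process(KIn key, VIn value);
}

interface NewSupplier<KIn, VIn, KOut, VOut> {
    NewProcessor<KIn, VIn, KOut, VOut> get();
}

final class DelegationSketch {
    private final List<String> names = new ArrayList<>();

    // Wraps an old-style supplier so that every processor it creates is adapted.
    static <K, V> NewSupplier<K, V, Object, Object> adapt(final OldSupplier<K, V> supplier) {
        return () -> {
            final OldProcessor<K, V> delegate = supplier.get();
            return delegate::process;
        };
    }

    // Old overload: holds no logic of its own, it only adapts and delegates.
    <K, V> DelegationSketch addProcessor(final String name, final OldSupplier<K, V> supplier) {
        final NewSupplier<K, V, Object, Object> adapted = adapt(supplier);
        return addProcessor(name, adapted);
    }

    // New overload: the single place where a node is registered.
    <KIn, VIn, KOut, VOut> DelegationSketch addProcessor(final String name,
                                                         final NewSupplier<KIn, VIn, KOut, VOut> supplier) {
        names.add(name);
        return this;
    }

    List<String> names() {
        return names;
    }
}
```

Keeping all registration logic in the new overload means the old overload can be deleted later without touching any behavior.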
   

##########
File path: streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
##########
@@ -145,24 +145,24 @@ public void shouldPunctuateIfWallClockTimeAdvances() {
         assertThat(outputTopic.isEmpty(), is(true));
     }
 
-    public static class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
+    public static class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long, String, Long> {

Review comment:
       Since the new public API change is small, I also converted almost all of the usages of the old API to the new one.

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -772,6 +806,94 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param storeBuilder          user defined state store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                           final String sourceName,
+                                                           final Deserializer<KIn> keyDeserializer,
+                                                           final Deserializer<VIn> valueDeserializer,
+                                                           final String topic,
+                                                           final String processorName,
+                                                           final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {

Review comment:
       new API

##########
File path: streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
##########
@@ -180,8 +180,5 @@ private void flushStore() {
                 context.forward(next.key, next.value);
             }
         }
-
-        @Override
-        public void close() {}

Review comment:
       `close()` has a default implementation in the new API, so the empty override is no longer needed.
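
As a small illustration of why the override can go away, the sketch below uses a hypothetical interface (`ClosableProcessor`, invented for this example) whose `close()` is a no-op default method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface shaped like the new API: close() has a no-op default.
interface ClosableProcessor<KIn, VIn> {
    void process(KIn key, VIn value);

    default void close() {
        // nothing to release by default
    }
}

// An implementation that holds no resources can simply omit close().
final class RecordingProcessor implements ClosableProcessor<String, Long> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void process(final String key, final Long value) {
        seen.add(key + "=" + value);
    }
}
```

Processors that do hold resources can still override `close()` as before.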

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
##########
@@ -26,22 +30,57 @@
  * Used by the Join nodes as there are several parameters, this abstraction helps
  * keep the number of arguments more reasonable.
  */
-public class ProcessorParameters<K, V> {
+public class ProcessorParameters<KIn, VIn, KOut, VOut> {
 
-    private final ProcessorSupplier<K, V> processorSupplier;
+    // During the transition to KIP-478, we capture arguments passed from the old API to simplify
+    // the performance of casts that we still need to perform. This will eventually be removed.
+    private final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier;
+    private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier;
     private final String processorName;
 
-    public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier,
+    public ProcessorParameters(final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> processorSupplier,
                                final String processorName) {
+        oldProcessorSupplier = processorSupplier;
+        this.processorSupplier = () -> ProcessorAdapter.adapt(processorSupplier.get());
+        this.processorName = processorName;
+    }
 
+    public ProcessorParameters(final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier,
+                               final String processorName) {
+        oldProcessorSupplier = null;
         this.processorSupplier = processorSupplier;
         this.processorName = processorName;
     }
 
-    public ProcessorSupplier<K, V> processorSupplier() {
+    public ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier() {
         return processorSupplier;
     }
 
+    public org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier() {
+        return oldProcessorSupplier;
+    }
+
+    @SuppressWarnings("unchecked")
+    KTableSource<KIn, VIn> kTableSourceSupplier() {
+        // This cast always works because KTableSource hasn't been converted yet.
+        return oldProcessorSupplier == null
+            ? null
+            : !(oldProcessorSupplier instanceof KTableSource)
+              ? null
+              : (KTableSource<KIn, VIn>) oldProcessorSupplier;
+    }

Review comment:
       This replaces a type check that was previously done elsewhere.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -482,9 +482,9 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset,
         nodeGroups = null;
     }
 
-    public final void addProcessor(final String name,
-                                   final org.apache.kafka.streams.processor.ProcessorSupplier<?, ?> supplier,
-                                   final String... predecessorNames) {
+    public final <KIn, VIn, KOut, VOut> void addProcessor(final String name,
+                                                          final ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
+                                                          final String... predecessorNames) {

Review comment:
       Just converting the internal method, since we've introduced the new API in the public Topology.

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -713,7 +747,7 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
                                                        final Deserializer<V> valueDeserializer,
                                                        final String topic,
                                                        final String processorName,
-                                                       final ProcessorSupplier<K, V> stateUpdateSupplier) {
+                                                       final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) {

Review comment:
       As in the other PRs, I inverted the imports, so the old API is fully qualified now, and the new API is imported.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
##########
@@ -82,14 +81,18 @@
     }
 
     public String queryableStoreName() {
-        return ((KTableKTableJoinMerger<K, VR>) mergeProcessorParameters().processorSupplier()).getQueryableName();
+        return mergeProcessorParameters().kTableKTableJoinMergerProcessorSupplier().getQueryableName();
     }
 
     /**
      * The supplier which provides processor with KTable-KTable join merge functionality.
      */
+    @SuppressWarnings("unchecked")
     public KTableKTableJoinMerger<K, VR> joinMerger() {
-        return (KTableKTableJoinMerger<K, VR>) mergeProcessorParameters().processorSupplier();
+        final KTableKTableJoinMerger<K, Change<VR>> merger =
+            mergeProcessorParameters().kTableKTableJoinMergerProcessorSupplier();
+        // this incorrect cast should be corrected by the end of the KIP-478 implementation

Review comment:
       Specifically, it'll get fixed when `KTableKTableJoinMerger` is converted.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
##########
@@ -70,7 +70,7 @@ private static long findAndVerifyWindowGrace(final StreamsGraphNode streamsGraph
 
     private static Long extractGracePeriod(final StreamsGraphNode node) {
         if (node instanceof StatefulProcessorNode) {
-            final ProcessorSupplier processorSupplier = ((StatefulProcessorNode) node).processorParameters().processorSupplier();
+            final ProcessorSupplier processorSupplier = ((StatefulProcessorNode) node).processorParameters().oldProcessorSupplier();

Review comment:
       You'll see in the ProcessorParameters class that I've captured both the old and new APIs and also internalized a few casts. This is all just an effort to limit the scope of this PR. It'll all come out in the wash once the KIP is completely implemented.

##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -87,12 +88,12 @@ public void shouldNotAllowZeroTopicsWhenAddingSource() {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullNameWhenAddingProcessor() {
-        topology.addProcessor(null, () -> new MockProcessorSupplier<>().get());
+        topology.addProcessor(null, () -> new MockApiProcessorSupplier<>().get());
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullProcessorSupplierWhenAddingProcessor() {
-        topology.addProcessor("name", null);
+        topology.addProcessor("name", (ProcessorSupplier<Object, Object, Object, Object>) null);

Review comment:
       There are a few places where we have to cast `null` in order to resolve the right overload.
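
The ambiguity can be reproduced with two unrelated parameter types. In this sketch, `LegacySupplier` and `TypedSupplier` are hypothetical stand-ins for the old and new supplier interfaces, and the methods only report which overload was selected.

```java
// Hypothetical stand-ins for the old and new supplier interfaces.
interface LegacySupplier<K, V> {
    Object get();
}

interface TypedSupplier<KIn, VIn, KOut, VOut> {
    Object get();
}

final class OverloadSketch {
    static String addProcessor(final String name, final LegacySupplier<?, ?> supplier) {
        return "old overload";
    }

    static String addProcessor(final String name, final TypedSupplier<?, ?, ?, ?> supplier) {
        return "new overload";
    }

    static String viaNewApi() {
        // addProcessor("name", null) would not compile: both overloads accept null
        // and neither parameter type is more specific than the other.
        return addProcessor("name", (TypedSupplier<Object, Object, Object, Object>) null);
    }

    static String viaOldApi() {
        return addProcessor("name", (LegacySupplier<Object, Object>) null);
    }
}
```

The cast has no runtime effect; it only gives the compiler a static type to resolve against.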

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -853,15 +918,56 @@ public void process(final String key, final String value) {
         }
     }
 
-    private <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> processor) {
+    /**
+     * A processor that stores each key-value pair in an in-memory key-value store registered with the context.
+     */
+    protected static class StatefulProcessor implements Processor<String, String, Void, Void> {
+        private KeyValueStore<String, String> store;
+        private final String storeName;
+
+        StatefulProcessor(final String storeName) {
+            this.storeName = storeName;
+        }
+
+        @Override
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(storeName);
+        }
+
+        @Override
+        public void process(final String key, final String value) {
+            store.put(key, value);
+        }
+    }
+
+    private <K, V> org.apache.kafka.streams.processor.ProcessorSupplier<K, V> define(final org.apache.kafka.streams.processor.Processor<K, V> processor) {

Review comment:
       These are just convenience functions for defining ProcessorSuppliers (with and without stores attached). I've duplicated them for the old and new APIs.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
##########
@@ -26,22 +30,57 @@
  * Used by the Join nodes as there are several parameters, this abstraction helps
  * keep the number of arguments more reasonable.
  */
-public class ProcessorParameters<K, V> {
+public class ProcessorParameters<KIn, VIn, KOut, VOut> {
 
-    private final ProcessorSupplier<K, V> processorSupplier;
+    // During the transition to KIP-478, we capture arguments passed from the old API to simplify
+    // the performance of casts that we still need to perform. This will eventually be removed.
+    private final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier;
+    private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier;
     private final String processorName;
 
-    public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier,
+    public ProcessorParameters(final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> processorSupplier,
                                final String processorName) {
+        oldProcessorSupplier = processorSupplier;
+        this.processorSupplier = () -> ProcessorAdapter.adapt(processorSupplier.get());
+        this.processorName = processorName;
+    }

Review comment:
       The constructor for the old API is still present, and when you call it, we save a direct reference as well as adapting it to the new API. Saving a direct reference dramatically simplifies the casts we've internalized below. Once everything is converted, we'll go back to just one reference saved.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -851,12 +851,12 @@ boolean sendingOldValueEnabled() {
     }
 
     /**
-     * We conflate V with Change<V> in many places. It might be nice to fix that eventually.
+     * We conflate V with Change<V> in many places. This will get fixed in the implementation of KIP-478.

Review comment:
       Prototyping this KIP was actually how I discovered this conflation to begin with. Once we start converting Processors to the new API, the compiler will make this conflation impossible, and this method will eventually be unused.
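
A rough sketch of how typed outputs can rule out this kind of conflation at compile time. `ChangeSketch` and `TypedContext` are hypothetical types written for this example; they only mimic the shape of the internal `Change` wrapper and of a context parameterized by its output types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the internal Change wrapper (new and old value).
final class ChangeSketch<T> {
    final T newValue;
    final T oldValue;

    ChangeSketch(final T newValue, final T oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }
}

// Hypothetical context typed by what the processor is allowed to forward.
final class TypedContext<KOut, VOut> {
    final List<VOut> forwarded = new ArrayList<>();

    void forward(final KOut key, final VOut value) {
        forwarded.add(value);
    }
}

final class ConflationSketch {
    static ChangeSketch<String> forwardChange() {
        final TypedContext<String, ChangeSketch<String>> context = new TypedContext<>();

        // context.forward("k", "v");  // does not compile: String is not ChangeSketch<String>
        context.forward("k", new ChangeSketch<>("v", null));
        return context.forwarded.get(0);
    }
}
```

With an untyped context both calls would compile, which is how a plain value and its wrapped form can end up being treated as the same thing.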

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -735,58 +766,92 @@ private Topology createAddHeaderTopology() {
     /**
      * A processor that simply forwards all messages to all children.
      */
-    protected static class ForwardingProcessor extends AbstractProcessor<String, String> {
+    protected static class ForwardingProcessor implements Processor<String, String, String, String> {

Review comment:
       Converted to the new API.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
##########
@@ -390,7 +389,20 @@ private Topology setupGlobalStoreTopology(final String... sourceTopicNames) {
                 null,
                 sourceTopicName,
                 sourceTopicName + "-processor",
-                new MockProcessorSupplier()
+                () -> new Processor<Object, Object, Void, Void>() {
+                    KeyValueStore<Object, Object> store;
+
+                    @SuppressWarnings("unchecked")
+                    @Override
+                    public void init(final ProcessorContext<Void, Void> context) {
+                        store = context.getStateStore(sourceTopicName + "-globalStore");
+                    }
+
+                    @Override
+                    public void process(final Object key, final Object value) {
+                        store.put(key, value);
+                    }
+                }

Review comment:
       Shockingly, the old test wasn't testing what it was supposed to test.
   
   It's supposed to check and make sure the processor populates the global store, but it was actually just checking whether the mock captured the processed records. I changed the processor to actually do what it's supposed to, and then changed the assertion below to check what this test is really supposed to be checking.

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -658,8 +658,42 @@ public synchronized Topology addSink(final String name,
      */
     @SuppressWarnings("rawtypes")
     public synchronized Topology addProcessor(final String name,
-                                              final ProcessorSupplier supplier,
+                                              final org.apache.kafka.streams.processor.ProcessorSupplier supplier,
                                               final String... parentNames) {
+        return addProcessor(
+            name,
+            new ProcessorSupplier<Object, Object, Object, Object>() {
+                @Override
+                public Set<StoreBuilder<?>> stores() {
+                    return supplier.stores();
+                }
+
+                @Override
+                public org.apache.kafka.streams.processor.api.Processor<Object, Object, Object, Object> get() {
+                    return ProcessorAdapter.adaptRaw(supplier.get());
+                }
+            },
+            parentNames
+        );
+    }
+
+    /**
+     * Add a new processor node that receives and processes records output by one or more parent source or processor
+     * node.
+     * Any new record output by this processor will be forwarded to its child processor or sink nodes.
+     * If {@code supplier} provides stores via {@link ConnectedStoreProvider#stores()}, the provided {@link StoreBuilder}s
+     * will be added to the topology and connected to this processor automatically.
+     *
+     * @param name the unique name of the processor node
+     * @param supplier the supplier used to obtain this node's {@link Processor} instance
+     * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
+     * and process
+     * @return itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     */
+    public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(final String name,
+                                                                     final ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
+                                                                     final String... parentNames) {

Review comment:
       new API

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -127,7 +127,7 @@ public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuil
             .orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME);
 
         final KTableSource<K, V> tableSource = new KTableSource<>(materialized.storeName(), materialized.queryableStoreName());
-        final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(tableSource, tableSourceName);
+        final ProcessorParameters<K, V, ?, ?> processorParameters = new ProcessorParameters<>(tableSource, tableSourceName);

Review comment:
       I've also converted ProcessorParameters to the new API, so you'll see a lot of changes like this. Many of them will eventually go away as individual processors are converted (such as, in this case, `KTableSource`).

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
##########
@@ -99,5 +99,14 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
             }
         }
 
+        // temporary hack until KIP-478 is fully implemented

Review comment:
       This is kind of horrible, but it'll be gone soon enough.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
##########
@@ -82,14 +81,18 @@
     }
 
     public String queryableStoreName() {
-        return ((KTableKTableJoinMerger<K, VR>) mergeProcessorParameters().processorSupplier()).getQueryableName();
+        return mergeProcessorParameters().kTableKTableJoinMergerProcessorSupplier().getQueryableName();

Review comment:
       An example of an internalized cast.

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -772,6 +806,94 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param storeBuilder          user defined state store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                           final String sourceName,
+                                                           final Deserializer<KIn> keyDeserializer,
+                                                           final Deserializer<VIn> valueDeserializer,
+                                                           final String topic,
+                                                           final String processorName,
+                                                           final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+        internalTopologyBuilder.addGlobalStore(
+            storeBuilder,
+            sourceName,
+            null,
+            keyDeserializer,
+            valueDeserializer,
+            topic,
+            processorName,
+            stateUpdateSupplier
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for this source,
+     *                              if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                           final String sourceName,
+                                                           final TimestampExtractor timestampExtractor,
+                                                           final Deserializer<KIn> keyDeserializer,
+                                                           final Deserializer<VIn> valueDeserializer,
+                                                           final String topic,
+                                                           final String processorName,
+                                                           final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {

Review comment:
       new API

##########
File path: streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
##########
@@ -37,8 +38,8 @@
     /**
      * A connected sub-graph of a {@link Topology}.
      * <p>
-     * Nodes of a {@code Subtopology} are connected {@link Topology#addProcessor(String,
-     * org.apache.kafka.streams.processor.ProcessorSupplier, String...) directly} or indirectly via
+     * Nodes of a {@code Subtopology} are connected
+     * {@link Topology#addProcessor(String, ProcessorSupplier, String...) directly} or indirectly via

Review comment:
       This now references the new API.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
##########
@@ -31,6 +31,15 @@
         }
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static <KIn, VIn, KOut, VOut> Processor<KIn, VIn, KOut, VOut> adaptRaw(final org.apache.kafka.streams.processor.Processor delegate) {

Review comment:
       A minor convenience adapter to avoid a `rawtypes` warning at the call site.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
##########
@@ -26,22 +30,57 @@
  * Used by the Join nodes as there are several parameters, this abstraction helps
  * keep the number of arguments more reasonable.
  */
-public class ProcessorParameters<K, V> {
+public class ProcessorParameters<KIn, VIn, KOut, VOut> {

Review comment:
       Converted to the new API.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
##########
@@ -67,8 +66,8 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
             topologyBuilder.connectProcessorAndStateStores(processorName, storeNames);
         }
 
-        if (processorParameters.processorSupplier() instanceof KTableSource) {
-            if (((KTableSource<?, ?>) processorParameters.processorSupplier()).materialized()) {
+        if (processorParameters.kTableSourceSupplier() != null) {
+            if (processorParameters.kTableSourceSupplier().materialized()) {

Review comment:
       This is the type check that I internalized. Note, it becomes more complicated now that we also have to check whether or not the supplier is an "old API" supplier.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -159,10 +159,11 @@ Cancellable schedule(final long intervalMs,
      * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#WALL_CLOCK_TIME}
      * @param callback a function consuming timestamps representing the current stream or system time
      * @return a handle allowing cancellation of the punctuation schedule established by this method
+     * @throws IllegalArgumentException if the interval is not representable in milliseconds
      */
     Cancellable schedule(final Duration interval,
                          final PunctuationType type,
-                         final Punctuator callback) throws IllegalArgumentException;
+                         final Punctuator callback);

Review comment:
       This was a minor API design error. The intent was to document that we could throw the exception, but declaring a runtime exception in the method header doesn't do anything. The right way to do it is to put it in the javadoc.
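
   To illustrate the point about unchecked exceptions, here is a minimal sketch. `Scheduler` and `ThrowsClauseDemo` are hypothetical stand-ins, not the Kafka interfaces. The exception is documented only in the javadoc, callers compile without any `try`/`catch`, and the exception still propagates at runtime.

```java
import java.time.Duration;

// Hypothetical stand-in for a scheduling API; not the Kafka ProcessorContext.
interface Scheduler {
    /**
     * @throws IllegalArgumentException if the interval is not representable in milliseconds
     */
    long schedule(Duration interval); // no throws clause needed for an unchecked exception
}

class ThrowsClauseDemo {
    static final Scheduler SCHEDULER = interval -> {
        try {
            return interval.toMillis();
        } catch (final ArithmeticException e) {
            // Duration#toMillis overflows for very large durations.
            throw new IllegalArgumentException("interval is not representable in milliseconds", e);
        }
    };

    // Compiles without try/catch: the compiler never forces handling of unchecked exceptions.
    static long scheduleOneSecond() {
        return SCHEDULER.schedule(Duration.ofSeconds(1));
    }

    // The documented exception is still thrown at runtime.
    static boolean rejectsHugeInterval() {
        try {
            SCHEDULER.schedule(Duration.ofSeconds(Long.MAX_VALUE));
            return false;
        } catch (final IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(scheduleOneSecond());
        System.out.println(rejectsHugeInterval());
    }
}
```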

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -102,7 +102,7 @@ public void shouldAddGlobalStore() {
                 @SuppressWarnings("unchecked")
                 @Override
                 public void init(final ProcessorContext<Void, Void> context) {
-                    store = (KeyValueStore<String, String>) context.getStateStore("store");
+                    store = context.getStateStore("store");

Review comment:
       Another example of how we don't need to cast anymore.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -102,7 +102,7 @@ void register(final StateStore store,
      * @param name The store name
      * @return The state store instance
      */
-    StateStore getStateStore(final String name);
+    <S extends StateStore> S getStateStore(final String name);

Review comment:
       This is the small extension to the KIP that I mentioned: callers no longer have to cast the result to the store interface that they need (e.g., `KeyValueStore`).
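
   A rough sketch of why the generic return type removes the cast. `Store`, `CounterStore`, and `Context` are hypothetical stand-ins, not the Kafka types. The type parameter is inferred from the assignment target, so the cast moves out of user code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for StateStore, a concrete store type, and the context.
interface Store {
    String name();
}

class CounterStore implements Store {
    private final String name;
    int count;

    CounterStore(final String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }
}

class Context {
    private final Map<String, Store> stores = new HashMap<>();

    void register(final Store store) {
        stores.put(store.name(), store);
    }

    // Old shape: returns the base type, so every caller has to cast.
    Store getStoreUntyped(final String name) {
        return stores.get(name);
    }

    // New shape: S is inferred from the assignment target at the call site.
    @SuppressWarnings("unchecked")
    <S extends Store> S getStore(final String name) {
        return (S) stores.get(name);
    }
}

class GenericReturnDemo {
    static int incrementTwice() {
        final Context context = new Context();
        context.register(new CounterStore("counts"));

        final CounterStore viaCast = (CounterStore) context.getStoreUntyped("counts");
        viaCast.count++;

        final CounterStore inferred = context.getStore("counts"); // no cast in user code
        inferred.count++;

        return inferred.count;
    }

    public static void main(final String[] args) {
        System.out.println(incrementTwice());
    }
}
```

   The cast inside `getStore` is unchecked, so asking for the wrong store type still fails with a `ClassCastException`, just at the assignment rather than at an explicit cast.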

##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -87,12 +88,12 @@ public void shouldNotAllowZeroTopicsWhenAddingSource() {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullNameWhenAddingProcessor() {
-        topology.addProcessor(null, () -> new MockProcessorSupplier<>().get());
+        topology.addProcessor(null, () -> new MockApiProcessorSupplier<>().get());

Review comment:
       I've converted most of the tests to the new API, and just left behind a couple to make sure the delegation works properly.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -856,11 +857,18 @@ public void statelessTopologyShouldNotCreateStateDirectory() throws Exception {
         final String outputTopic = safeTestName + "-output";
         final Topology topology = new Topology();
         topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic)
-                .addProcessor("process", () -> new AbstractProcessor<String, String>() {
+                .addProcessor("process", () -> new Processor<String, String, String, String>() {

Review comment:
       Converting to the new API. I didn't want to convert over the AbstractProcessor, since that would drag in more changes. I also didn't introduce a new abstract class, since the only thing it does is capture the context.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -293,6 +293,37 @@ public void testDrivingConnectedStateStoreTopology() {
         assertNull(store.get("key4"));
     }
 
+    @Test
+    public void testDrivingConnectedStateStoreInDifferentProcessorsTopologyWithOldAPI() {

Review comment:
       Here's an example of a test I left in place to exercise the delegation mechanism (I renamed the "old API" version of the test).

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##########
@@ -1196,33 +1195,25 @@ public void shouldReinitializeRevivedTasksInAnyState() {
         internalTopologyBuilder.addSource(null, "name", null, null, null, topic1);
         final AtomicBoolean shouldThrow = new AtomicBoolean(false);
         final AtomicBoolean processed = new AtomicBoolean(false);
-        internalTopologyBuilder.addProcessor("proc", new ProcessorSupplier<Object, Object>() {
-            @Override
-            public Processor<Object, Object> get() {
-                return new Processor<Object, Object>() {
-                    private ProcessorContext context;
-
-                    @Override
-                    public void init(final ProcessorContext context) {
-                        this.context = context;
-                    }
+        internalTopologyBuilder.addProcessor(
+            "proc",
+            () -> new Processor<Object, Object, Object, Object>() {

Review comment:
       Converted to the new API and cleaned up.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
##########
@@ -996,7 +1003,7 @@ public void shouldPunctuateOnWallClockTime() {
     @Test
     public void shouldReturnAllStores() {
         final Topology topology = setupSourceSinkTopology();
-        topology.addProcessor("processor", () -> null, "source");
+        topology.addProcessor("processor", (ProcessorSupplier<Object, Object, Object, Object>) () -> null, "source");

Review comment:
       Just casting the `() -> null` lambda to resolve the right overload.
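
   A minimal sketch of the overload situation, using hypothetical interfaces (`LegacySupplier`, `ApiSupplier`) in place of the old and new `ProcessorSupplier`. When two overloads each accept a functional interface with a no-argument method, a bare `() -> null` matches both, and a cast picks the target type.

```java
// Hypothetical stand-ins for the old and new processor and supplier types.
interface LegacyProcessor { }

interface ApiProcessor { }

interface LegacySupplier {
    LegacyProcessor get();
}

interface ApiSupplier {
    ApiProcessor get();
}

class OverloadDemo {
    static String addProcessor(final String name, final LegacySupplier supplier) {
        return "old";
    }

    static String addProcessor(final String name, final ApiSupplier supplier) {
        return "new";
    }

    static String resolveWithCast() {
        // addProcessor("processor", () -> null);
        // The line above would not compile: the lambda is compatible with both
        // functional interfaces, so the call is ambiguous.
        return addProcessor("processor", (ApiSupplier) () -> null);
    }

    public static void main(final String[] args) {
        System.out.println(resolveWithCast());
    }
}
```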







[GitHub] [kafka] vvcephei commented on a change in pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r485732127



##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -772,6 +806,94 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param storeBuilder          user defined state store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                           final String sourceName,
+                                                           final Deserializer<KIn> keyDeserializer,
+                                                           final Deserializer<VIn> valueDeserializer,
+                                                           final String topic,
+                                                           final String processorName,
+                                                           final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+        internalTopologyBuilder.addGlobalStore(
+            storeBuilder,
+            sourceName,
+            null,
+            keyDeserializer,
+            valueDeserializer,
+            topic,
+            processorName,
+            stateUpdateSupplier
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for this source,
+     *                              if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                           final String sourceName,
+                                                           final TimestampExtractor timestampExtractor,
+                                                           final Deserializer<KIn> keyDeserializer,
+                                                           final Deserializer<VIn> valueDeserializer,
+                                                           final String topic,
+                                                           final String processorName,
+                                                           final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {

Review comment:
       Yeah, this probably would have been a better design. I'm a little hesitant to make this change to the KIP right now, though. Subjectively, it seems more lightweight for users if they don't have to change much of their code to switch over to the new API. Also, maybe I have a little bit of emotional resistance to increasing the scope of this KIP because it's been taking so long to actually make progress on it.
   
   I've filed https://issues.apache.org/jira/browse/KAFKA-10472 to capture the thought, though.







[GitHub] [kafka] vvcephei commented on pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#issuecomment-689669494


   Thanks for the review @bbejeck ! I really appreciate your time.
   
   I've responded to your feedback.





[GitHub] [kafka] vvcephei commented on a change in pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r485735152



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
##########
@@ -26,22 +30,57 @@
  * Used by the Join nodes as there are several parameters, this abstraction helps
  * keep the number of arguments more reasonable.
  */
-public class ProcessorParameters<K, V> {
+public class ProcessorParameters<KIn, VIn, KOut, VOut> {
 
-    private final ProcessorSupplier<K, V> processorSupplier;
+    // During the transition to KIP-478, we capture arguments passed from the old API to simplify
+    // the performance of casts that we still need to perform. This will eventually be removed.
+    private final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier;
+    private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier;
     private final String processorName;
 
-    public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier,
+    public ProcessorParameters(final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> processorSupplier,
                                final String processorName) {
+        oldProcessorSupplier = processorSupplier;
+        this.processorSupplier = () -> ProcessorAdapter.adapt(processorSupplier.get());
+        this.processorName = processorName;
+    }
 
+    public ProcessorParameters(final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier,
+                               final String processorName) {
+        oldProcessorSupplier = null;
         this.processorSupplier = processorSupplier;
         this.processorName = processorName;
     }
 
-    public ProcessorSupplier<K, V> processorSupplier() {
+    public ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier() {
         return processorSupplier;
     }
 
+    public org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier() {
+        return oldProcessorSupplier;
+    }
+
+    @SuppressWarnings("unchecked")
+    KTableSource<KIn, VIn> kTableSourceSupplier() {
+        // This cast always works because KTableSource hasn't been converted yet.
+        return oldProcessorSupplier == null
+            ? null
+            : !(oldProcessorSupplier instanceof KTableSource)
+              ? null
+              : (KTableSource<KIn, VIn>) oldProcessorSupplier;
+    }

Review comment:
       Thanks; yes, let's revisit it after the dust settles from KIP-478. These methods are for the most part temporary, since it's a real pain to do the cast when you have to deal with the current "dual interface" state in which processors might be old-style or new-style.
   
   I have a feeling I'll be able to eliminate these methods completely when I convert the relevant processors to the new API again.







[GitHub] [kafka] vvcephei commented on pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#issuecomment-685985893


   Note: this is part 3 of KIP-478.
   Part 1: #9004
   Part 2: #9148





[GitHub] [kafka] bbejeck commented on a change in pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

Posted by GitBox <gi...@apache.org>.
bbejeck commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r485754938



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
##########
@@ -99,5 +99,14 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
             }
         }
 
+        // temporary hack until KIP-478 is fully implemented
+        final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> oldProcessorSupplier =
+            processorParameters().oldProcessorSupplier();
+        if (oldProcessorSupplier != null && oldProcessorSupplier.stores() != null) {
+            for (final StoreBuilder<?> storeBuilder : oldProcessorSupplier.stores()) {
+                topologyBuilder.addStateStore(storeBuilder, processorName);
+            }
+        }
+

Review comment:
       That makes sense to me, thanks for the explanation.







[GitHub] [kafka] bbejeck commented on a change in pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

Posted by GitBox <gi...@apache.org>.
bbejeck commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r485691111



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
##########
@@ -99,5 +99,14 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
             }
         }
 
+        // temporary hack until KIP-478 is fully implemented
+        final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> oldProcessorSupplier =
+            processorParameters().oldProcessorSupplier();
+        if (oldProcessorSupplier != null && oldProcessorSupplier.stores() != null) {
+            for (final StoreBuilder<?> storeBuilder : oldProcessorSupplier.stores()) {
+                topologyBuilder.addStateStore(storeBuilder, processorName);
+            }
+        }
+

Review comment:
       Could this cause a problem with lines 96-99 above? I could be missing something, but it looks like we could be attempting to add the same stores twice, which _**I think**_ will result in a runtime error when building the topology.

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -772,6 +806,94 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param storeBuilder          user defined state store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                           final String sourceName,
+                                                           final Deserializer<KIn> keyDeserializer,
+                                                           final Deserializer<VIn> valueDeserializer,
+                                                           final String topic,
+                                                           final String processorName,
+                                                           final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+        internalTopologyBuilder.addGlobalStore(
+            storeBuilder,
+            sourceName,
+            null,
+            keyDeserializer,
+            valueDeserializer,
+            topic,
+            processorName,
+            stateUpdateSupplier
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for this source,
+     *                              if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                           final String sourceName,
+                                                           final TimestampExtractor timestampExtractor,
+                                                           final Deserializer<KIn> keyDeserializer,
+                                                           final Deserializer<VIn> valueDeserializer,
+                                                           final String topic,
+                                                           final String processorName,
+                                                           final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {

Review comment:
       For consideration: maybe provide a builder to use as the parameter.
   Something like 
   ```java
   GlobalStoreBuilder.builder().addSource().addTimestampextractor().addTopic()...
   ```
   Just a thought: whenever I see a long parameter list, I tend to look for a way to use a builder instead.
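
   A slightly fuller sketch of what such a parameter object could look like. Everything here is hypothetical: no `GlobalStoreParams` class exists in Kafka Streams, and the fields are reduced to plain strings to keep the example self-contained.

```java
import java.util.Objects;

// Hypothetical parameter object; not part of the Kafka Streams API.
final class GlobalStoreParams {
    final String sourceName;
    final String topic;
    final String processorName;

    private GlobalStoreParams(final Builder builder) {
        // Fail fast when a required field was never set.
        this.sourceName = Objects.requireNonNull(builder.sourceName, "sourceName");
        this.topic = Objects.requireNonNull(builder.topic, "topic");
        this.processorName = Objects.requireNonNull(builder.processorName, "processorName");
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String sourceName;
        private String topic;
        private String processorName;

        Builder withSourceName(final String sourceName) {
            this.sourceName = sourceName;
            return this;
        }

        Builder withTopic(final String topic) {
            this.topic = topic;
            return this;
        }

        Builder withProcessorName(final String processorName) {
            this.processorName = processorName;
            return this;
        }

        GlobalStoreParams build() {
            return new GlobalStoreParams(this);
        }
    }

    public static void main(final String[] args) {
        final GlobalStoreParams params = GlobalStoreParams.builder()
            .withSourceName("source")
            .withTopic("topic")
            .withProcessorName("processor")
            .build();
        System.out.println(params.topic);
    }
}
```

   The trade-off is that named setters make call sites easier to read, while required arguments are checked at `build()` time instead of at compile time.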

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
##########
@@ -26,22 +30,57 @@
  * Used by the Join nodes as there are several parameters, this abstraction helps
  * keep the number of arguments more reasonable.
  */
-public class ProcessorParameters<K, V> {
+public class ProcessorParameters<KIn, VIn, KOut, VOut> {
 
-    private final ProcessorSupplier<K, V> processorSupplier;
+    // During the transition to KIP-478, we capture arguments passed from the old API to simplify
+    // the performance of casts that we still need to perform. This will eventually be removed.
+    private final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier;
+    private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier;
     private final String processorName;
 
-    public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier,
+    public ProcessorParameters(final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> processorSupplier,
                                final String processorName) {
+        oldProcessorSupplier = processorSupplier;
+        this.processorSupplier = () -> ProcessorAdapter.adapt(processorSupplier.get());
+        this.processorName = processorName;
+    }
 
+    public ProcessorParameters(final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier,
+                               final String processorName) {
+        oldProcessorSupplier = null;
         this.processorSupplier = processorSupplier;
         this.processorName = processorName;
     }
 
-    public ProcessorSupplier<K, V> processorSupplier() {
+    public ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier() {
         return processorSupplier;
     }
 
+    public org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier() {
+        return oldProcessorSupplier;
+    }
+
+    @SuppressWarnings("unchecked")
+    KTableSource<KIn, VIn> kTableSourceSupplier() {
+        // This cast always works because KTableSource hasn't been converted yet.
+        return oldProcessorSupplier == null
+            ? null
+            : !(oldProcessorSupplier instanceof KTableSource)
+              ? null
+              : (KTableSource<KIn, VIn>) oldProcessorSupplier;
+    }

Review comment:
       Just a minor thought here. I'm wondering if these `ktableX` methods should go in a separate class; I feel like this "leaks" a little bit. But I don't have a better idea ATM, so maybe we can revisit later.







[GitHub] [kafka] vvcephei commented on pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#issuecomment-689703973


   Thanks, @bbejeck ! Just to be sure, I'm merging trunk into this branch and running all the tests before I squash-and-merge.





[GitHub] [kafka] vvcephei commented on a change in pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r485742765



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
##########
@@ -99,5 +99,14 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
             }
         }
 
+        // temporary hack until KIP-478 is fully implemented
+        final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> oldProcessorSupplier =
+            processorParameters().oldProcessorSupplier();
+        if (oldProcessorSupplier != null && oldProcessorSupplier.stores() != null) {
+            for (final StoreBuilder<?> storeBuilder : oldProcessorSupplier.stores()) {
+                topologyBuilder.addStateStore(storeBuilder, processorName);
+            }
+        }
+

Review comment:
       It definitely looks that way, but I've just double-checked, and I think it's safe. The thing is that only an implementation of ProcessorSupplier (either the new or old one) could override the ConnectedStoreProvider#stores method. Unlike Processor and ProcessorContext, we haven't added adapters for ProcessorSupplier that could delegate the `stores` method from the new API to the old one, so only a direct implementation of the new API ProcessorSupplier could return a non-null result from `processorSupplier.stores()` on L96. Likewise, `oldProcessorSupplier` is only non-null when the provided supplier is _only_ an old-API ProcessorSupplier.
   
   So, it seems like either L96-99 will add stores or L102-109 will (or neither), but never both. Does that reasoning seem legit to you?
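
   The reasoning can be sketched with a simplified model. All names here (`OldStyleSupplier`, `NewStyleSupplier`, `Params`) are hypothetical stand-ins for the real classes. The lambda that wraps an old supplier forwards only `get()`, so its `stores()` keeps the default `null`, and exactly one of the two registration paths fires.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins; both expose a default stores() that returns null.
interface OldStyleSupplier {
    Object get();

    default Set<String> stores() {
        return null;
    }
}

interface NewStyleSupplier {
    Object get();

    default Set<String> stores() {
        return null;
    }
}

final class Params {
    final OldStyleSupplier oldSupplier; // non-null only when built from the old API
    final NewStyleSupplier supplier;

    private Params(final OldStyleSupplier oldSupplier, final NewStyleSupplier supplier) {
        this.oldSupplier = oldSupplier;
        this.supplier = supplier;
    }

    static Params fromOld(final OldStyleSupplier old) {
        // The wrapper forwards get() only; its stores() stays the default null.
        return new Params(old, () -> old.get());
    }

    static Params fromNew(final NewStyleSupplier supplier) {
        return new Params(null, supplier);
    }
}

final class StoreRegistrationDemo {
    static List<String> register(final Params params) {
        final List<String> added = new ArrayList<>();
        if (params.supplier.stores() != null) {
            added.addAll(params.supplier.stores()); // path for new-API suppliers
        }
        if (params.oldSupplier != null && params.oldSupplier.stores() != null) {
            added.addAll(params.oldSupplier.stores()); // path for old-API suppliers
        }
        return added;
    }

    static OldStyleSupplier oldWithStore() {
        return new OldStyleSupplier() {
            @Override
            public Object get() {
                return new Object();
            }

            @Override
            public Set<String> stores() {
                return Collections.singleton("store");
            }
        };
    }

    static NewStyleSupplier newWithStore() {
        return new NewStyleSupplier() {
            @Override
            public Object get() {
                return new Object();
            }

            @Override
            public Set<String> stores() {
                return Collections.singleton("store");
            }
        };
    }

    public static void main(final String[] args) {
        System.out.println(register(Params.fromOld(oldWithStore())));
        System.out.println(register(Params.fromNew(newWithStore())));
    }
}
```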



