You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/09 15:22:17 UTC

[GitHub] [kafka] bbejeck commented on a change in pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

bbejeck commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r485691111



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
##########
@@ -99,5 +99,14 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
             }
         }
 
+        // temporary hack until KIP-478 is fully implemented
+        final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> oldProcessorSupplier =
+            processorParameters().oldProcessorSupplier();
+        if (oldProcessorSupplier != null && oldProcessorSupplier.stores() != null) {
+            for (final StoreBuilder<?> storeBuilder : oldProcessorSupplier.stores()) {
+                topologyBuilder.addStateStore(storeBuilder, processorName);
+            }
+        }
+

Review comment:
       Could this cause a problem with lines 96-99 above? I could be missing something, but it looks like we could be attempting to add the same stores twice, which _**I think**_  will result in a runtime error building the topology.

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -772,6 +806,94 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param storeBuilder          user defined state store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                           final String sourceName,
+                                                           final Deserializer<KIn> keyDeserializer,
+                                                           final Deserializer<VIn> valueDeserializer,
+                                                           final String topic,
+                                                           final String processorName,
+                                                           final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+        internalTopologyBuilder.addGlobalStore(
+            storeBuilder,
+            sourceName,
+            null,
+            keyDeserializer,
+            valueDeserializer,
+            topic,
+            processorName,
+            stateUpdateSupplier
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for this source,
+     *                              if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                           final String sourceName,
+                                                           final TimestampExtractor timestampExtractor,
+                                                           final Deserializer<KIn> keyDeserializer,
+                                                           final Deserializer<VIn> valueDeserializer,
+                                                           final String topic,
+                                                           final String processorName,
+                                                           final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {

Review comment:
       For consideration - maybe provide a builder for use as parameter.  
   Something like 
   ```java
   GlobalStoreBuilder.builder().addSource().addTimestampextractor().addTopic()...
   ```
   Just a thought, whenever I see long parameter lists I tend to look if there's a way to include a builder instead

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
##########
@@ -26,22 +30,57 @@
  * Used by the Join nodes as there are several parameters, this abstraction helps
  * keep the number of arguments more reasonable.
  */
-public class ProcessorParameters<K, V> {
+public class ProcessorParameters<KIn, VIn, KOut, VOut> {
 
-    private final ProcessorSupplier<K, V> processorSupplier;
+    // During the transition to KIP-478, we capture arguments passed from the old API to simplify
+    // the performance of casts that we still need to perform. This will eventually be removed.
+    private final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier;
+    private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier;
     private final String processorName;
 
-    public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier,
+    public ProcessorParameters(final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> processorSupplier,
                                final String processorName) {
+        oldProcessorSupplier = processorSupplier;
+        this.processorSupplier = () -> ProcessorAdapter.adapt(processorSupplier.get());
+        this.processorName = processorName;
+    }
 
+    public ProcessorParameters(final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier,
+                               final String processorName) {
+        oldProcessorSupplier = null;
         this.processorSupplier = processorSupplier;
         this.processorName = processorName;
     }
 
-    public ProcessorSupplier<K, V> processorSupplier() {
+    public ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier() {
         return processorSupplier;
     }
 
+    public org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier() {
+        return oldProcessorSupplier;
+    }
+
+    @SuppressWarnings("unchecked")
+    KTableSource<KIn, VIn> kTableSourceSupplier() {
+        // This cast always works because KTableSource hasn't been converted yet.
+        return oldProcessorSupplier == null
+            ? null
+            : !(oldProcessorSupplier instanceof KTableSource)
+              ? null
+              : (KTableSource<KIn, VIn>) oldProcessorSupplier;
+    }

Review comment:
       Just a minor thought here.  I'm wondering if these `ktableX` methods should go in a separate class, I feel like this is "leaks" a little bit. But I don't have a better idea ATM, so maybe we can revisit later.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org