You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/10/05 16:45:37 UTC

[kafka] branch 2.1 updated: MINOR: Clarify usage of stateful processor node (#5740)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new a682cf4  MINOR: Clarify usage of stateful processor node (#5740)
a682cf4 is described below

commit a682cf42a3307e9da2ed286f2eb7045dc6031b14
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Oct 5 11:44:49 2018 -0500

    MINOR: Clarify usage of stateful processor node (#5740)
    
    In recent PRs, we have been confused about the proper usage of
    StatefulProcessorNode (#5731 , #5737 )
    
    This change disambiguates it.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../internals/GroupedStreamAggregateBuilder.java   | 29 ++++----
 .../kstream/internals/KGroupedTableImpl.java       | 23 ++----
 .../streams/kstream/internals/KStreamImpl.java     |  5 +-
 .../streams/kstream/internals/KTableImpl.java      |  1 -
 .../internals/graph/StatefulProcessorNode.java     | 83 +++++++---------------
 .../internals/graph/GraphGraceSearchUtilTest.java  | 26 +++----
 6 files changed, 55 insertions(+), 112 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 8e6f990..d410bce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -64,12 +64,12 @@ class GroupedStreamAggregateBuilder<K, V> {
         this.userName = groupedInternal.name();
     }
 
-    <KR, T> KTable<KR, T> build(final String functionName,
-                                final StoreBuilder<? extends StateStore> storeBuilder,
-                                final KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier,
-                                final boolean isQueryable,
-                                final Serde<KR> keySerde,
-                                final Serde<T> valSerde) {
+    <KR, VR> KTable<KR, VR> build(final String functionName,
+                                  final StoreBuilder<? extends StateStore> storeBuilder,
+                                  final KStreamAggProcessorSupplier<K, KR, V, VR> aggregateSupplier,
+                                  final boolean isQueryable,
+                                  final Serde<KR> keySerde,
+                                  final Serde<VR> valSerde) {
 
         final String aggFunctionName = builder.newProcessorName(functionName);
 
@@ -84,17 +84,14 @@ class GroupedStreamAggregateBuilder<K, V> {
             builder.addGraphNode(parentNode, repartitionNode);
             parentNode = repartitionNode;
         }
-        final StatefulProcessorNode.StatefulProcessorNodeBuilder<K, V> statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder();
 
-        final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName);
-
-        statefulProcessorNodeBuilder
-            .withProcessorParameters(processorParameters)
-            .withNodeName(aggFunctionName)
-            .withRepartitionRequired(repartitionRequired)
-            .withStoreBuilder(storeBuilder);
-
-        final StatefulProcessorNode<K, V> statefulProcessorNode = statefulProcessorNodeBuilder.build();
+        final StatefulProcessorNode<K, V> statefulProcessorNode =
+            new StatefulProcessorNode<>(
+                aggFunctionName,
+                new ProcessorParameters<>(aggregateSupplier, aggFunctionName),
+                storeBuilder,
+                repartitionRequired
+            );
 
         builder.addGraphNode(parentNode, statefulProcessorNode);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 013028d..29a52b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -79,9 +79,12 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
         // the passed in StreamsGraphNode must be the parent of the repartition node
         builder.addGraphNode(this.streamsGraphNode, repartitionNode);
 
-        final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode(materialized,
-                                                                                     funcName,
-                                                                                     aggregateSupplier);
+        final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>(
+            funcName,
+            new ProcessorParameters<>(aggregateSupplier, funcName),
+            new KeyValueStoreMaterializer<>(materialized).materialize(),
+            false
+        );
 
         // now the repartition node must be the parent of the StateProcessorNode
         builder.addGraphNode(repartitionNode, statefulProcessorNode);
@@ -98,20 +101,6 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
                                 builder);
     }
 
-    private <T> StatefulProcessorNode<K, Change<V>> getStatefulProcessorNode(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized,
-                                                                             final String functionName,
-                                                                             final ProcessorSupplier<K, Change<V>> aggregateSupplier) {
-
-        final ProcessorParameters<K, Change<V>> aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, functionName);
-
-        return StatefulProcessorNode
-            .<K, Change<V>>statefulProcessorNodeBuilder()
-            .withNodeName(functionName)
-            .withProcessorParameters(aggregateFunctionProcessorParams)
-            .withStoreBuilder(new KeyValueStoreMaterializer<>(materialized).materialize())
-            .build();
-    }
-
     private GroupedTableOperationRepartitionNode<K, V> createRepartitionNode(final String sinkName,
                                                                              final String sourceName,
                                                                              final String topic) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 49dbbd1..f1a8754 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import java.time.Duration;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -56,6 +55,7 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
 
 import java.lang.reflect.Array;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -456,7 +456,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
             name,
             new ProcessorParameters<>(new KStreamTransform<>(transformerSupplier), name),
             stateStoreNames,
-            null,
             true
         );
 
@@ -491,7 +490,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
             name,
             new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name),
             stateStoreNames,
-            null,
             repartitionRequired
         );
 
@@ -513,7 +511,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
             name,
             new ProcessorParameters<>(processorSupplier, name),
             stateStoreNames,
-            null,
             repartitionRequired
         );
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 45f5552..f49d109 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -377,7 +377,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
             name,
             new ProcessorParameters<>(suppressionSupplier, name),
-            null,
             new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName),
             false
         );
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
index 2dc6aad..fe11b26 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
@@ -31,25 +31,44 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
     private final StoreBuilder<? extends StateStore> storeBuilder;
 
 
+    /**
+     * Create a node representing a stateful procssor, where the named store has already been registered.
+     */
     public StatefulProcessorNode(final String nodeName,
                                  final ProcessorParameters<K, V> processorParameters,
                                  final String[] storeNames,
-                                 final StoreBuilder<? extends StateStore> materializedKTableStoreBuilder,
                                  final boolean repartitionRequired) {
         super(nodeName,
-            processorParameters,
-            repartitionRequired);
+              processorParameters,
+              repartitionRequired);
 
         this.storeNames = storeNames;
+        this.storeBuilder = null;
+    }
+
+
+    /**
+     * Create a node representing a stateful procssor,
+     * where the store needs to be built and registered as part of building this node.
+     */
+    public StatefulProcessorNode(final String nodeName,
+                                 final ProcessorParameters<K, V> processorParameters,
+                                 final StoreBuilder<? extends StateStore> materializedKTableStoreBuilder,
+                                 final boolean repartitionRequired) {
+        super(nodeName,
+              processorParameters,
+              repartitionRequired);
+
+        this.storeNames = null;
         this.storeBuilder = materializedKTableStoreBuilder;
     }
 
     @Override
     public String toString() {
         return "StatefulProcessorNode{" +
-               "storeNames=" + Arrays.toString(storeNames) +
-               ", storeBuilder=" + storeBuilder +
-               "} " + super.toString();
+            "storeNames=" + Arrays.toString(storeNames) +
+            ", storeBuilder=" + storeBuilder +
+            "} " + super.toString();
     }
 
     @Override
@@ -68,56 +87,4 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
             topologyBuilder.addStateStore(storeBuilder, processorName);
         }
     }
-
-    public static <K, V> StatefulProcessorNodeBuilder<K, V> statefulProcessorNodeBuilder() {
-        return new StatefulProcessorNodeBuilder<>();
-    }
-
-    public static final class StatefulProcessorNodeBuilder<K, V> {
-
-        private ProcessorParameters<K, V> processorSupplier;
-        private String nodeName;
-        private boolean repartitionRequired;
-        private String[] storeNames;
-        private StoreBuilder<? extends StateStore> storeBuilder;
-
-        private StatefulProcessorNodeBuilder() {
-        }
-
-        public StatefulProcessorNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters<K, V> processorParameters) {
-            this.processorSupplier = processorParameters;
-            return this;
-        }
-
-        public StatefulProcessorNodeBuilder<K, V> withNodeName(final String nodeName) {
-            this.nodeName = nodeName;
-            return this;
-        }
-
-        public StatefulProcessorNodeBuilder<K, V> withStoreNames(final String[] storeNames) {
-            this.storeNames = storeNames;
-            return this;
-        }
-
-        public StatefulProcessorNodeBuilder<K, V> withRepartitionRequired(final boolean repartitionRequired) {
-            this.repartitionRequired = repartitionRequired;
-            return this;
-        }
-
-        public StatefulProcessorNodeBuilder<K, V> withStoreBuilder(final StoreBuilder<? extends StateStore> storeBuilder) {
-            this.storeBuilder = storeBuilder;
-            return this;
-        }
-
-        public StatefulProcessorNode<K, V> build() {
-            return new StatefulProcessorNode<>(
-                nodeName,
-                processorSupplier,
-                storeNames,
-                storeBuilder,
-                repartitionRequired
-            );
-
-        }
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 20ce3ff..5e426d9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.junit.Test;
 
 import static java.time.Duration.ofMillis;
@@ -61,8 +63,7 @@ public class GraphGraceSearchUtilTest {
                 },
                 "dummy"
             ),
-            null,
-            null,
+            (StoreBuilder<? extends StateStore>) null,
             false
         );
 
@@ -91,8 +92,7 @@ public class GraphGraceSearchUtilTest {
                 ),
                 "asdf"
             ),
-            null,
-            null,
+            (StoreBuilder<? extends StateStore>) null,
             false
         );
 
@@ -116,8 +116,7 @@ public class GraphGraceSearchUtilTest {
                 ),
                 "asdf"
             ),
-            null,
-            null,
+            (StoreBuilder<? extends StateStore>) null,
             false
         );
 
@@ -133,8 +132,7 @@ public class GraphGraceSearchUtilTest {
             new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>(
                 windows, "asdf", null, null, null
             ), "asdf"),
-            null,
-            null,
+            (StoreBuilder<? extends StateStore>) null,
             false
         );
 
@@ -153,8 +151,7 @@ public class GraphGraceSearchUtilTest {
                 },
                 "dummy"
             ),
-            null,
-            null,
+            (StoreBuilder<? extends StateStore>) null,
             false
         );
         graceGrandparent.addChild(statefulParent);
@@ -181,8 +178,7 @@ public class GraphGraceSearchUtilTest {
                 ),
                 "asdf"
             ),
-            null,
-            null,
+            (StoreBuilder<? extends StateStore>) null,
             false
         );
 
@@ -210,8 +206,7 @@ public class GraphGraceSearchUtilTest {
                 ),
                 "asdf"
             ),
-            null,
-            null,
+            (StoreBuilder<? extends StateStore>) null,
             false
         );
 
@@ -226,8 +221,7 @@ public class GraphGraceSearchUtilTest {
                 ),
                 "asdf"
             ),
-            null,
-            null,
+            (StoreBuilder<? extends StateStore>) null,
             false
         );