You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/10/05 16:45:37 UTC
[kafka] branch 2.1 updated: MINOR: Clarify usage of stateful
processor node (#5740)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new a682cf4 MINOR: Clarify usage of stateful processor node (#5740)
a682cf4 is described below
commit a682cf42a3307e9da2ed286f2eb7045dc6031b14
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Oct 5 11:44:49 2018 -0500
MINOR: Clarify usage of stateful processor node (#5740)
In recent PRs, we have been confused about the proper usage of
StatefulProcessorNode (#5731, #5737).
This change disambiguates it.
Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../internals/GroupedStreamAggregateBuilder.java | 29 ++++----
.../kstream/internals/KGroupedTableImpl.java | 23 ++----
.../streams/kstream/internals/KStreamImpl.java | 5 +-
.../streams/kstream/internals/KTableImpl.java | 1 -
.../internals/graph/StatefulProcessorNode.java | 83 +++++++---------------
.../internals/graph/GraphGraceSearchUtilTest.java | 26 +++----
6 files changed, 55 insertions(+), 112 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 8e6f990..d410bce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -64,12 +64,12 @@ class GroupedStreamAggregateBuilder<K, V> {
this.userName = groupedInternal.name();
}
- <KR, T> KTable<KR, T> build(final String functionName,
- final StoreBuilder<? extends StateStore> storeBuilder,
- final KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier,
- final boolean isQueryable,
- final Serde<KR> keySerde,
- final Serde<T> valSerde) {
+ <KR, VR> KTable<KR, VR> build(final String functionName,
+ final StoreBuilder<? extends StateStore> storeBuilder,
+ final KStreamAggProcessorSupplier<K, KR, V, VR> aggregateSupplier,
+ final boolean isQueryable,
+ final Serde<KR> keySerde,
+ final Serde<VR> valSerde) {
final String aggFunctionName = builder.newProcessorName(functionName);
@@ -84,17 +84,14 @@ class GroupedStreamAggregateBuilder<K, V> {
builder.addGraphNode(parentNode, repartitionNode);
parentNode = repartitionNode;
}
- final StatefulProcessorNode.StatefulProcessorNodeBuilder<K, V> statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder();
- final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName);
-
- statefulProcessorNodeBuilder
- .withProcessorParameters(processorParameters)
- .withNodeName(aggFunctionName)
- .withRepartitionRequired(repartitionRequired)
- .withStoreBuilder(storeBuilder);
-
- final StatefulProcessorNode<K, V> statefulProcessorNode = statefulProcessorNodeBuilder.build();
+ final StatefulProcessorNode<K, V> statefulProcessorNode =
+ new StatefulProcessorNode<>(
+ aggFunctionName,
+ new ProcessorParameters<>(aggregateSupplier, aggFunctionName),
+ storeBuilder,
+ repartitionRequired
+ );
builder.addGraphNode(parentNode, statefulProcessorNode);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 013028d..29a52b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -79,9 +79,12 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
// the passed in StreamsGraphNode must be the parent of the repartition node
builder.addGraphNode(this.streamsGraphNode, repartitionNode);
- final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode(materialized,
- funcName,
- aggregateSupplier);
+ final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>(
+ funcName,
+ new ProcessorParameters<>(aggregateSupplier, funcName),
+ new KeyValueStoreMaterializer<>(materialized).materialize(),
+ false
+ );
// now the repartition node must be the parent of the StateProcessorNode
builder.addGraphNode(repartitionNode, statefulProcessorNode);
@@ -98,20 +101,6 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
builder);
}
- private <T> StatefulProcessorNode<K, Change<V>> getStatefulProcessorNode(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized,
- final String functionName,
- final ProcessorSupplier<K, Change<V>> aggregateSupplier) {
-
- final ProcessorParameters<K, Change<V>> aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, functionName);
-
- return StatefulProcessorNode
- .<K, Change<V>>statefulProcessorNodeBuilder()
- .withNodeName(functionName)
- .withProcessorParameters(aggregateFunctionProcessorParams)
- .withStoreBuilder(new KeyValueStoreMaterializer<>(materialized).materialize())
- .build();
- }
-
private GroupedTableOperationRepartitionNode<K, V> createRepartitionNode(final String sinkName,
final String sourceName,
final String topic) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 49dbbd1..f1a8754 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import java.time.Duration;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ForeachAction;
@@ -56,6 +55,7 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import java.lang.reflect.Array;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -456,7 +456,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
name,
new ProcessorParameters<>(new KStreamTransform<>(transformerSupplier), name),
stateStoreNames,
- null,
true
);
@@ -491,7 +490,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
name,
new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name),
stateStoreNames,
- null,
repartitionRequired
);
@@ -513,7 +511,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
name,
new ProcessorParameters<>(processorSupplier, name),
stateStoreNames,
- null,
repartitionRequired
);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 45f5552..f49d109 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -377,7 +377,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
name,
new ProcessorParameters<>(suppressionSupplier, name),
- null,
new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName),
false
);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
index 2dc6aad..fe11b26 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
@@ -31,25 +31,44 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
private final StoreBuilder<? extends StateStore> storeBuilder;
+ /**
+ * Create a node representing a stateful processor, where the named store has already been registered.
+ */
public StatefulProcessorNode(final String nodeName,
final ProcessorParameters<K, V> processorParameters,
final String[] storeNames,
- final StoreBuilder<? extends StateStore> materializedKTableStoreBuilder,
final boolean repartitionRequired) {
super(nodeName,
- processorParameters,
- repartitionRequired);
+ processorParameters,
+ repartitionRequired);
this.storeNames = storeNames;
+ this.storeBuilder = null;
+ }
+
+
+ /**
+ * Create a node representing a stateful processor,
+ * where the store needs to be built and registered as part of building this node.
+ */
+ public StatefulProcessorNode(final String nodeName,
+ final ProcessorParameters<K, V> processorParameters,
+ final StoreBuilder<? extends StateStore> materializedKTableStoreBuilder,
+ final boolean repartitionRequired) {
+ super(nodeName,
+ processorParameters,
+ repartitionRequired);
+
+ this.storeNames = null;
this.storeBuilder = materializedKTableStoreBuilder;
}
@Override
public String toString() {
return "StatefulProcessorNode{" +
- "storeNames=" + Arrays.toString(storeNames) +
- ", storeBuilder=" + storeBuilder +
- "} " + super.toString();
+ "storeNames=" + Arrays.toString(storeNames) +
+ ", storeBuilder=" + storeBuilder +
+ "} " + super.toString();
}
@Override
@@ -68,56 +87,4 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
topologyBuilder.addStateStore(storeBuilder, processorName);
}
}
-
- public static <K, V> StatefulProcessorNodeBuilder<K, V> statefulProcessorNodeBuilder() {
- return new StatefulProcessorNodeBuilder<>();
- }
-
- public static final class StatefulProcessorNodeBuilder<K, V> {
-
- private ProcessorParameters<K, V> processorSupplier;
- private String nodeName;
- private boolean repartitionRequired;
- private String[] storeNames;
- private StoreBuilder<? extends StateStore> storeBuilder;
-
- private StatefulProcessorNodeBuilder() {
- }
-
- public StatefulProcessorNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters<K, V> processorParameters) {
- this.processorSupplier = processorParameters;
- return this;
- }
-
- public StatefulProcessorNodeBuilder<K, V> withNodeName(final String nodeName) {
- this.nodeName = nodeName;
- return this;
- }
-
- public StatefulProcessorNodeBuilder<K, V> withStoreNames(final String[] storeNames) {
- this.storeNames = storeNames;
- return this;
- }
-
- public StatefulProcessorNodeBuilder<K, V> withRepartitionRequired(final boolean repartitionRequired) {
- this.repartitionRequired = repartitionRequired;
- return this;
- }
-
- public StatefulProcessorNodeBuilder<K, V> withStoreBuilder(final StoreBuilder<? extends StateStore> storeBuilder) {
- this.storeBuilder = storeBuilder;
- return this;
- }
-
- public StatefulProcessorNode<K, V> build() {
- return new StatefulProcessorNode<>(
- nodeName,
- processorSupplier,
- storeNames,
- storeBuilder,
- repartitionRequired
- );
-
- }
- }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 20ce3ff..5e426d9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.junit.Test;
import static java.time.Duration.ofMillis;
@@ -61,8 +63,7 @@ public class GraphGraceSearchUtilTest {
},
"dummy"
),
- null,
- null,
+ (StoreBuilder<? extends StateStore>) null,
false
);
@@ -91,8 +92,7 @@ public class GraphGraceSearchUtilTest {
),
"asdf"
),
- null,
- null,
+ (StoreBuilder<? extends StateStore>) null,
false
);
@@ -116,8 +116,7 @@ public class GraphGraceSearchUtilTest {
),
"asdf"
),
- null,
- null,
+ (StoreBuilder<? extends StateStore>) null,
false
);
@@ -133,8 +132,7 @@ public class GraphGraceSearchUtilTest {
new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>(
windows, "asdf", null, null, null
), "asdf"),
- null,
- null,
+ (StoreBuilder<? extends StateStore>) null,
false
);
@@ -153,8 +151,7 @@ public class GraphGraceSearchUtilTest {
},
"dummy"
),
- null,
- null,
+ (StoreBuilder<? extends StateStore>) null,
false
);
graceGrandparent.addChild(statefulParent);
@@ -181,8 +178,7 @@ public class GraphGraceSearchUtilTest {
),
"asdf"
),
- null,
- null,
+ (StoreBuilder<? extends StateStore>) null,
false
);
@@ -210,8 +206,7 @@ public class GraphGraceSearchUtilTest {
),
"asdf"
),
- null,
- null,
+ (StoreBuilder<? extends StateStore>) null,
false
);
@@ -226,8 +221,7 @@ public class GraphGraceSearchUtilTest {
),
"asdf"
),
- null,
- null,
+ (StoreBuilder<? extends StateStore>) null,
false
);