You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/04/09 19:50:56 UTC
[kafka] branch trunk updated: KAFKA-8410: Migrate KStream Stateless
operators to new Processor API (#10381)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c9cab2b KAFKA-8410: Migrate KStream Stateless operators to new Processor API (#10381)
c9cab2b is described below
commit c9cab2beb8159ca83e6e2c930514ba97282e1ae9
Author: Jorge Esteban Quilcate Otoya <qu...@gmail.com>
AuthorDate: Fri Apr 9 20:49:54 2021 +0100
KAFKA-8410: Migrate KStream Stateless operators to new Processor API (#10381)
Migrate KStream stateless operators to new Processor API.
Following PRs will complete migration of KStream stateful operators and KTable.
No expected functionality changes.
Reviewers: John Roesler <vv...@apache.org>
---
.../PrintedInternal.java => ForeachProcessor.java} | 22 +++++-----
.../streams/kstream/internals/KStreamBranch.java | 28 ++++++-------
.../streams/kstream/internals/KStreamFilter.java | 19 ++++-----
.../streams/kstream/internals/KStreamFlatMap.java | 25 +++++++-----
.../kstream/internals/KStreamFlatMapValues.java | 25 ++++++------
.../streams/kstream/internals/KStreamImpl.java | 5 ++-
.../streams/kstream/internals/KStreamMap.java | 25 +++++++-----
.../kstream/internals/KStreamMapValues.java | 23 ++++++-----
.../streams/kstream/internals/KStreamPeek.java | 25 +++++-------
.../streams/kstream/internals/KStreamPrint.java | 18 ++++-----
.../streams/kstream/internals/KTableImpl.java | 2 +-
.../streams/kstream/internals/PassThrough.java | 17 ++++----
.../streams/kstream/internals/PrintedInternal.java | 4 +-
.../streams/processor/api/ContextualProcessor.java | 47 ++++++++++++++++++++++
.../apache/kafka/streams/kstream/PrintedTest.java | 34 +++++++---------
.../kstream/internals/KStreamBranchTest.java | 4 +-
.../kstream/internals/KStreamPrintTest.java | 13 +++---
17 files changed, 194 insertions(+), 142 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java
similarity index 59%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java
index 546e353..cccd298 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java
@@ -14,21 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.kstream.internals;
+package org.apache.kafka.streams.kstream;
-import org.apache.kafka.streams.kstream.Printed;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
-public class PrintedInternal<K, V> extends Printed<K, V> {
- public PrintedInternal(final Printed<K, V> printed) {
- super(printed);
- }
+public class ForeachProcessor<K, V> implements Processor<K, V, Void, Void> {
+
+ private final ForeachAction<K, V> action;
- public ProcessorSupplier<K, V> build(final String processorName) {
- return new KStreamPrint<>(new PrintForeachAction<>(outputStream, mapper, label != null ? label : processorName));
+ public ForeachProcessor(final ForeachAction<K, V> action) {
+ this.action = action;
}
- public String name() {
- return processorName;
+ @Override
+ public void process(final Record<K, V> record) {
+ action.apply(record.key(), record.value());
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
index 21b69f2..2d3fc76 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
@@ -16,44 +16,44 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.To;
-
import java.util.List;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
-class KStreamBranch<K, V> implements ProcessorSupplier<K, V> {
+class KStreamBranch<K, V> implements ProcessorSupplier<K, V, K, V> {
private final List<Predicate<? super K, ? super V>> predicates;
private final List<String> childNodes;
KStreamBranch(final List<Predicate<? super K, ? super V>> predicates,
- final List<String> childNodes) {
+ final List<String> childNodes) {
this.predicates = predicates;
this.childNodes = childNodes;
}
@Override
- public Processor<K, V> get() {
+ public Processor<K, V, K, V> get() {
return new KStreamBranchProcessor();
}
- private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
+ private class KStreamBranchProcessor extends ContextualProcessor<K, V, K, V> {
+
@Override
- public void process(final K key, final V value) {
+ public void process(final Record<K, V> record) {
for (int i = 0; i < predicates.size(); i++) {
- if (predicates.get(i).test(key, value)) {
+ if (predicates.get(i).test(record.key(), record.value())) {
// use forward with child here and then break the loop
// so that no record is going to be piped to multiple streams
- context().forward(key, value, To.child(childNodes.get(i)));
+ context().forward(record, childNodes.get(i));
return;
}
}
// using default child node if supplied
if (childNodes.size() > predicates.size()) {
- context().forward(key, value, To.child(childNodes.get(predicates.size())));
+ context().forward(record, childNodes.get(predicates.size()));
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
index ac03c18..ffafd10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
@@ -16,12 +16,13 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
-class KStreamFilter<K, V> implements ProcessorSupplier<K, V> {
+class KStreamFilter<K, V> implements ProcessorSupplier<K, V, K, V> {
private final Predicate<K, V> predicate;
private final boolean filterNot;
@@ -32,15 +33,15 @@ class KStreamFilter<K, V> implements ProcessorSupplier<K, V> {
}
@Override
- public Processor<K, V> get() {
+ public Processor<K, V, K, V> get() {
return new KStreamFilterProcessor();
}
- private class KStreamFilterProcessor extends AbstractProcessor<K, V> {
+ private class KStreamFilterProcessor extends ContextualProcessor<K, V, K, V> {
@Override
- public void process(final K key, final V value) {
- if (filterNot ^ predicate.test(key, value)) {
- context().forward(key, value);
+ public void process(final Record<K, V> record) {
+ if (filterNot ^ predicate.test(record.key(), record.value())) {
+ context().forward(record);
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
index e20ec90..b731a5c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
@@ -18,28 +18,31 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
-class KStreamFlatMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
+class KStreamFlatMap<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
- private final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper;
+ private final KeyValueMapper<? super KIn, ? super VIn, ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper;
- KStreamFlatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
+ KStreamFlatMap(final KeyValueMapper<? super KIn, ? super VIn, ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper) {
this.mapper = mapper;
}
@Override
- public Processor<K, V> get() {
+ public Processor<KIn, VIn, KOut, VOut> get() {
return new KStreamFlatMapProcessor();
}
- private class KStreamFlatMapProcessor extends AbstractProcessor<K, V> {
+ private class KStreamFlatMapProcessor extends ContextualProcessor<KIn, VIn, KOut, VOut> {
@Override
- public void process(final K key, final V value) {
- for (final KeyValue<? extends K1, ? extends V1> newPair : mapper.apply(key, value)) {
- context().forward(newPair.key, newPair.value);
+ public void process(final Record<KIn, VIn> record) {
+ final Iterable<? extends KeyValue<? extends KOut, ? extends VOut>> newKeyValues =
+ mapper.apply(record.key(), record.value());
+ for (final KeyValue<? extends KOut, ? extends VOut> newPair : newKeyValues) {
+ context().forward(record.withKey(newPair.key).withValue(newPair.value));
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
index fedfe393..1008b29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
@@ -17,29 +17,30 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
-class KStreamFlatMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
+class KStreamFlatMapValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {
- private final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends V1>> mapper;
+ private final ValueMapperWithKey<? super KIn, ? super VIn, ? extends Iterable<? extends VOut>> mapper;
- KStreamFlatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends V1>> mapper) {
+ KStreamFlatMapValues(final ValueMapperWithKey<? super KIn, ? super VIn, ? extends Iterable<? extends VOut>> mapper) {
this.mapper = mapper;
}
@Override
- public Processor<K, V> get() {
+ public Processor<KIn, VIn, KIn, VOut> get() {
return new KStreamFlatMapValuesProcessor();
}
- private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K, V> {
+ private class KStreamFlatMapValuesProcessor extends ContextualProcessor<KIn, VIn, KIn, VOut> {
@Override
- public void process(final K key, final V value) {
- final Iterable<? extends V1> newValues = mapper.apply(key, value);
- for (final V1 v : newValues) {
- context().forward(key, v);
+ public void process(final Record<KIn, VIn> record) {
+ final Iterable<? extends VOut> newValues = mapper.apply(record.key(), record.value());
+ for (final VOut v : newValues) {
+ context().forward(record.withValue(v));
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bacddc5..c013bd6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -61,6 +61,7 @@ import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
+import org.apache.kafka.streams.kstream.ForeachProcessor;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -406,7 +407,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FOREACH_NAME);
final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters =
- new ProcessorParameters<>(new KStreamPeek<>(action, false), name);
+ new ProcessorParameters<>(() -> new ForeachProcessor<>(action), name);
final ProcessorGraphNode<? super K, ? super V> foreachNode =
new ProcessorGraphNode<>(name, processorParameters);
@@ -426,7 +427,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, PEEK_NAME);
final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters =
- new ProcessorParameters<>(new KStreamPeek<>(action, true), name);
+ new ProcessorParameters<>(new KStreamPeek<>(action), name);
final ProcessorGraphNode<? super K, ? super V> peekNode =
new ProcessorGraphNode<>(name, processorParameters);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
index 8179ca8..0fec716 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
@@ -16,30 +16,33 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
-class KStreamMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
+class KStreamMap<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
- private final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper;
+ private final KeyValueMapper<? super KIn, ? super VIn, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper;
- public KStreamMap(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) {
+ public KStreamMap(final KeyValueMapper<? super KIn, ? super VIn, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper) {
this.mapper = mapper;
}
@Override
- public Processor<K, V> get() {
+ public Processor<KIn, VIn, KOut, VOut> get() {
return new KStreamMapProcessor();
}
- private class KStreamMapProcessor extends AbstractProcessor<K, V> {
+ private class KStreamMapProcessor extends ContextualProcessor<KIn, VIn, KOut, VOut> {
+
@Override
- public void process(final K key, final V value) {
- final KeyValue<? extends K1, ? extends V1> newPair = mapper.apply(key, value);
- context().forward(newPair.key, newPair.value);
+ public void process(final Record<KIn, VIn> record) {
+ final KeyValue<? extends KOut, ? extends VOut> newPair =
+ mapper.apply(record.key(), record.value());
+ context().forward(record.withKey(newPair.key).withValue(newPair.value));
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
index 28c120e..f73bfdd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
@@ -17,28 +17,29 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
-class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
+class KStreamMapValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {
- private final ValueMapperWithKey<K, V, V1> mapper;
+ private final ValueMapperWithKey<KIn, VIn, VOut> mapper;
- public KStreamMapValues(final ValueMapperWithKey<K, V, V1> mapper) {
+ public KStreamMapValues(final ValueMapperWithKey<KIn, VIn, VOut> mapper) {
this.mapper = mapper;
}
@Override
- public Processor<K, V> get() {
+ public Processor<KIn, VIn, KIn, VOut> get() {
return new KStreamMapProcessor();
}
- private class KStreamMapProcessor extends AbstractProcessor<K, V> {
+ private class KStreamMapProcessor extends ContextualProcessor<KIn, VIn, KIn, VOut> {
@Override
- public void process(final K readOnlyKey, final V value) {
- final V1 newValue = mapper.apply(readOnlyKey, value);
- context().forward(readOnlyKey, newValue);
+ public void process(final Record<KIn, VIn> record) {
+ final VOut newValue = mapper.apply(record.key(), record.value());
+ context().forward(record.withValue(newValue));
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
index 44d1d60..69b5e7f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
@@ -17,32 +17,29 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
-class KStreamPeek<K, V> implements ProcessorSupplier<K, V> {
+class KStreamPeek<K, V> implements ProcessorSupplier<K, V, K, V> {
- private final boolean forwardDownStream;
private final ForeachAction<K, V> action;
- public KStreamPeek(final ForeachAction<K, V> action, final boolean forwardDownStream) {
+ public KStreamPeek(final ForeachAction<K, V> action) {
this.action = action;
- this.forwardDownStream = forwardDownStream;
}
@Override
- public Processor<K, V> get() {
+ public Processor<K, V, K, V> get() {
return new KStreamPeekProcessor();
}
- private class KStreamPeekProcessor extends AbstractProcessor<K, V> {
+ private class KStreamPeekProcessor extends ContextualProcessor<K, V, K, V> {
@Override
- public void process(final K key, final V value) {
- action.apply(key, value);
- if (forwardDownStream) {
- context().forward(key, value);
- }
+ public void process(final Record<K, V> record) {
+ action.apply(record.key(), record.value());
+ context().forward(record);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
index 8ce698a..a04662c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
@@ -17,28 +17,28 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
-public class KStreamPrint<K, V> implements ProcessorSupplier<K, V> {
+public class KStreamPrint<K, V> implements ProcessorSupplier<K, V, Void, Void> {
private final ForeachAction<K, V> action;
-
+
public KStreamPrint(final ForeachAction<K, V> action) {
this.action = action;
}
@Override
- public Processor<K, V> get() {
+ public Processor<K, V, Void, Void> get() {
return new KStreamPrintProcessor();
}
- private class KStreamPrintProcessor extends AbstractProcessor<K, V> {
+ private class KStreamPrintProcessor implements Processor<K, V, Void, Void> {
@Override
- public void process(final K key, final V value) {
- action.apply(key, value);
+ public void process(final Record<K, V> record) {
+ action.apply(record.key(), record.value());
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 98aee13..52f7b5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -484,7 +484,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
Objects.requireNonNull(named, "named can't be null");
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, TOSTREAM_NAME);
- final ProcessorSupplier<K, Change<V>> kStreamMapValues = new KStreamMapValues<>((key, change) -> change.newValue);
+ final KStreamMapValues<K, Change<V>, V> kStreamMapValues = new KStreamMapValues<>((key, change) -> change.newValue);
final ProcessorParameters<K, V, ?, ?> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
new ProcessorParameters<>(kStreamMapValues, name)
);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java
index b83b3a4..f357a46 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java
@@ -16,21 +16,22 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
-class PassThrough<K, V> implements ProcessorSupplier<K, V> {
+class PassThrough<K, V> implements ProcessorSupplier<K, V, K, V> {
@Override
- public Processor<K, V> get() {
+ public Processor<K, V, K, V> get() {
return new PassThroughProcessor<>();
}
- private static final class PassThroughProcessor<K, V> extends AbstractProcessor<K, V> {
+ private static final class PassThroughProcessor<K, V> extends ContextualProcessor<K, V, K, V> {
@Override
- public void process(final K key, final V value) {
- context().forward(key, value);
+ public void process(final Record<K, V> record) {
+ context().forward(record);
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
index 546e353..0cd1760 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
@@ -17,14 +17,14 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Printed;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
public class PrintedInternal<K, V> extends Printed<K, V> {
public PrintedInternal(final Printed<K, V> printed) {
super(printed);
}
- public ProcessorSupplier<K, V> build(final String processorName) {
+ public ProcessorSupplier<K, V, Void, Void> build(final String processorName) {
return new KStreamPrint<>(new PrintForeachAction<>(outputStream, mapper, label != null ? label : processorName));
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java
new file mode 100644
index 0000000..d2522e3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+/**
+ * An abstract implementation of {@link Processor} that manages the {@link ProcessorContext} instance and provides default no-op
+ * implementation of {@link #close()}.
+ *
+ * @param <KIn> the type of input keys
+ * @param <VIn> the type of input values
+ * @param <KOut> the type of output keys
+ * @param <VOut> the type of output values
+ */
+public abstract class ContextualProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
+
+ protected ProcessorContext<KOut, VOut> context;
+
+ protected ContextualProcessor() {}
+
+ @Override
+ public void init(final ProcessorContext<KOut, VOut> context) {
+ this.context = context;
+ }
+
+ /**
+ * Get the processor's context set during {@link #init(ProcessorContext) initialization}.
+ *
+ * @return the processor context; null only when called prior to {@link #init(ProcessorContext) initialization}.
+ */
+ protected final ProcessorContext<KOut, VOut> context() {
+ return context;
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
index a56dbba..212074f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
@@ -19,8 +19,9 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.internals.PrintedInternal;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -59,11 +60,11 @@ public class PrintedTest {
@Test
public void shouldCreateProcessorThatPrintsToFile() throws IOException {
final File file = TestUtils.tempFile();
- final ProcessorSupplier<String, Integer> processorSupplier = new PrintedInternal<>(
+ final ProcessorSupplier<String, Integer, Void, Void> processorSupplier = new PrintedInternal<>(
Printed.<String, Integer>toFile(file.getPath()))
.build("processor");
- final Processor<String, Integer> processor = processorSupplier.get();
- processor.process("hi", 1);
+ final Processor<String, Integer, Void, Void> processor = processorSupplier.get();
+ processor.process(new Record<>("hi", 1, 0L));
processor.close();
try (final InputStream stream = Files.newInputStream(file.toPath())) {
final byte[] data = new byte[stream.available()];
@@ -74,36 +75,31 @@ public class PrintedTest {
@Test
public void shouldCreateProcessorThatPrintsToStdOut() throws UnsupportedEncodingException {
- final ProcessorSupplier<String, Integer> supplier = new PrintedInternal<>(sysOutPrinter).build("processor");
- final Processor<String, Integer> processor = supplier.get();
+ final ProcessorSupplier<String, Integer, Void, Void> supplier = new PrintedInternal<>(sysOutPrinter).build("processor");
+ final Processor<String, Integer, Void, Void> processor = supplier.get();
- processor.process("good", 2);
+ processor.process(new Record<>("good", 2, 0L));
processor.close();
assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]: good, 2\n"));
}
@Test
public void shouldPrintWithLabel() throws UnsupportedEncodingException {
- final Processor<String, Integer> processor = new PrintedInternal<>(sysOutPrinter.withLabel("label"))
+ final Processor<String, Integer, Void, Void> processor = new PrintedInternal<>(sysOutPrinter.withLabel("label"))
.build("processor")
.get();
- processor.process("hello", 3);
+ processor.process(new Record<>("hello", 3, 0L));
processor.close();
assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[label]: hello, 3\n"));
}
@Test
public void shouldPrintWithKeyValueMapper() throws UnsupportedEncodingException {
- final Processor<String, Integer> processor = new PrintedInternal<>(sysOutPrinter.withKeyValueMapper(
- new KeyValueMapper<String, Integer, String>() {
- @Override
- public String apply(final String key, final Integer value) {
- return String.format("%s -> %d", key, value);
- }
- })).build("processor")
- .get();
- processor.process("hello", 1);
+ final Processor<String, Integer, Void, Void> processor = new PrintedInternal<>(
+ sysOutPrinter.withKeyValueMapper((key, value) -> String.format("%s -> %d", key, value))
+ ).build("processor").get();
+ processor.process(new Record<>("hello", 1, 0L));
processor.close();
assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]: hello -> 1\n"));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index e33186e..1813522 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -60,8 +60,8 @@ public class KStreamBranchTest {
assertEquals(3, branches.length);
final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
- for (int i = 0; i < branches.length; i++) {
- branches[i].process(supplier);
+ for (final KStream<Integer, String> branch : branches) {
+ branch.process(supplier);
}
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index 9906556..2915a11 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -17,8 +17,9 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
@@ -33,7 +34,7 @@ import static org.junit.Assert.assertEquals;
public class KStreamPrintTest {
private ByteArrayOutputStream byteOutStream;
- private Processor<Integer, String> printProcessor;
+ private Processor<Integer, String, Void, Void> printProcessor;
@Before
public void setUp() {
@@ -45,14 +46,13 @@ public class KStreamPrintTest {
"test-stream"));
printProcessor = kStreamPrint.get();
- final ProcessorContext processorContext = EasyMock.createNiceMock(ProcessorContext.class);
+ final ProcessorContext<Void, Void> processorContext = EasyMock.createNiceMock(ProcessorContext.class);
EasyMock.replay(processorContext);
printProcessor.init(processorContext);
}
@Test
- @SuppressWarnings("unchecked")
public void testPrintStreamWithProvidedKeyValueMapper() {
final List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
new KeyValue<>(0, "zero"),
@@ -67,7 +67,8 @@ public class KStreamPrintTest {
"[test-stream]: 3, three"};
for (final KeyValue<Integer, String> record: inputRecords) {
- printProcessor.process(record.key, record.value);
+ final Record<Integer, String> r = new Record<>(record.key, record.value, 0L);
+ printProcessor.process(r);
}
printProcessor.close();