You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2022/06/29 01:39:56 UTC
[kafka] branch trunk updated: KAFKA-13821: Update Kafka Streams WordCount demo to new Processor API (#12139)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6ac7f4ea8f KAFKA-13821: Update Kafka Streams WordCount demo to new Processor API (#12139)
6ac7f4ea8f is described below
commit 6ac7f4ea8f1dafdf644fcfb869fbf9f04238786e
Author: CHUN-HAO TANG <ta...@gmail.com>
AuthorDate: Wed Jun 29 09:39:32 2022 +0800
KAFKA-13821: Update Kafka Streams WordCount demo to new Processor API (#12139)
https://issues.apache.org/jira/browse/KAFKA-13821
Reviewers: Jorge Esteban Quilcate Otoya <qu...@gmail.com>, Bill Bejeck <bb...@apache.org>
---
.../wordcount/WordCountTransformerDemo.java | 27 +++++++++++-----------
.../wordcount/WordCountTransformerTest.java | 15 ++++++------
2 files changed, 21 insertions(+), 21 deletions(-)
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
index c7ac87f046..90f4764be7 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
@@ -25,8 +25,11 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ConnectedStoreProvider;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -63,15 +66,15 @@ import java.util.concurrent.CountDownLatch;
*/
public final class WordCountTransformerDemo {
- static class MyTransformerSupplier implements TransformerSupplier<String, String, KeyValue<String, String>> {
+ static class MyProcessorSupplier implements ProcessorSupplier<String, String, String, String> {
@Override
- public Transformer<String, String, KeyValue<String, String>> get() {
- return new Transformer<String, String, KeyValue<String, String>>() {
+ public Processor<String, String, String, String> get() {
+ return new Processor<String, String, String, String>() {
private KeyValueStore<String, Integer> kvStore;
@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<String, String> context) {
context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
System.out.println("----------- " + timestamp + " ----------- ");
@@ -80,8 +83,7 @@ public final class WordCountTransformerDemo {
final KeyValue<String, Integer> entry = iter.next();
System.out.println("[" + entry.key + ", " + entry.value + "]");
-
- context.forward(entry.key, entry.value.toString());
+ context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
}
}
});
@@ -89,8 +91,8 @@ public final class WordCountTransformerDemo {
}
@Override
- public KeyValue<String, String> transform(final String dummy, final String line) {
- final String[] words = line.toLowerCase(Locale.getDefault()).split("\\W+");
+ public void process(final Record<String, String> record) {
+ final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+");
for (final String word : words) {
final Integer oldValue = this.kvStore.get(word);
@@ -101,8 +103,6 @@ public final class WordCountTransformerDemo {
this.kvStore.put(word, oldValue + 1);
}
}
-
- return null;
}
@Override
@@ -119,7 +119,6 @@ public final class WordCountTransformerDemo {
}
}
- @SuppressWarnings("deprecation")
public static void main(final String[] args) throws IOException {
final Properties props = new Properties();
if (args != null && args.length > 0) {
@@ -142,8 +141,8 @@ public final class WordCountTransformerDemo {
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("streams-plaintext-input")
- .transform(new MyTransformerSupplier())
- .to("streams-wordcount-processor-output");
+ .process(new MyProcessorSupplier())
+ .to("streams-wordcount-processor-output");
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java
index 95a63916d8..27d32ee933 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java
@@ -16,13 +16,13 @@
*/
package org.apache.kafka.streams.examples.wordcount;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.StoreBuilder;
import org.junit.jupiter.api.Test;
@@ -44,7 +44,7 @@ public class WordCountTransformerTest {
final MockProcessorContext<String, String> context = new MockProcessorContext<>();
// Create and initialize the transformer under test; including its provided store
- final WordCountTransformerDemo.MyTransformerSupplier supplier = new WordCountTransformerDemo.MyTransformerSupplier();
+ final WordCountTransformerDemo.MyProcessorSupplier supplier = new WordCountTransformerDemo.MyProcessorSupplier();
for (final StoreBuilder<?> storeBuilder : supplier.stores()) {
final StateStore store = storeBuilder
.withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
@@ -53,16 +53,16 @@ public class WordCountTransformerTest {
store.init(context.getStateStoreContext(), store);
context.getStateStoreContext().register(store, null);
}
- final Transformer<String, String, KeyValue<String, String>> transformer = supplier.get();
- transformer.init(new org.apache.kafka.streams.processor.MockProcessorContext() {
+ final Processor<String, String, String, String> processor = supplier.get();
+ processor.init(new org.apache.kafka.streams.processor.api.MockProcessorContext<String, String>() {
@Override
public <S extends StateStore> S getStateStore(final String name) {
return context.getStateStore(name);
}
@Override
- public <K, V> void forward(final K key, final V value) {
- context.forward(new Record<>((String) key, (String) value, 0L));
+ public <K extends String, V extends String> void forward(final Record<K, V> record) {
+ context.forward(record);
}
@Override
@@ -72,7 +72,8 @@ public class WordCountTransformerTest {
});
// send a record to the transformer
- transformer.transform("key", "alpha beta\tgamma\n\talpha");
+ final Record<String, String> record = new Record<>("key", "alpha beta\tgamma\n\talpha", 0L);
+ processor.process(record);
// note that the transformer does not forward during transform()
assertTrue(context.forwarded().isEmpty());