You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2022/06/29 01:39:56 UTC

[kafka] branch trunk updated: KAFKA-13821: Update Kafka Streams WordCount demo to new Processor API (#12139)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6ac7f4ea8f KAFKA-13821: Update Kafka Streams WordCount demo to new Processor API (#12139)
6ac7f4ea8f is described below

commit 6ac7f4ea8f1dafdf644fcfb869fbf9f04238786e
Author: CHUN-HAO TANG <ta...@gmail.com>
AuthorDate: Wed Jun 29 09:39:32 2022 +0800

    KAFKA-13821: Update Kafka Streams WordCount demo to new Processor API (#12139)
    
    https://issues.apache.org/jira/browse/KAFKA-13821
    
    Reviewers: Jorge Esteban Quilcate Otoya <qu...@gmail.com>, Bill Bejeck <bb...@apache.org>
---
 .../wordcount/WordCountTransformerDemo.java        | 27 +++++++++++-----------
 .../wordcount/WordCountTransformerTest.java        | 15 ++++++------
 2 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
index c7ac87f046..90f4764be7 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
@@ -25,8 +25,11 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ConnectedStoreProvider;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -63,15 +66,15 @@ import java.util.concurrent.CountDownLatch;
  */
 public final class WordCountTransformerDemo {
 
-    static class MyTransformerSupplier implements TransformerSupplier<String, String, KeyValue<String, String>> {
+    static class MyProcessorSupplier implements ProcessorSupplier<String, String, String, String> {
 
         @Override
-        public Transformer<String, String, KeyValue<String, String>> get() {
-            return new Transformer<String, String, KeyValue<String, String>>() {
+        public Processor<String, String, String, String> get() {
+            return new Processor<String, String, String, String>() {
                 private KeyValueStore<String, Integer> kvStore;
 
                 @Override
-                public void init(final ProcessorContext context) {
+                public void init(final ProcessorContext<String, String> context) {
                     context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
                         try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
                             System.out.println("----------- " + timestamp + " ----------- ");
@@ -80,8 +83,7 @@ public final class WordCountTransformerDemo {
                                 final KeyValue<String, Integer> entry = iter.next();
 
                                 System.out.println("[" + entry.key + ", " + entry.value + "]");
-
-                                context.forward(entry.key, entry.value.toString());
+                                context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
                             }
                         }
                     });
@@ -89,8 +91,8 @@ public final class WordCountTransformerDemo {
                 }
 
                 @Override
-                public KeyValue<String, String> transform(final String dummy, final String line) {
-                    final String[] words = line.toLowerCase(Locale.getDefault()).split("\\W+");
+                public void process(final Record<String, String> record) {
+                    final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+");
 
                     for (final String word : words) {
                         final Integer oldValue = this.kvStore.get(word);
@@ -101,8 +103,6 @@ public final class WordCountTransformerDemo {
                             this.kvStore.put(word, oldValue + 1);
                         }
                     }
-
-                    return null;
                 }
 
                 @Override
@@ -119,7 +119,6 @@ public final class WordCountTransformerDemo {
         }
     }
 
-    @SuppressWarnings("deprecation")
     public static void main(final String[] args) throws IOException {
         final Properties props = new Properties();
         if (args != null && args.length > 0) {
@@ -142,8 +141,8 @@ public final class WordCountTransformerDemo {
         final StreamsBuilder builder = new StreamsBuilder();
 
         builder.<String, String>stream("streams-plaintext-input")
-            .transform(new MyTransformerSupplier())
-            .to("streams-wordcount-processor-output");
+                .process(new MyProcessorSupplier())
+                .to("streams-wordcount-processor-output");
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         final CountDownLatch latch = new CountDownLatch(1);
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java
index 95a63916d8..27d32ee933 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java
@@ -16,13 +16,13 @@
  */
 package org.apache.kafka.streams.examples.wordcount;
 
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.junit.jupiter.api.Test;
@@ -44,7 +44,7 @@ public class WordCountTransformerTest {
         final MockProcessorContext<String, String> context = new MockProcessorContext<>();
 
         // Create and initialize the transformer under test; including its provided store
-        final WordCountTransformerDemo.MyTransformerSupplier supplier = new WordCountTransformerDemo.MyTransformerSupplier();
+        final WordCountTransformerDemo.MyProcessorSupplier supplier = new WordCountTransformerDemo.MyProcessorSupplier();
         for (final StoreBuilder<?> storeBuilder : supplier.stores()) {
             final StateStore store = storeBuilder
                 .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
@@ -53,16 +53,16 @@ public class WordCountTransformerTest {
             store.init(context.getStateStoreContext(), store);
             context.getStateStoreContext().register(store, null);
         }
-        final Transformer<String, String, KeyValue<String, String>> transformer = supplier.get();
-        transformer.init(new org.apache.kafka.streams.processor.MockProcessorContext() {
+        final Processor<String, String, String, String> processor = supplier.get();
+        processor.init(new org.apache.kafka.streams.processor.api.MockProcessorContext<String, String>() {
             @Override
             public <S extends StateStore> S getStateStore(final String name) {
                 return context.getStateStore(name);
             }
 
             @Override
-            public <K, V> void forward(final K key, final V value) {
-                context.forward(new Record<>((String) key, (String) value, 0L));
+            public <K extends String, V extends String> void forward(final Record<K, V> record) {
+                context.forward(record);
             }
 
             @Override
@@ -72,7 +72,8 @@ public class WordCountTransformerTest {
         });
 
         // send a record to the transformer
-        transformer.transform("key", "alpha beta\tgamma\n\talpha");
+        final Record<String, String> record = new Record<>("key", "alpha beta\tgamma\n\talpha", 0L);
+        processor.process(record);
 
         // note that the transformer does not forward during transform()
         assertTrue(context.forwarded().isEmpty());