You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/01/06 02:30:25 UTC

[rocketmq-streams] branch main updated: add example

This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new eca42f4b add example
     new a8baae4d Merge pull request #245 from ni-ze/newWorld-4
eca42f4b is described below

commit eca42f4be3666c3d43b8398547bb056e1adf498e
Author: 维章 <un...@gmail.com>
AuthorDate: Fri Jan 6 10:28:58 2023 +0800

    add example
---
 .../core/function/supplier/SinkSupplier.java       |  2 --
 .../rocketmq/streams/examples/WordCount.java       | 27 +++-------------------
 .../{WordCount.java => sink/WordCountSink.java}    |  5 ++--
 .../streams/examples/window/WindowCount.java       |  8 ++-----
 4 files changed, 7 insertions(+), 35 deletions(-)

diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
index 74d8317a..393e8956 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
@@ -75,8 +75,6 @@ public class SinkSupplier<K, T> implements Supplier<Processor<T>> {
 
                 if (this.key == null) {
                     message = new Message(this.topicName, value);
-
-                    message.putUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME, null);
                     message.putUserProperty(Constant.SHUFFLE_VALUE_CLASS_NAME, data.getClass().getName());
                     if (this.topicName.contains(Constant.SHUFFLE_TOPIC_SUFFIX)) {
                         message.putUserProperty(Constant.SOURCE_TIMESTAMP, String.valueOf(this.context.getDataTime()));
diff --git a/example/src/main/java/org/apache/rocketmq/streams/examples/WordCount.java b/example/src/main/java/org/apache/rocketmq/streams/examples/WordCount.java
index 91937ba3..16d01c2a 100644
--- a/example/src/main/java/org/apache/rocketmq/streams/examples/WordCount.java
+++ b/example/src/main/java/org/apache/rocketmq/streams/examples/WordCount.java
@@ -1,4 +1,3 @@
-package org.apache.rocketmq.streams.examples;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -15,17 +14,14 @@ package org.apache.rocketmq.streams.examples;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.streams.examples;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.streams.core.RocketMQStream;
 import org.apache.rocketmq.streams.core.function.ValueMapperAction;
 import org.apache.rocketmq.streams.core.rstream.StreamBuilder;
-import org.apache.rocketmq.streams.core.serialization.KeyValueSerializer;
 import org.apache.rocketmq.streams.core.topology.TopologyBuilder;
 import org.apache.rocketmq.streams.core.util.Pair;
-import org.apache.rocketmq.streams.examples.pojo.User;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -33,13 +29,6 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 
-/**
- * 1、启动RocketMQ
- * 2、创建topic sh bin/mqadmin updateTopic -c DefaultCluster -t sourceTopic -r 8 -w 8 -n 127.0.0.1:9876
- * 3、启动本例子运行
- * 4、向topic中写入数据
- * 5、观察输出结果
- */
 public class WordCount {
     public static void main(String[] args) {
         StreamBuilder builder = new StreamBuilder("wordCount");
@@ -54,17 +43,8 @@ public class WordCount {
                 })
                 .keyBy(value -> value)
                 .count()
-                .sink("wordCountSink", new KeyValueSerializer<String, Integer>() {
-                    final ObjectMapper objectMapper = new ObjectMapper();
-                    @Override
-                    public byte[] serialize(String o, Integer data) throws Throwable {
-                        ObjectNode objectNode = objectMapper.createObjectNode();
-                        objectNode.put(o, data);
-
-                        String result = objectNode.toPrettyString();
-                        return objectMapper.writeValueAsBytes(result);
-                    }
-                });
+                .toRStream()
+                .print();
 
         TopologyBuilder topologyBuilder = builder.build();
 
@@ -91,5 +71,4 @@ public class WordCount {
         }
         System.exit(0);
     }
-
 }
diff --git a/example/src/main/java/org/apache/rocketmq/streams/examples/WordCount.java b/example/src/main/java/org/apache/rocketmq/streams/examples/sink/WordCountSink.java
similarity index 96%
copy from example/src/main/java/org/apache/rocketmq/streams/examples/WordCount.java
copy to example/src/main/java/org/apache/rocketmq/streams/examples/sink/WordCountSink.java
index 91937ba3..d7b59926 100644
--- a/example/src/main/java/org/apache/rocketmq/streams/examples/WordCount.java
+++ b/example/src/main/java/org/apache/rocketmq/streams/examples/sink/WordCountSink.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.streams.examples;
+package org.apache.rocketmq.streams.examples.sink;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -25,7 +25,6 @@ import org.apache.rocketmq.streams.core.rstream.StreamBuilder;
 import org.apache.rocketmq.streams.core.serialization.KeyValueSerializer;
 import org.apache.rocketmq.streams.core.topology.TopologyBuilder;
 import org.apache.rocketmq.streams.core.util.Pair;
-import org.apache.rocketmq.streams.examples.pojo.User;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -40,7 +39,7 @@ import java.util.concurrent.CountDownLatch;
  * 4、向topic中写入数据
  * 5、观察输出结果
  */
-public class WordCount {
+public class WordCountSink {
     public static void main(String[] args) {
         StreamBuilder builder = new StreamBuilder("wordCount");
 
diff --git a/example/src/main/java/org/apache/rocketmq/streams/examples/window/WindowCount.java b/example/src/main/java/org/apache/rocketmq/streams/examples/window/WindowCount.java
index a7db93f9..4f758b7e 100644
--- a/example/src/main/java/org/apache/rocketmq/streams/examples/window/WindowCount.java
+++ b/example/src/main/java/org/apache/rocketmq/streams/examples/window/WindowCount.java
@@ -56,12 +56,8 @@ public class WindowCount {
                 .keyBy(value -> "key")
                 .window(WindowBuilder.tumblingWindow(Time.seconds(15)))
                 .aggregate(aggregateAction)
-                .sink("", new KeyValueSerializer<String, Num>() {
-                    @Override
-                    public byte[] serialize(String s, Num data) throws Throwable {
-                        return new byte[0];
-                    }
-                });
+                .toRStream()
+                .print();
 
         TopologyBuilder topologyBuilder = builder.build();