You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/01/06 02:30:25 UTC
[rocketmq-streams] branch main updated: add example
This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new eca42f4b add example
new a8baae4d Merge pull request #245 from ni-ze/newWorld-4
eca42f4b is described below
commit eca42f4be3666c3d43b8398547bb056e1adf498e
Author: 维章 <un...@gmail.com>
AuthorDate: Fri Jan 6 10:28:58 2023 +0800
add example
---
.../core/function/supplier/SinkSupplier.java | 2 --
.../rocketmq/streams/examples/WordCount.java | 27 +++-------------------
.../{WordCount.java => sink/WordCountSink.java} | 5 ++--
.../streams/examples/window/WindowCount.java | 8 ++-----
4 files changed, 7 insertions(+), 35 deletions(-)
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
index 74d8317a..393e8956 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
@@ -75,8 +75,6 @@ public class SinkSupplier<K, T> implements Supplier<Processor<T>> {
if (this.key == null) {
message = new Message(this.topicName, value);
-
- message.putUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME, null);
message.putUserProperty(Constant.SHUFFLE_VALUE_CLASS_NAME, data.getClass().getName());
if (this.topicName.contains(Constant.SHUFFLE_TOPIC_SUFFIX)) {
message.putUserProperty(Constant.SOURCE_TIMESTAMP, String.valueOf(this.context.getDataTime()));
diff --git a/example/src/main/java/org/apache/rocketmq/streams/examples/WordCount.java b/example/src/main/java/org/apache/rocketmq/streams/examples/WordCount.java
index 91937ba3..16d01c2a 100644
--- a/example/src/main/java/org/apache/rocketmq/streams/examples/WordCount.java
+++ b/example/src/main/java/org/apache/rocketmq/streams/examples/WordCount.java
@@ -1,4 +1,3 @@
-package org.apache.rocketmq.streams.examples;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -15,17 +14,14 @@ package org.apache.rocketmq.streams.examples;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.streams.examples;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.streams.core.RocketMQStream;
import org.apache.rocketmq.streams.core.function.ValueMapperAction;
import org.apache.rocketmq.streams.core.rstream.StreamBuilder;
-import org.apache.rocketmq.streams.core.serialization.KeyValueSerializer;
import org.apache.rocketmq.streams.core.topology.TopologyBuilder;
import org.apache.rocketmq.streams.core.util.Pair;
-import org.apache.rocketmq.streams.examples.pojo.User;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -33,13 +29,6 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
-/**
- * 1、启动RocketMQ
- * 2、创建topic sh bin/mqadmin updateTopic -c DefaultCluster -t sourceTopic -r 8 -w 8 -n 127.0.0.1:9876
- * 3、启动本例子运行
- * 4、向topic中写入数据
- * 5、观察输出结果
- */
public class WordCount {
public static void main(String[] args) {
StreamBuilder builder = new StreamBuilder("wordCount");
@@ -54,17 +43,8 @@ public class WordCount {
})
.keyBy(value -> value)
.count()
- .sink("wordCountSink", new KeyValueSerializer<String, Integer>() {
- final ObjectMapper objectMapper = new ObjectMapper();
- @Override
- public byte[] serialize(String o, Integer data) throws Throwable {
- ObjectNode objectNode = objectMapper.createObjectNode();
- objectNode.put(o, data);
-
- String result = objectNode.toPrettyString();
- return objectMapper.writeValueAsBytes(result);
- }
- });
+ .toRStream()
+ .print();
TopologyBuilder topologyBuilder = builder.build();
@@ -91,5 +71,4 @@ public class WordCount {
}
System.exit(0);
}
-
}
diff --git a/example/src/main/java/org/apache/rocketmq/streams/examples/WordCount.java b/example/src/main/java/org/apache/rocketmq/streams/examples/sink/WordCountSink.java
similarity index 96%
copy from example/src/main/java/org/apache/rocketmq/streams/examples/WordCount.java
copy to example/src/main/java/org/apache/rocketmq/streams/examples/sink/WordCountSink.java
index 91937ba3..d7b59926 100644
--- a/example/src/main/java/org/apache/rocketmq/streams/examples/WordCount.java
+++ b/example/src/main/java/org/apache/rocketmq/streams/examples/sink/WordCountSink.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.streams.examples;
+package org.apache.rocketmq.streams.examples.sink;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -25,7 +25,6 @@ import org.apache.rocketmq.streams.core.rstream.StreamBuilder;
import org.apache.rocketmq.streams.core.serialization.KeyValueSerializer;
import org.apache.rocketmq.streams.core.topology.TopologyBuilder;
import org.apache.rocketmq.streams.core.util.Pair;
-import org.apache.rocketmq.streams.examples.pojo.User;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -40,7 +39,7 @@ import java.util.concurrent.CountDownLatch;
* 4、向topic中写入数据
* 5、观察输出结果
*/
-public class WordCount {
+public class WordCountSink {
public static void main(String[] args) {
StreamBuilder builder = new StreamBuilder("wordCount");
diff --git a/example/src/main/java/org/apache/rocketmq/streams/examples/window/WindowCount.java b/example/src/main/java/org/apache/rocketmq/streams/examples/window/WindowCount.java
index a7db93f9..4f758b7e 100644
--- a/example/src/main/java/org/apache/rocketmq/streams/examples/window/WindowCount.java
+++ b/example/src/main/java/org/apache/rocketmq/streams/examples/window/WindowCount.java
@@ -56,12 +56,8 @@ public class WindowCount {
.keyBy(value -> "key")
.window(WindowBuilder.tumblingWindow(Time.seconds(15)))
.aggregate(aggregateAction)
- .sink("", new KeyValueSerializer<String, Num>() {
- @Override
- public byte[] serialize(String s, Num data) throws Throwable {
- return new byte[0];
- }
- });
+ .toRStream()
+ .print();
TopologyBuilder topologyBuilder = builder.build();