Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/01/11 12:06:23 UTC
[rocketmq-site] branch new-official-website updated: [streams doc] modify streas doc
This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch new-official-website
in repository https://gitbox.apache.org/repos/asf/rocketmq-site.git
The following commit(s) were added to refs/heads/new-official-website by this push:
new 58055efdb [streams doc] modify streas doc
new b21c85df5 Merge pull request #427 from ni-ze/new-official-website
58055efdb is described below
commit 58055efdb5751476cc4d06dd0cd0ad426e080fe3
Author: 维章 <un...@gmail.com>
AuthorDate: Wed Jan 11 20:01:30 2023 +0800
[streams doc] modify streas doc
---
docs/07-streams/30RocketMQ Streams Overview.md | 14 +-
docs/07-streams/31RocketMQ Streams Concept.md | 142 ++++----
docs/07-streams/32RocketMQ Streams Quick Start.md | 208 ++++++-----
docs/07-streams/33RocketMQ Streams In Action.md | 386 ---------------------
docs/picture/33rocketmq-streams/stage.png | Bin 79694 -> 0 bytes
.../\346\200\273\344\275\223-1.png" | Bin 0 -> 35338 bytes
.../\346\200\273\344\275\223-2.png" | Bin 0 -> 146415 bytes
...75\223\346\236\266\346\236\204\345\233\276.png" | Bin 60493 -> 0 bytes
...\206\345\237\237\346\250\241\345\236\213-1.png" | Bin 0 -> 21993 bytes
...\206\345\237\237\346\250\241\345\236\213-2.png" | Bin 0 -> 34203 bytes
10 files changed, 197 insertions(+), 553 deletions(-)
diff --git a/docs/07-streams/30RocketMQ Streams Overview.md b/docs/07-streams/30RocketMQ Streams Overview.md
index be7f4eb4a..617516c82 100644
--- a/docs/07-streams/30RocketMQ Streams Overview.md
+++ b/docs/07-streams/30RocketMQ Streams Overview.md
@@ -3,11 +3,14 @@ RocketMQ Streams is a lightweight stream processing engine based on RocketMQ. It can be embedded in applications as an SDK,
so it consumes few resources, scales well, and provides a rich set of stream processing operators.
## Overall Architecture
-![Overall architecture](../picture/33rocketmq-streams/总体架构图.png)
+![Overall architecture](../picture/33rocketmq-streams/总体-1.png)
Data is consumed from RocketMQ by RocketMQ Streams, processed, and finally written back to RocketMQ.
-If the stream processing job contains a groupBy operator, the data must be grouped by key and the grouped data written to a shuffle topic, from which downstream operators
-consume. If stateful operators such as count are involved, state is read and written during computation, and the results are emitted once the window fires.
+
+![Overall architecture](../picture/33rocketmq-streams/总体-2.png)
+
+Data is consumed by a RocketMQ Consumer and enters the processing topology, where operators handle it. If the stream processing job contains a keyBy operator, the data must be grouped by key and the grouped data written to a shuffle topic, from which downstream operators
+consume. If stateful operators such as count are involved, the state topic is read and written during computation; once the computation finishes, the results are written back to RocketMQ.
## Consumption Model
@@ -22,9 +25,8 @@ consume from the shuffle topic. If stateful operators such as count are involved, the computation
## State
![img_3.png](../picture/33rocketmq-streams/状态存储.png)
-For stateful operators such as count, the data must first be grouped before it can be aggregated. The grouping operator groupBy writes the data back to RocketMQ keyed by the grouping key, routing records with the same key into the same partition (a process called shuffle),
-which guarantees that all records with a given key are consumed by the same consumer. State relies on local RocksDB to speed up reads and on RocketMQ for remote persistence; at checkpoint time, the state in the local RocksDB is written to RocketMQ.
-A streaming job may rely on local RocksDB storage only by setting setLocalStorageOnly to true; in that case state may be lost, so this is not recommended in production.
+For stateful operators such as count, the data must first be grouped before it can be aggregated. The grouping operator keyBy writes the data back to RocketMQ keyed by the grouping key, routing records with the same key into the same partition (a process called shuffle),
+which guarantees that all records with a given key are consumed by the same consumer. State relies on local RocksDB to speed up reads and on RocketMQ for remote persistence.
## Scaling
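The shuffle step described above routes every record with a given key to the same message queue, so one consumer sees all records for that key. A minimal sketch of such a routing rule in plain Java (the queue count and the hash-mod scheme are illustrative assumptions, not the engine's actual implementation):

```java
import java.util.List;

public class ShuffleRouting {
    // Illustrative: derive a queue index from the record key so that
    // identical keys always land in the same message queue.
    static int selectQueue(String key, int queueCount) {
        return Math.floorMod(key.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queues = 8; // assumed number of message queues in the shuffle topic
        for (String key : List.of("to", "be", "to", "question")) {
            System.out.println(key + " -> queue " + selectQueue(key, queues));
        }
        // "to" maps to the same queue both times, so a single consumer
        // of that queue receives every "to" record.
    }
}
```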
diff --git a/docs/07-streams/31RocketMQ Streams Concept.md b/docs/07-streams/31RocketMQ Streams Concept.md
index 3524e5b3d..26e8afdd7 100644
--- a/docs/07-streams/31RocketMQ Streams Concept.md
+++ b/docs/07-streams/31RocketMQ Streams Concept.md
@@ -1,83 +1,65 @@
# RocketMQ Streams Core Concepts
-## DataStream
-### DataStreamSource
-DataStreamSource is the entry class of the staged programming style, used to connect to the RocketMQ data source;
-+ ```dataStream(nameSpaceName,pipelineName)``` returns a DataStreamSource instance used to build a streaming job in the staged style;
-+ ```fromRocketmq``` reads data from RocketMQ and takes the following parameters
- + ```topic``` the RocketMQ topic name, required
- + ```groupName``` the consumer group name, required
- + ```namesrvAddress``` the namesrv address of the RocketMQ cluster
- + ```isJson``` whether the data is JSON, optional
- + ```tags``` the RocketMQ consumer tags used to filter messages, optional
-
-### DataStream
-
-+ ```map``` returns a new DataStream by passing each record of the source through the function func
-+ ```flatmap``` similar to map, but each input item maps to 0 or more output items
-+ ```filter``` returns a new DataStream containing only the source records for which func returns true
-+ ```forEach``` executes the function func once for each record and returns a new DataStream
-+ ```script``` runs a script against the fields of each record, producing new fields and a new DataStream
-+ ```selectFields``` returns the selected field values of each record as a new DataStream
-+ ```toPrint``` prints the results to the console and returns a new DataStream instance
-+ ```toFile``` saves the results to a file and returns a new DataStream instance
-+ ```toRocketmq``` writes the results to RocketMQ
-+ ```window``` performs statistics within a window, usually together with ```groupBy```; ```window()``` defines the window size and ```groupBy()``` defines the aggregation key (more than one may be given)
- + ```count``` counts the records within the window
- + ```min``` returns the minimum of the aggregated values within the window
- + ```max``` returns the maximum of the aggregated values within the window
- + ```avg``` returns the average of the aggregated values within the window
- + ```sum``` returns the sum of the aggregated values within the window
- + ```reduce``` performs a custom aggregation within the window
-+ ```join``` inner-joins two streams on a condition
-+ ```leftJoin``` left-joins two streams on a condition
-+ ```union``` merges two streams
-+ ```split``` splits one stream into several streams by tag for separate downstream processing
-+ ```setLocalStorageOnly``` whether state uses local storage only
-+ ```with``` specifies strategies used during computation
-
-## ChainPipeline
-A ChainPipeline is the data pipeline: it reads data from a source, and the data flows through a chain of stages. ChainPipelines can be combined into a processing topology, and several ChainPipelines make up one streaming job.
-For example, a job that reads from two topics, processes the data, and writes it to a third topic is a single streaming job composed of two ChainPipelines; each ChainPipeline consists of one source instance and a number of stages that process the data.
-
-## stage
-A stage is a concrete compute node. Every operator, including map, filter, script, and window, is first built into a stage, which then joins the ChainPipeline. At runtime the
-stages of a ChainPipeline are traversed depth-first; the nextStageLabel attribute of a stage points to its downstream compute node.
-![img_2.png](../picture/33rocketmq-streams/stage.png)
-## shuffle
-When data must be grouped for aggregation, it has to be partitioned by some key so that records with the same key can be aggregated. Stream engines usually send records with the same key to the same node. RocketMQ Streams implements this with RocketMQ itself, which lowers the complexity:
-there is no need to dynamically discover which downstream instance a given key should be routed to.
-The data to be grouped is routed by key and written back to a RocketMQ topic, so that all records with the same key land in the same partition (messageQueue); this process is called shuffle.
-Downstream consumer nodes consume in clustering mode, so one consumer receives all records with a given key and can therefore produce correct results.
-
## Domain Model
-The domain model is the data model passed between stream processing operators. RocketMQ Streams uses a custom Message object to carry data between operators. It has the following attributes:
-```java
-public class Message {
- private JSONObject message;
-
- private boolean isJsonMessage = true;
-
- protected ISystemMessage systemMessage;
-
- protected MessageHeader header = new MessageHeader();
-
-}
-```
-
-- message: the payload. If the raw data is JSON, it is parsed into a JSONObject; otherwise it is parsed into a custom UserDefinedMessage, which extends JSONObject;
-- systemMessage: indicates whether the record is a data message or a system message. A system message is produced when a specific event occurs, and downstream components handle it accordingly. The current system messages are: a new-partition message produced when data arrives from a new partition, a partition-removed message produced when a partition is no longer consumed, and a checkpoint message produced when the consume offset is committed.
-- header: extra information kept while the record is processed, used to assist later computation. It has the following attributes:
-
-| Field type | Field name | Meaning |
-| ---- | ---- |-----|
-| String | JOIN_LEFT | marks the left stream in a join |
-| String | JOIN_RIGHT | marks the right stream in a join |
-| ISource | source | the source instance the record came from |
-| String | routeLabels | route labels: the downstream nodes that will process the record |
-| String | filterLabels | filter labels: the downstream nodes that will not process the record |
-| MessageOffset | messageOffset | the message offset |
-| boolean | needFlush | whether the record must be flushed downstream immediately |
-| boolean | isSystemMessage | whether the record is a system message |
-| String | msgRouteFromLable | the label the record came from |
-| String | originTable | the SQL source table the record came from |
+
+### StreamBuilder
+![img_2.png](../picture/33rocketmq-streams/领域模型-1.png)
+
+* A StreamBuilder instance has 1 to N pipelines; a pipeline represents one data processing path;
+* A pipeline can contain 1 to N processing nodes (GroupNode);
+* A StreamBuilder instance has one TopologyBuilder, which builds the data processors;
+* One JobId corresponds to one StreamBuilder instance.
+
+### RocketMQStream
+![img_2.png](../picture/33rocketmq-streams/领域模型-2.png)
+
+* A RocketMQStream instance has one topology builder (TopologyBuilder);
+* A RocketMQStream instance can instantiate 1 to N worker threads;
+* Each WorkerThread instance contains one engine;
+* An engine contains all the data processing logic: one consumer instance, one producer instance, and one StateStore instance;
+
+### Stream Processing Instance
+A stream processing instance is a process that runs RocketMQ Streams;
+
+* A stream processing instance contains one StreamBuilder, one RocketMQStream, one topology, and one or more pipelines;
+
+
+## StreamBuilder
++ ```StreamBuilder(jobId)``` builds an instance;
++ ```<OUT> RStream<OUT> source(topicName, deserializer) ``` defines the source topic and the deserializer;
+
+
+## RStream
++ ```<K> GroupedStream<K, T> keyBy(selectAction)``` groups by a given field;
++ ```<O> RStream<O> map(mapperAction)``` transforms data one-to-one;
++ ```RStream<T> filter(predictor)``` filters the data;
++ ```<VR> RStream<T> flatMap(mapper)``` transforms data one-to-many;
++ ```<T2> JoinedStream<T, T2> join(rightStream)``` joins two streams;
++ ```sink(topicName, serializer)``` writes the results to a given topic;
+
+
+## GroupedStream
+Operates on data that shares the same key
++ ```<OUT> GroupedStream<K, Integer> count(selectAction)``` counts the records that contain a given field;
++ ```GroupedStream<K, V> min(selectAction)``` computes the minimum of a field;
++ ```GroupedStream<K, V> max(selectAction)``` computes the maximum of a field;
++ ```GroupedStream<K, ? extends Number> sum(selectAction)``` computes the sum of a field;
++ ```GroupedStream<K, V> filter(predictor)``` filters on a field;
++ ```<OUT> GroupedStream<K, OUT> map(valueMapperAction)``` transforms data one-to-one;
++ ```<OUT> GroupedStream<K, OUT> aggregate(accumulator)``` aggregates data; two-phase aggregation is supported, i.e. operators that accumulate data while the window has not fired and compute the result when it fires;
++ ```WindowStream<K, V> window(windowInfo)``` assigns the data to windows;
++ ```GroupedStream<K, V> addGraphNode(name, supplier)``` low-level interface that adds a custom operator to the processing topology;
++ ```RStream<V> toRStream()``` converts to an RStream; this only changes the interface type and does not touch the data;
++ ```sink(topicName, serializer)``` writes the results to a topic with a custom serializer;
+
+
+## WindowStream
+Operates on windowed data
++ ```WindowStream<K, Integer> count()``` counts the records in the window;
++ ```WindowStream<K, V> filter(predictor)``` filters the records in the window;
++ ```<OUT> WindowStream<K, OUT> map(mapperAction)``` transforms the records in the window one-to-one;
++ ```<OUT> WindowStream<K, OUT> aggregate(aggregateAction)``` transforms the records in the window many-to-one;
++ ```<OUT> WindowStream<K, OUT> aggregate(accumulator)``` aggregates data; two-phase aggregation is supported, i.e. operators that accumulate data while the window has not fired and compute the result when it fires;
++ ```void sink(topicName, serializer)``` writes the results to a topic with a custom serializer;
+
+
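The two-phase aggregation mentioned for `aggregate(accumulator)` above can be illustrated with a minimal, self-contained accumulator in plain Java. The `Accumulator` interface here is a simplified stand-in for illustration, not the library's actual signature:

```java
import java.util.ArrayList;
import java.util.List;

public class TwoPhaseAggregate {
    // Phase 1 adds records while the window is open,
    // phase 2 computes the result once the window fires.
    interface Accumulator<IN, OUT> {
        void addValue(IN value); // called for each record before the window fires
        OUT result();            // called once when the window fires
    }

    static class AvgAccumulator implements Accumulator<Integer, Double> {
        private final List<Integer> values = new ArrayList<>();
        @Override public void addValue(Integer value) { values.add(value); }
        @Override public Double result() {
            return values.stream().mapToInt(Integer::intValue).average().orElse(0.0);
        }
    }

    public static void main(String[] args) {
        AvgAccumulator avg = new AvgAccumulator();
        for (int v : new int[] {90, 81, 93}) {
            avg.addValue(v); // phase 1: window not yet fired
        }
        System.out.println(avg.result()); // phase 2: window fires, prints 88.0
    }
}
```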
diff --git a/docs/07-streams/32RocketMQ Streams Quick Start.md b/docs/07-streams/32RocketMQ Streams Quick Start.md
index 6deaa45d7..075ce4375 100644
--- a/docs/07-streams/32RocketMQ Streams Quick Start.md
+++ b/docs/07-streams/32RocketMQ Streams Quick Start.md
@@ -4,9 +4,11 @@
## Running inside the RocketMQ Streams project
-See the documentation in the rocketmq-streams-examples module of the RocketMQ Streams project: [examples](https://github.com/apache/rocketmq-streams/tree/main/rocketmq-streams-examples)
-
-The programs under the rocketmq-streams-examples module can be run directly (some examples require a running RocketMQ).
+The programs under the rocketmq-streams-examples module of the RocketMQ Streams project can be run directly. To run an example:
+* Start RocketMQ 5.0 or later locally;
+* Create the example's source topic with mqAdmin;
+* Start the example;
+* Write suitable data to the RocketMQ source topic (depending on the example);
## Using RocketMQ Streams as an SDK dependency
### Environment preparation
@@ -16,104 +18,148 @@ The programs under the rocketmq-streams-examples module can be run directly (some examples require
### Building RocketMQ Streams
-```shell
-git clone https://github.com/apache/rocketmq-streams.git
-cd rocketmq-streams
-mvn clean -DskipTests install -U
-```
+
### Adding the pom dependency
```xml
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-streams-clients</artifactId>
- <!-- replace with the latest version -->
- <version>${version}</version>
+ <artifactId>rocketmq-streams</artifactId>
+ <!-- change as needed -->
+ <version>1.1.0</version>
</dependency>
</dependencies>
-
-<build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>3.2.1</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <minimizeJar>false</minimizeJar>
- <shadedArtifactAttached>true</shadedArtifactAttached>
- <artifactSet>
- <includes>
- <include>org.apache.rocketmq:rocketmq-streams-clients</include>
- </includes>
- </artifactSet>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
-</build>
```
### Writing the stream processing program
```java
-public class Demo{
- private static String topicName = "topic-1";
- private static String groupName = "groupName-1";
-
+public class WordCount {
public static void main(String[] args) {
- DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
-
- source.fromRocketmq(
- topicName,
- groupName,
- NAMESRV_ADDRESS
- )
- .map(message -> JSONObject.parseObject((String) message))
- .filter(message -> ((JSONObject) message).getInteger("score") > 90)
- .selectFields("name", "subject")
- .toPrint()
- .start();
+ StreamBuilder builder = new StreamBuilder("wordCount");
+
+ builder.source("sourceTopic", total -> {
+ String value = new String(total, StandardCharsets.UTF_8);
+ return new Pair<>(null, value);
+ })
+ .flatMap((ValueMapperAction<String, List<String>>) value -> {
+ String[] splits = value.toLowerCase().split("\\W+");
+ return Arrays.asList(splits);
+ })
+ .keyBy(value -> value)
+ .count()
+ .toRStream()
+ .print();
+
+ TopologyBuilder topologyBuilder = builder.build();
+
+ Properties properties = new Properties();
+ properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
+
+ RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") {
+ @Override
+ public void run() {
+ rocketMQStream.stop();
+ latch.countDown();
+ }
+ });
+ try {
+ rocketMQStream.start();
+ latch.await();
+ } catch (final Throwable e) {
+ System.exit(1);
+ }
+ System.exit(0);
}
}
```
-### Write data to RocketMQ topic-1 and observe the result
-If the following data is written to topic-1:
+### Write data to the RocketMQ sourceTopic and observe the result
+Write the following data to sourceTopic, sending each line as one message;
```text
-{"name":"Zhang San","class":"3","subject":"math","score":90}
-{"name":"Zhang San","class":"3","subject":"history","score":81}
-{"name":"Zhang San","class":"3","subject":"English","score":91}
-{"name":"Zhang San","class":"3","subject":"Chinese","score":70}
-{"name":"Zhang San","class":"3","subject":"politics","score":84}
-{"name":"Zhang San","class":"3","subject":"geography","score":99}
-{"name":"Li Si","class":"3","subject":"math","score":76}
-{"name":"Li Si","class":"3","subject":"history","score":83}
-{"name":"Li Si","class":"3","subject":"English","score":82}
-{"name":"Li Si","class":"3","subject":"Chinese","score":92}
-{"name":"Li Si","class":"3","subject":"politics","score":97}
-{"name":"Li Si","class":"3","subject":"geography","score":89}
-{"name":"Wang Wu","class":"3","subject":"math","score":86}
-{"name":"Wang Wu","class":"3","subject":"history","score":88}
-{"name":"Wang Wu","class":"3","subject":"English","score":86}
-{"name":"Wang Wu","class":"3","subject":"Chinese","score":93}
-{"name":"Wang Wu","class":"3","subject":"politics","score":99}
-{"name":"Wang Wu","class":"3","subject":"geography","score":88}
+"To be, or not to be,--that is the question:--",
+"Whether 'tis nobler in the mind to suffer",
+"The slings and arrows of outrageous fortune",
+"Or to take arms against a sea of troubles,",
+"And by opposing end them?--To die,--to sleep,--",
+"No more; and by a sleep to say we end",
+"The heartache, and the thousand natural shocks",
+"That flesh is heir to,--'tis a consummation",
```
-The result is as follows:
+Counting word frequencies gives the following result:
```text
-{"subject":"politics","name":"Wang Wu"}
-{"subject":"geography","name":"Zhang San"}
-{"subject":"Chinese","name":"Li Si"}
-{"subject":"Chinese","name":"Wang Wu"}
-{"subject":"English","name":"Zhang San"}
-{"subject":"politics","name":"Li Si"}
+(key=to, value=1)
+(key=be, value=1)
+(key=or, value=1)
+(key=not, value=1)
+(key=to, value=2)
+(key=be, value=2)
+(key=that, value=1)
+(key=is, value=1)
+(key=the, value=1)
+(key=whether, value=1)
+(key=tis, value=1)
+(key=nobler, value=1)
+(key=mind, value=1)
+(key=against, value=1)
+(key=troubles, value=1)
+(key=slings, value=1)
+(key=die, value=1)
+(key=natural, value=1)
+(key=flesh, value=1)
+(key=sea, value=1)
+(key=fortune, value=1)
+(key=shocks, value=1)
+(key=consummation, value=1)
+(key=to, value=3)
+(key=to, value=4)
+(key=to, value=5)
+(key=say, value=1)
+(key=end, value=1)
+(key=end, value=2)
+(key=to, value=6)
+(key=to, value=7)
+(key=to, value=8)
+(key=or, value=2)
+(key=them, value=1)
+(key=take, value=1)
+(key=arms, value=1)
+(key=of, value=1)
+(key=and, value=1)
+(key=of, value=2)
+(key=and, value=2)
+(key=by, value=1)
+(key=sleep, value=1)
+(key=and, value=3)
+(key=by, value=2)
+(key=sleep, value=2)
+(key=and, value=4)
+(key=that, value=2)
+(key=arrows, value=1)
+(key=heir, value=1)
+(key=question, value=1)
+(key=is, value=2)
+(key=the, value=2)
+(key=suffer, value=1)
+(key=a, value=1)
+(key=the, value=3)
+(key=no, value=1)
+(key=a, value=2)
+(key=opposing, value=1)
+(key=the, value=4)
+(key=the, value=5)
+(key=a, value=3)
+(key=in, value=1)
+(key=more, value=1)
+(key=heartache, value=1)
+(key=outrageous, value=1)
+(key=we, value=1)
+(key=thousand, value=1)
+(key=tis, value=2)
```
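The running counts above come from `count()` emitting an updated value each time a key recurs. A minimal plain-Java sketch of such a per-key running count (a simplified illustration, not the engine's implementation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RunningCount {
    // Prints a (key, value) pair for every word as its count is bumped,
    // mirroring the incremental output of the WordCount example,
    // and returns the final per-key counts.
    static Map<String, Integer> countWords(String line) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : line.toLowerCase().split("\\W+")) {
            int value = counts.merge(word, 1, Integer::sum);
            System.out.println("(key=" + word + ", value=" + value + ")");
        }
        return counts;
    }

    public static void main(String[] args) {
        countWords("To be, or not to be,--that is the question:--");
        // "to" and "be" are each printed twice, the second time with value=2.
    }
}
```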
diff --git a/docs/07-streams/33RocketMQ Streams In Action.md b/docs/07-streams/33RocketMQ Streams In Action.md
deleted file mode 100644
index 1b9b75b5e..000000000
--- a/docs/07-streams/33RocketMQ Streams In Action.md
+++ /dev/null
@@ -1,386 +0,0 @@
-# RocketMQ Streams in Action
-
-
-## Background
-Distributed tracing tracks, in real time and accurately, the service nodes each request passes through and the time spent at each node, helping to locate root causes quickly, so it is widely used in intelligent operations. How to compute and display the data produced by a distributed tracing system in real time and accurately
-is therefore the key to building such a system.
-
-Distributed tracing has a few core concepts:
-- traceId: uniquely identifies one request.
-- span: a call between two key methods within one request; a request has exactly one traceId but may have multiple spans.
- - spanId: uniquely identifies a span;
- - parentSpanId: the spanId of the current span's parent span;
-
-With these concepts, the data produced by one request can be processed into a request topology or a call chain. Because a distributed tracing system produces a large volume of data with low latency requirements (trace data that arrives too late is less useful for diagnosing online problems), batch processing is no longer sufficient; stream processing must be used.
-
-The input data is shown below; it must be processed into one call chain.
-```json
-{
- "empty":false,
- "labels":[
- "68e51083e737a3f7d9c9e7ab34b1f6a7"
- ],
- "spanIndex":{
- "0.1.1":{
- "appId":"1_1@811c06548ae3a13",
- "clientAppId":"1_1@10b8d040e1d0f78",
- "clientIp":"xx.xxx.xxx.xx",
- "elapsed":1,
- "httpStatusCode":"",
- "kind":"sr",
- "orgId":"1",
- "parentSpanId":"0.1",
- "resultCode":"0",
- "serverAppId":"1_1@811c06548ae3a13",
- "serverIp":"xx.xxx.xxx.xx",
- "spanId":"0.1.1",
- "timestamp":1657511940232,
- "traceId":"ea1bef7f5a16575119402287990d0018"
- }
- },
- "traceId":"ea1bef7f5a16575119402287990d0018"
-}
-```
-```json
-{
- "empty":false,
- "labels":[
- "68e51083e737a3f7d9c9e7ab34b1f6a7"
- ],
- "spanIndex":{
- "0.1":{
- "appId":"1_1@10b8d040e1d0f78",
- "clientAppId":"1_1@a48b12f3ecc3d60",
- "clientIp":"xx.xxx.xxx.xx",
- "elapsed":14,
- "httpStatusCode":"200",
- "kind":"sr",
- "orgId":"1",
- "parentSpanId":"0",
- "resultCode":"0",
- "serverAppId":"1_1@10b8d040e1d0f78",
- "serverIp":"xx.xxx.xxx.xx",
- "spanId":"0.1",
- "timestamp":1657511940232,
- "traceId":"ea1bef7f5a16575119402287990d0018"
- }
- },
- "traceId":"ea1bef7f5a16575119402287990d0018"
-}
-```
-```json
-{
- "empty":false,
- "labels":[
- "e118372a87c3d7af811e08c6eea829c6"
- ],
- "spanIndex":{
- "0":{
- "appId":"1_1@a48b12f3ecc3d60",
- "clientAppId":"",
- "clientIp":"",
- "elapsed":20,
- "httpStatusCode":"200",
- "kind":"sr",
- "orgId":"1",
- "parentSpanId":"",
- "resultCode":"0",
- "serverAppId":"1_1@a48b12f3ecc3d60",
- "serverIp":"xx.xxx.xxx.xx",
- "spanId":"0",
- "timestamp":1657511940228,
- "traceId":"ea1bef7f5a16575119402287990d0018"
- }
- },
- "traceId":"ea1bef7f5a16575119402287990d0018"
-}
-```
-```json
-{
- "empty":false,
- "labels":[
- "68e51083e737a3f7d9c9e7ab34b1f6a7"
- ],
- "spanIndex":{
- "0.1.2":{
- "appId":"1_1@811c06548ae3a13",
- "clientAppId":"1_1@10b8d040e1d0f78",
- "clientIp":"xx.xxx.xxx.xx",
- "elapsed":5,
- "httpStatusCode":"",
- "kind":"sr",
- "orgId":"1",
- "parentSpanId":"0.1",
- "resultCode":"0",
- "serverAppId":"1_1@811c06548ae3a13",
- "serverIp":"xx.xxx.xxx.xx",
- "spanId":"0.1.2",
- "timestamp":1657511940234,
- "traceId":"ea1bef7f5a16575119402287990d0018"
- }
- },
- "traceId":"ea1bef7f5a16575119402287990d0018"
-}
-```
-
-The data above has been simplified by removing unnecessary fields, and it contains only one traceId; in production many traceIds exist at the same time, but the processing principle is the same. To build the call chain, the data is first grouped by traceId;
-within a group all records share the same traceId, and the spans must be sorted in call order before being emitted. The output then contains every span of one request and clearly shows how long each span
-took; if an error occurred, it also shows where it happened. The following shows how RocketMQ Streams processes this data.
-
-
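The sort-by-call-order step described above can be sketched as a spanId comparator in plain Java; comparing the dot-separated segments numerically is an assumption about the spanId scheme, not the tracing system's specified ordering:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SpanOrder {
    // Compare spanIds like "0.1.2" segment by segment, numerically,
    // so "0.2" sorts before "0.10" (plain string order would not).
    static final Comparator<String> BY_SPAN_ID = (a, b) -> {
        String[] as = a.split("\\."), bs = b.split("\\.");
        for (int i = 0; i < Math.min(as.length, bs.length); i++) {
            int cmp = Integer.compare(Integer.parseInt(as[i]), Integer.parseInt(bs[i]));
            if (cmp != 0) return cmp;
        }
        return Integer.compare(as.length, bs.length); // parent sorts before child
    };

    public static void main(String[] args) {
        List<String> spans = new ArrayList<>(Arrays.asList("0.1.2", "0", "0.1.1", "0.1"));
        spans.sort(BY_SPAN_ID);
        System.out.println(spans); // [0, 0.1, 0.1.1, 0.1.2]
    }
}
```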
-## Setting up the cluster
-
-### Start RocketMQ
-
-- Linux/Unix/Mac
-- 64bit JDK 1.8+;
-- Maven 3.2.x or later;
-- Start [RocketMQ](https://rocketmq.apache.org/docs/quick-start/);
-
-### Install RocketMQ Streams locally
-
-```shell
-git clone https://github.com/apache/rocketmq-streams.git
-cd rocketmq-streams
-mvn clean -DskipTests install -U
-```
-
-### Building the stream processing program
-
-- Create an IDE project
-- Add the pom dependency
-```xml
- <dependencies>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-streams-clients</artifactId>
- <!-- replace with the locally installed rocketmq-streams version -->
- <version>${version}</version>
- </dependency>
-</dependencies>
-
-<build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>3.2.1</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <minimizeJar>false</minimizeJar>
- <shadedArtifactAttached>true</shadedArtifactAttached>
- <artifactSet>
- <includes>
- <include>org.apache.rocketmq:rocketmq-streams-clients</include>
- </includes>
- </artifactSet>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
-</build>
-```
-
-
-
-- Write the stream processing program
-```java
-public class UserTest {
- public static void main(String[] args) {
- DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
-
- source.fromRocketmq(
- "traceTopic",
- "traceGroup",
- true,
- "127.0.0.1:9876")
- .map((message -> {
- JSONObject spanIndex = (JSONObject) ((JSONObject) message).get("spanIndex");
- JSONObject span = (JSONObject) spanIndex.getInnerMap().values().stream().findFirst().get();
-
- Object spanId = span.get("spanId");
- Object parentSpanId = span.get("parentSpanId");
-
- ((JSONObject) message).put("spanId", spanId);
- ((JSONObject) message).put("parentSpanId", parentSpanId);
-
- return message;
- }))
- .window(SessionWindow.of(Time.seconds(5)))
- .groupBy("traceId")
- //Within one traceId, collect the spanIndex fields of the raw records into a new traceList field, so that one traceId carries all the spans belonging to that trace.
- .addUDAF(new TestUDAF(), "traceList", "spanIndex")
- .setLocalStorageOnly(true)
- .toDataStream()
- .forEach(new ForEachFunction<JSONObject>() {
- @Override
- public void foreach(JSONObject data) {
- //Sort the spans of the same trace so that their order matches the order in which the request happened
- JSONArray traceList = (JSONArray) data.get("traceList");
- HashMap<String, Object> totalMap = new HashMap<>();
-
- Iterator<Object> iterator = traceList.stream().iterator();
- while (iterator.hasNext()) {
- String next = (String) iterator.next();
- JSONObject spanItem = JSON.parseObject(next);
-
- Set<String> keySet = spanItem.keySet();
- for (String key : keySet) {
- totalMap.put(key, spanItem);
- }
- }
-
- ArrayList<String> temp = new ArrayList<>(totalMap.keySet());
- Collections.sort(temp);
-
- traceList.clear();
- for (String sortKey : temp) {
- traceList.add(totalMap.get(sortKey));
- }
- }
- })
- .toPrint(1)
- .with(WindowStrategy.highPerformance())
- .start();
- }
-}
-```
-
-```java
-public class TestUDAF implements IAccumulator<List<String>, TestUDAF.Trace> {
- public static class Trace {
- public List<String> result = new ArrayList<>();
- }
-
- @Override
- public Trace createAccumulator() {
- return new Trace();
- }
-
- @Override
- public List<String> getValue(Trace accumulator) {
- return accumulator.result;
- }
-
- @Override
- public void accumulate(Trace accumulator, Object... parameters) {
- if (parameters == null || parameters.length == 0) {
- return;
- }
- if (parameters.length != 1) {
- throw new IllegalArgumentException("parameters length must be one");
- }
-
- JSONObject param = (JSONObject) parameters[0];
- String result = param.toJSONString();
-
- if (accumulator == null) {
- accumulator = new Trace();
- }
-
- accumulator.result.add(result);
- }
-
- @Override
- public void merge(Trace accumulator, Iterable<Trace> its) {
-
- }
-
- @Override
- public void retract(Trace accumulator, String... parameters) {
-
- }
-
-}
-```
-
-- Output:
- In the result, one record contains one trace, i.e. all the data produced by one request, with its spans ordered as the request happened.
-```json
-{
- "traceId":"ea1bef7f5a16575119402287990d0018",
- "start_time":"2022-10-08 15:56:22",
- "traceList":[
- {
- "0":{
- "traceId":"ea1bef7f5a16575119402287990d0018",
- "kind":"sr",
- "resultCode":"0",
- "clientAppId":"",
- "parentSpanId":"",
- "orgId":"1",
- "elapsed":20,
- "spanId":"0",
- "appId":"1_1@a48b12f3ecc3d60",
- "clientIp":"",
- "serverAppId":"1_1@a48b12f3ecc3d60",
- "serverIp":"xx.xxx.xxx.xx",
- "httpStatusCode":"200",
- "timestamp":1657511940228
- }
- },
- {
- "0.1":{
- "traceId":"ea1bef7f5a16575119402287990d0018",
- "kind":"sr",
- "resultCode":"0",
- "clientAppId":"1_1@a48b12f3ecc3d60",
- "parentSpanId":"0",
- "orgId":"1",
- "elapsed":14,
- "spanId":"0.1",
- "appId":"1_1@10b8d040e1d0f78",
- "clientIp":"xx.xxx.xxx.xx",
- "serverAppId":"1_1@10b8d040e1d0f78",
- "serverIp":"xx.xxx.xxx.xx",
- "httpStatusCode":"200",
- "timestamp":1657511940232
- }
- },
- {
- "0.1.1":{
- "traceId":"ea1bef7f5a16575119402287990d0018",
- "kind":"sr",
- "resultCode":"0",
- "clientAppId":"1_1@10b8d040e1d0f78",
- "parentSpanId":"0.1",
- "orgId":"1",
- "elapsed":1,
- "spanId":"0.1.1",
- "appId":"1_1@811c06548ae3a13",
- "clientIp":"xx.xxx.xxx.xx",
- "serverAppId":"1_1@811c06548ae3a13",
- "serverIp":"xx.xxx.xxx.xx",
- "httpStatusCode":"",
- "timestamp":1657511940232
- }
- },
- {
- "0.1.2":{
- "traceId":"ea1bef7f5a16575119402287990d0018",
- "kind":"sr",
- "resultCode":"0",
- "clientAppId":"1_1@10b8d040e1d0f78",
- "parentSpanId":"0.1",
- "orgId":"1",
- "elapsed":5,
- "spanId":"0.1.2",
- "appId":"1_1@811c06548ae3a13",
- "clientIp":"xx.xxx.xxx.xx",
- "serverAppId":"1_1@811c06548ae3a13",
- "serverIp":"xx.xxx.xxx.xx",
- "httpStatusCode":"",
- "timestamp":1657511940234
- }
- }
- ],
- "fire_time":"2022-10-08 15:56:29",
- "end_time":"2022-10-08 15:56:29"
-}
-```
\ No newline at end of file
diff --git a/docs/picture/33rocketmq-streams/stage.png b/docs/picture/33rocketmq-streams/stage.png
deleted file mode 100644
index 317fe46b3..000000000
Binary files a/docs/picture/33rocketmq-streams/stage.png and /dev/null differ
diff --git "a/docs/picture/33rocketmq-streams/\346\200\273\344\275\223-1.png" "b/docs/picture/33rocketmq-streams/\346\200\273\344\275\223-1.png"
new file mode 100644
index 000000000..79132fc66
Binary files /dev/null and "b/docs/picture/33rocketmq-streams/\346\200\273\344\275\223-1.png" differ
diff --git "a/docs/picture/33rocketmq-streams/\346\200\273\344\275\223-2.png" "b/docs/picture/33rocketmq-streams/\346\200\273\344\275\223-2.png"
new file mode 100644
index 000000000..f7184ab8e
Binary files /dev/null and "b/docs/picture/33rocketmq-streams/\346\200\273\344\275\223-2.png" differ
diff --git "a/docs/picture/33rocketmq-streams/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png" "b/docs/picture/33rocketmq-streams/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png"
deleted file mode 100644
index 5eba9cec2..000000000
Binary files "a/docs/picture/33rocketmq-streams/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png" and /dev/null differ
diff --git "a/docs/picture/33rocketmq-streams/\351\242\206\345\237\237\346\250\241\345\236\213-1.png" "b/docs/picture/33rocketmq-streams/\351\242\206\345\237\237\346\250\241\345\236\213-1.png"
new file mode 100644
index 000000000..408f5bd0b
Binary files /dev/null and "b/docs/picture/33rocketmq-streams/\351\242\206\345\237\237\346\250\241\345\236\213-1.png" differ
diff --git "a/docs/picture/33rocketmq-streams/\351\242\206\345\237\237\346\250\241\345\236\213-2.png" "b/docs/picture/33rocketmq-streams/\351\242\206\345\237\237\346\250\241\345\236\213-2.png"
new file mode 100644
index 000000000..07bc742f1
Binary files /dev/null and "b/docs/picture/33rocketmq-streams/\351\242\206\345\237\237\346\250\241\345\236\213-2.png" differ