Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/01/11 12:06:23 UTC

[rocketmq-site] branch new-official-website updated: [streams doc] modify streams doc

This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch new-official-website
in repository https://gitbox.apache.org/repos/asf/rocketmq-site.git


The following commit(s) were added to refs/heads/new-official-website by this push:
     new 58055efdb [streams doc] modify streams doc
     new b21c85df5 Merge pull request #427 from ni-ze/new-official-website
58055efdb is described below

commit 58055efdb5751476cc4d06dd0cd0ad426e080fe3
Author: 维章 <un...@gmail.com>
AuthorDate: Wed Jan 11 20:01:30 2023 +0800

    [streams doc] modify streams doc
---
 docs/07-streams/30RocketMQ Streams Overview.md     |  14 +-
 docs/07-streams/31RocketMQ Streams Concept.md      | 142 ++++----
 docs/07-streams/32RocketMQ Streams Quick Start.md  | 208 ++++++-----
 docs/07-streams/33RocketMQ Streams In Action.md    | 386 ---------------------
 docs/picture/33rocketmq-streams/stage.png          | Bin 79694 -> 0 bytes
 .../\346\200\273\344\275\223-1.png"                | Bin 0 -> 35338 bytes
 .../\346\200\273\344\275\223-2.png"                | Bin 0 -> 146415 bytes
 ...75\223\346\236\266\346\236\204\345\233\276.png" | Bin 60493 -> 0 bytes
 ...\206\345\237\237\346\250\241\345\236\213-1.png" | Bin 0 -> 21993 bytes
 ...\206\345\237\237\346\250\241\345\236\213-2.png" | Bin 0 -> 34203 bytes
 10 files changed, 197 insertions(+), 553 deletions(-)

diff --git a/docs/07-streams/30RocketMQ Streams Overview.md b/docs/07-streams/30RocketMQ Streams Overview.md
index be7f4eb4a..617516c82 100644
--- a/docs/07-streams/30RocketMQ Streams Overview.md	
+++ b/docs/07-streams/30RocketMQ Streams Overview.md	
@@ -3,11 +3,14 @@ RocketMQ Streams is a lightweight stream processing engine based on RocketMQ. It can be embedded in applications as an SDK,
 and therefore features low resource consumption, good scalability, and a rich set of stream processing operators.
 
 ## Overall Architecture
-![Overall architecture](../picture/33rocketmq-streams/总体架构图.png)
+![Overall architecture](../picture/33rocketmq-streams/总体-1.png)
 
 Data is consumed from RocketMQ by RocketMQ Streams, processed, and finally written back to RocketMQ.
-If the streaming job contains a groupBy operator, the data must be grouped by key and the grouped data written to a shuffle topic, from which downstream
-operators consume. If stateful operators such as count are also involved, state is read and written during computation, and the results are emitted once the window fires.
+
+![Overall architecture](../picture/33rocketmq-streams/总体-2.png)
+
+Data is consumed by a RocketMQ consumer and flows into the processing topology, where the operators act on it. If the streaming job contains a keyBy operator, the data must be grouped by key and the grouped data written to a shuffle topic, from which downstream
+operators consume. If stateful operators such as count are also involved, the computation reads and writes a state topic; when the computation finishes, the results are written back to RocketMQ.
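+
+The same pipeline expressed with the API from the Concepts chapter looks roughly like the sketch below (a minimal sketch only: the operator names follow that chapter, while the deserializer and serializer lambda shapes are assumptions taken from the Quick Start example):
+
+```java
+StreamBuilder builder = new StreamBuilder("overviewJob");
+builder.source("inputTopic", bytes -> new Pair<>(null, new String(bytes, StandardCharsets.UTF_8)))
+       .keyBy(value -> value)   // keyBy groups by key via the shuffle topic
+       .count()                 // count is stateful and reads/writes the state topic
+       .toRStream()
+       .sink("outputTopic", value -> String.valueOf(value).getBytes(StandardCharsets.UTF_8));
+```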
 
 
 ## Consumption Model
@@ -22,9 +25,8 @@ operators consume from the shuffle topic. If stateful operators such as count are also involved, the computation
 ## State
 ![img_3.png](../picture/33rocketmq-streams/状态存储.png)
 
-For a stateful operator such as count, the data must first be grouped before it can be aggregated. The grouping operator groupBy writes the data back to RocketMQ partitioned by the grouping key, so that records with the same key land in the same partition (a process called shuffle),
-which guarantees that all records with a given key are consumed by the same consumer. State relies on RocksDB locally to speed up reads and on RocketMQ remotely for persistence; at checkpoint time the local RocksDB state is written to RocketMQ.
-A streaming job may also run on local RocksDB storage alone by setting setLocalStorageOnly to true; state may be lost in this mode, so it is not recommended in production.
+For a stateful operator such as count, the data must first be grouped before it can be aggregated. The grouping operator keyBy writes the data back to RocketMQ partitioned by the grouping key, so that records with the same key land in the same partition (a process called shuffle),
+which guarantees that all records with a given key are consumed by the same consumer. State relies on RocksDB locally to speed up reads and on RocketMQ remotely for persistence.
 
 
 ## Scaling
diff --git a/docs/07-streams/31RocketMQ Streams Concept.md b/docs/07-streams/31RocketMQ Streams Concept.md
index 3524e5b3d..26e8afdd7 100644
--- a/docs/07-streams/31RocketMQ Streams Concept.md	
+++ b/docs/07-streams/31RocketMQ Streams Concept.md	
@@ -1,83 +1,65 @@
 # RocketMQ Streams Core Concepts
 
-## DataStream
-### DataStreamSource
-DataStreamSource is the entry class of the fluent-style API, used to connect to the RocketMQ data source;
-+ ```dataStream(nameSpaceName,pipelineName)``` returns a DataStreamSource instance for building a streaming job in fluent style;
-+ ```fromRocketmq``` reads data from RocketMQ and takes the following parameters
-    + ```topic``` name of the RocketMQ topic, required
-    + ```groupName``` name of the consumer group, required
-    + ```namesrvAddress``` namesrv address of the RocketMQ cluster
-    + ```isJson``` whether the data is in JSON format, optional
-    + ```tags``` RocketMQ consumer tags used to filter messages, optional
-
-### DataStream
-
-+ ```map``` returns a new DataStream by passing each record of the source through the function func
-+ ```flatmap``` like map, but one input item maps to zero or more output items
-+ ```filter``` returns a new DataStream containing only the source records on which func returns true
-+ ```forEach``` runs the function func once on each record and returns a new DataStream
-+ ```script``` runs a script against the fields of each record, producing new fields and a new DataStream
-+ ```selectFields``` returns the selected fields of each record as a new DataStream
-+ ```toPrint``` prints the results to the console and returns a new DataStream instance
-+ ```toFile``` saves the results to a file and returns a new DataStream instance
-+ ```toRocketmq``` writes the results to RocketMQ
-+ ```window``` performs statistics within a window, usually combined with ```groupBy```; ```window()``` defines the window size, and ```groupBy()``` defines one or more grouping keys
-    + ```count``` counts the records in the window
-    + ```min``` minimum of the aggregated values in the window
-    + ```max``` maximum of the aggregated values in the window
-    + ```avg``` average of the aggregated values in the window
-    + ```sum``` sum of the aggregated values in the window
-    + ```reduce``` custom aggregation within the window
-+ ```join``` inner-joins two streams on a condition
-+ ```leftJoin``` left-joins two streams on a condition
-+ ```union``` merges two streams
-+ ```split``` splits one stream by tags into several streams for downstream processing
-+ ```setLocalStorageOnly``` whether state uses local storage only
-+ ```with``` specifies strategies to apply during computation
-
-## ChainPipeline
-A ChainPipeline is the pipeline data flows through: it reads from one source, and the data passes through several stages in turn, forming a processing chain. ChainPipelines can be combined into a processing topology, and several ChainPipelines make up one streaming job.
-For example, a job that reads from two topics, processes the data, and writes it to a third topic is a single job composed of two ChainPipelines; each ChainPipeline consists of one source instance and several data-processing stages.
-
-## stage
-A stage is a concrete computation node. Every operation, including map, filter, script, and window, is first built into a stage, which then joins the ChainPipeline. At run time the stages of a ChainPipeline are traversed depth-first; the stage attribute nextStageLabel points to the downstream node.
-![img_2.png](../picture/33rocketmq-streams/stage.png)
-## shuffle
-When data must be grouped for aggregation, it has to be split by some key and then aggregated per key. In stream processing, records with the same key are usually routed to the same node. RocketMQ Streams implements this with RocketMQ itself, which lowers the complexity:
-there is no need to discover dynamically which downstream instance a given key should be sent to.
-The data to be grouped is routed by key and written back to a RocketMQ topic, so that all records with the same key land in the same partition (messageQueue); this process is called shuffle.
-Downstream consumer nodes consume in clustering mode, so one consumer receives all records with a given key and can therefore compute correct results.
-
 ## Domain Model
-The domain model is the data model passed between stream processing operators. RocketMQ Streams uses a custom Message object to carry information between operators; it has the following attributes:
-```java
-public class Message {
-	private JSONObject message;
-
-    private boolean isJsonMessage = true;
-
-    protected ISystemMessage systemMessage;
-
-    protected MessageHeader header = new MessageHeader();
-    
-}
-```
-
-- message: the payload; if the raw data is JSON it is parsed into a JSONObject, otherwise into a custom UserDefinedMessage, which extends JSONObject;
-- systemMessage: indicates whether this is a data message or a system message. A system message is produced when a specific event occurs and triggers specific handling in downstream components. The current system messages are: a new-partition message produced when data arrives from a new partition, a partition-removed message produced when a partition is no longer consumed, and a checkpoint message produced when the consume offset is committed.
-- header: extra information retained while the data is processed, used to assist later computation. It has the following attributes:
-
-| Field Type | Field Name | Meaning |
-| ---- | ---- | ---- |
-| String | JOIN_LEFT | marks the left stream during a join |
-| String | JOIN_RIGHT | marks the right stream during a join |
-| ISource | source | the source instance the data came from |
-| String | routeLabels | routing labels: which downstream nodes will process the record |
-| String | filterLabels | routing labels: which downstream nodes will not process the record |
-| MessageOffset | messageOffset | the message offset |
-| boolean | needFlush | whether the message must be passed downstream immediately |
-| boolean | isSystemMessage | whether this is a system message |
-| String | msgRouteFromLable | the label the message came from |
-| String | originTable | the SQL source table the message came from |
+
+### StreamBuilder
+![img_2.png](../picture/33rocketmq-streams/领域模型-1.png)
+
+* A StreamBuilder instance has 1 to N pipelines; a pipeline represents one data processing path;
+* A pipeline can contain 1 to N processing nodes (GroupNode);
+* A StreamBuilder instance has one TopologyBuilder, which builds the data processors;
+* One jobId corresponds to one StreamBuilder instance.
+
+### RocketMQStream
+![img_2.png](../picture/33rocketmq-streams/领域模型-2.png)
+
+* A RocketMQStream instance has one topology builder, TopologyBuilder;
+* A RocketMQStream instance can start 1 to N worker threads;
+* Each WorkerThread instance contains one engine;
+* An engine contains all the logic for processing data: one consumer instance, one producer instance, and one StateStore instance.
+
+### Stream Processing Instance
+A stream processing instance is a process running RocketMQ Streams;
+
+* One stream processing instance contains one StreamBuilder, one RocketMQStream, one topology graph, and one or more pipelines.
+
+
+## StreamBuilder
++ ```StreamBuilder(jobId)``` constructs an instance;
++ ```<OUT> RStream<OUT> source(topicName, deserializer)``` defines the source topic and the deserializer;
+
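+A minimal construction sketch (a sketch only: the job and topic names are placeholders, and the byte[]-to-String deserializer shape is taken from the Quick Start example):
+
+```java
+StreamBuilder builder = new StreamBuilder("myJob");   // one jobId per StreamBuilder instance
+RStream<String> source = builder.source("sourceTopic",
+        bytes -> new Pair<>(null, new String(bytes, StandardCharsets.UTF_8)));
+```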
+
+## RStream
++ ```<K> GroupedStream<K, T> keyBy(selectAction)``` groups records by a chosen field;
++ ```<O> RStream<O> map(mapperAction)``` transforms each record one-to-one;
++ ```RStream<T> filter(predictor)``` filters the records;
++ ```<VR> RStream<T> flatMap(mapper)``` transforms each record one-to-many;
++ ```<T2> JoinedStream<T, T2> join(rightStream)``` joins two streams;
++ ```sink(topicName, serializer)``` writes the results to the given topic;
+
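+Continuing from the builder sketch above, a stateless chain might look like this (the mappers and the predicate are illustrative):
+
+```java
+source.map(String::toLowerCase)                                // one-to-one
+      .filter(value -> !value.isEmpty())                       // keep non-empty records
+      .flatMap(value -> Arrays.asList(value.split("\\W+")))    // one-to-many
+      .sink("outTopic", value -> value.getBytes(StandardCharsets.UTF_8));
+```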
+
+## GroupedStream
+Operations on records that share the same key
++ ```<OUT> GroupedStream<K, Integer> count(selectAction)``` counts the records containing a given field;
++ ```GroupedStream<K, V> min(selectAction)``` computes the minimum of a field;
++ ```GroupedStream<K, V> max(selectAction)``` computes the maximum of a field;
++ ```GroupedStream<K, ? extends Number> sum(selectAction)``` computes the sum of a field;
++ ```GroupedStream<K, V> filter(predictor)``` filters on a field;
++ ```<OUT> GroupedStream<K, OUT> map(valueMapperAction)``` transforms each record one-to-one;
++ ```<OUT> GroupedStream<K, OUT> aggregate(accumulator)``` aggregates the records; two-phase aggregation is supported, e.g. operators that accumulate data while the window is open and emit the result when the window fires;
++ ```WindowStream<K, V> window(windowInfo)``` assigns the records to windows;
++ ```GroupedStream<K, V> addGraphNode(name, supplier)``` low-level interface that adds a custom operator to the processing topology;
++ ```RStream<V> toRStream()``` converts to RStream; this is only an interface-level conversion and does not touch the data;
++ ```sink(topicName, serializer)``` writes the results to the topic using a custom serializer;
+
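+A grouped-count sketch built from the signatures above (the field selectors and the serializer shape are illustrative assumptions):
+
+```java
+builder.source("scoreTopic", bytes -> new Pair<>(null, new String(bytes, StandardCharsets.UTF_8)))
+       .keyBy(value -> value)    // group records by the value itself
+       .count(value -> value)    // stateful count per key
+       .toRStream()              // interface-level conversion only
+       .sink("countTopic", value -> String.valueOf(value).getBytes(StandardCharsets.UTF_8));
+```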
+
+## WindowStream
+Operations on data that has been assigned to windows
++ ```WindowStream<K, Integer> count()``` counts the records in a window;
++ ```WindowStream<K, V> filter(predictor)``` filters the records in a window;
++ ```<OUT> WindowStream<K, OUT> map(mapperAction)``` transforms the records in a window one-to-one;
++ ```<OUT> WindowStream<K, OUT> aggregate(aggregateAction)``` transforms the records in a window many-to-one;
++ ```<OUT> WindowStream<K, OUT> aggregate(accumulator)``` aggregates the records; two-phase aggregation is supported, e.g. operators that accumulate data while the window is open and emit the result when the window fires;
++ ```void sink(topicName, serializer)``` writes the results to the topic using a custom serializer;
+
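+A windowed-count sketch (this chapter does not show how a WindowInfo is constructed, so the `windowInfo` value below is assumed to be built elsewhere):
+
+```java
+// WindowInfo windowInfo = ...;   // e.g. a tumbling-window definition, constructed elsewhere
+builder.source("eventTopic", bytes -> new Pair<>(null, new String(bytes, StandardCharsets.UTF_8)))
+       .keyBy(value -> value)
+       .window(windowInfo)       // WindowStream over the grouped records
+       .count()                  // record count per key and window
+       .sink("windowedTopic", value -> String.valueOf(value).getBytes(StandardCharsets.UTF_8));
+```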
+
diff --git a/docs/07-streams/32RocketMQ Streams Quick Start.md b/docs/07-streams/32RocketMQ Streams Quick Start.md
index 6deaa45d7..075ce4375 100644
--- a/docs/07-streams/32RocketMQ Streams Quick Start.md	
+++ b/docs/07-streams/32RocketMQ Streams Quick Start.md	
@@ -4,9 +4,11 @@
 
 
 ## Running inside the RocketMQ Streams project
-See the documentation for rocketmq-streams-examples in the RocketMQ Streams project: [examples](https://github.com/apache/rocketmq-streams/tree/main/rocketmq-streams-examples)
-
-The programs under the rocketmq-streams-examples module can be run directly (some examples require a running RocketMQ).
+The programs under the rocketmq-streams-examples module of the RocketMQ Streams project can be run directly. To run an example:
+* Start RocketMQ 5.0 or later locally;
+* Create the example's source topic with mqAdmin;
+* Start the example program;
+* Write suitable data to the source topic (what is suitable depends on the example).
 
 ## Using RocketMQ Streams as an SDK dependency
 ### Prerequisites
@@ -16,104 +18,148 @@ The programs under the rocketmq-streams-examples module can be run directly (some examples require
 
 ### Building RocketMQ Streams
 
-```shell
-git clone https://github.com/apache/rocketmq-streams.git
-cd rocketmq-streams
-mvn clean -DskipTests install -U
-```
+ 
 ### Adding the POM dependency
 
 ```xml
  <dependencies>
     <dependency>
         <groupId>org.apache.rocketmq</groupId>
-        <artifactId>rocketmq-streams-clients</artifactId>
-          <!-- replace with the latest version -->
-        <version>${version}</version>
+        <artifactId>rocketmq-streams</artifactId>
+            <!-- adjust the version as needed -->
+        <version>1.1.0</version>
     </dependency>
 </dependencies>
-
-<build>
-    <plugins>
-        <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-shade-plugin</artifactId>
-            <version>3.2.1</version>
-            <executions>
-                <execution>
-                    <phase>package</phase>
-                    <goals>
-                        <goal>shade</goal>
-                    </goals>
-                    <configuration>
-                        <minimizeJar>false</minimizeJar>
-                        <shadedArtifactAttached>true</shadedArtifactAttached>
-                        <artifactSet>
-                            <includes>
-                                <include>org.apache.rocketmq:rocketmq-streams-clients</include>
-                            </includes>
-                        </artifactSet>
-                    </configuration>
-                </execution>
-            </executions>
-        </plugin>
-    </plugins>
-</build>
 ```
 
 ### Writing the streaming program
 ```java
-public class Demo{
-    private static String topicName = "topic-1";
-    private static String groupName = "groupName-1";
-
+public class WordCount {
     public static void main(String[] args) {
-        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
-       
-        source.fromRocketmq(
-                        topicName,
-                        groupName,
-                        NAMESRV_ADDRESS
-                )
-                .map(message -> JSONObject.parseObject((String) message))
-                .filter(message -> ((JSONObject) message).getInteger("score") > 90)
-                .selectFields("name", "subject")
-                .toPrint()
-                .start();
+        StreamBuilder builder = new StreamBuilder("wordCount");
+
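+        // deserialize each message payload from sourceTopic into a String line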
+        builder.source("sourceTopic", total -> {
+                    String value = new String(total, StandardCharsets.UTF_8);
+                    return new Pair<>(null, value);
+                })
+                .flatMap((ValueMapperAction<String, List<String>>) value -> {
+                    String[] splits = value.toLowerCase().split("\\W+");
+                    return Arrays.asList(splits);
+                })
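+                // group identical words; keyBy shuffles records with the same key to one consumer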
+                .keyBy(value -> value)
+                .count()
+                .toRStream()
+                .print();
+
+        TopologyBuilder topologyBuilder = builder.build();
+
+        Properties properties = new Properties();
+        properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
+
+        RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
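+        // stop the engine and release the latch when the JVM shuts down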
+        Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") {
+            @Override
+            public void run() {
+                rocketMQStream.stop();
+                latch.countDown();
+            }
+        });
 
+        try {
+            rocketMQStream.start();
+            latch.await();
+        } catch (final Throwable e) {
+            System.exit(1);
+        }
+        System.exit(0);
     }
 }
 ```
 
-### Write data to RocketMQ topic-1 and observe the results
-If the following data is written to topic-1:
+### Write data to RocketMQ sourceTopic and observe the results
+Write the following data to sourceTopic, sending each line as one message (a minimal producer sketch follows the sample output below):
 ```text
-{"name":"张三","class":"3","subject":"数学","score":90}
-{"name":"张三","class":"3","subject":"历史","score":81}
-{"name":"张三","class":"3","subject":"英语","score":91}
-{"name":"张三","class":"3","subject":"语文","score":70}
-{"name":"张三","class":"3","subject":"政治","score":84}
-{"name":"张三","class":"3","subject":"地理","score":99}
-{"name":"李四","class":"3","subject":"数学","score":76}
-{"name":"李四","class":"3","subject":"历史","score":83}
-{"name":"李四","class":"3","subject":"英语","score":82}
-{"name":"李四","class":"3","subject":"语文","score":92}
-{"name":"李四","class":"3","subject":"政治","score":97}
-{"name":"李四","class":"3","subject":"地理","score":89}
-{"name":"王五","class":"3","subject":"数学","score":86}
-{"name":"王五","class":"3","subject":"历史","score":88}
-{"name":"王五","class":"3","subject":"英语","score":86}
-{"name":"王五","class":"3","subject":"语文","score":93}
-{"name":"王五","class":"3","subject":"政治","score":99}
-{"name":"王五","class":"3","subject":"地理","score":88}
+"To be, or not to be,--that is the question:--",
+"Whether 'tis nobler in the mind to suffer",
+"The slings and arrows of outrageous fortune",
+"Or to take arms against a sea of troubles,",
+"And by opposing end them?--To die,--to sleep,--",
+"No more; and by a sleep to say we end",
+"The heartache, and the thousand natural shocks",
+"That flesh is heir to,--'tis a consummation",
 ```
-The results are as follows:
+Counting the word frequencies produces the results below:
 ```text
-{"subject":"政治","name":"王五"}
-{"subject":"地理","name":"张三"}
-{"subject":"语文","name":"李四"}
-{"subject":"语文","name":"王五"}
-{"subject":"英语","name":"张三"}
-{"subject":"政治","name":"李四"}
+(key=to, value=1)
+(key=be, value=1)
+(key=or, value=1)
+(key=not, value=1)
+(key=to, value=2)
+(key=be, value=2)
+(key=that, value=1)
+(key=is, value=1)
+(key=the, value=1)
+(key=whether, value=1)
+(key=tis, value=1)
+(key=nobler, value=1)
+(key=mind, value=1)
+(key=against, value=1)
+(key=troubles, value=1)
+(key=slings, value=1)
+(key=die, value=1)
+(key=natural, value=1)
+(key=flesh, value=1)
+(key=sea, value=1)
+(key=fortune, value=1)
+(key=shocks, value=1)
+(key=consummation, value=1)
+(key=to, value=3)
+(key=to, value=4)
+(key=to, value=5)
+(key=say, value=1)
+(key=end, value=1)
+(key=end, value=2)
+(key=to, value=6)
+(key=to, value=7)
+(key=to, value=8)
+(key=or, value=2)
+(key=them, value=1)
+(key=take, value=1)
+(key=arms, value=1)
+(key=of, value=1)
+(key=and, value=1)
+(key=of, value=2)
+(key=and, value=2)
+(key=by, value=1)
+(key=sleep, value=1)
+(key=and, value=3)
+(key=by, value=2)
+(key=sleep, value=2)
+(key=and, value=4)
+(key=that, value=2)
+(key=arrows, value=1)
+(key=heir, value=1)
+(key=question, value=1)
+(key=is, value=2)
+(key=the, value=2)
+(key=suffer, value=1)
+(key=a, value=1)
+(key=the, value=3)
+(key=no, value=1)
+(key=a, value=2)
+(key=opposing, value=1)
+(key=the, value=4)
+(key=the, value=5)
+(key=a, value=3)
+(key=in, value=1)
+(key=more, value=1)
+(key=heartache, value=1)
+(key=outrageous, value=1)
+(key=we, value=1)
+(key=thousand, value=1)
+(key=tis, value=2)
 ```
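+
+For reference, a minimal producer sketch for feeding the lines above into sourceTopic (a sketch, assuming a local nameserver at 127.0.0.1:9876; the producer group name is a placeholder):
+
+```java
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+
+import java.nio.charset.StandardCharsets;
+
+public class InputFeeder {
+    public static void main(String[] args) throws Exception {
+        DefaultMQProducer producer = new DefaultMQProducer("wordcount-producer");
+        producer.setNamesrvAddr("127.0.0.1:9876");
+        producer.start();
+        String[] lines = {
+                "To be, or not to be,--that is the question:--",
+                "Whether 'tis nobler in the mind to suffer"
+                // ... remaining lines of the sample input
+        };
+        for (String line : lines) {
+            // each line is sent as one message, matching the input format described above
+            producer.send(new Message("sourceTopic", line.getBytes(StandardCharsets.UTF_8)));
+        }
+        producer.shutdown();
+    }
+}
+```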
 
diff --git a/docs/07-streams/33RocketMQ Streams In Action.md b/docs/07-streams/33RocketMQ Streams In Action.md
deleted file mode 100644
index 1b9b75b5e..000000000
--- a/docs/07-streams/33RocketMQ Streams In Action.md	
+++ /dev/null
@@ -1,386 +0,0 @@
-# RocketMQ Streams in Action
-
-
-## Background
-Distributed tracing tracks, in real time and accurately, the service nodes each request passes through and the time spent at each node. It helps locate root causes quickly and is therefore widely used in intelligent operations. How to compute and present the data produced by a distributed tracing system in real time and accurately
-thus becomes the key to building such a system.
-
-A distributed tracing system has the following core concepts:
-- traceId: uniquely identifies one request.
-- span: a call between two key methods within a request; a request has exactly one traceId but may contain many spans.
-  - spanId: uniquely identifies a span;
-  - parentSpanId: the spanId of the span that precedes the current one;
-
-Using these concepts, the data produced by one request is assembled into a request topology or a chained call graph. Because tracing systems produce data in large volume and demand low processing latency (trace information that arrives too late is of little help when troubleshooting production issues), batch processing no longer suffices and stream processing must be used.
-
-The input data is shown below; it must be processed into one call chain.
-```json
-{
-    "empty":false,
-    "labels":[
-        "68e51083e737a3f7d9c9e7ab34b1f6a7"
-    ],
-    "spanIndex":{
-        "0.1.1":{
-            "appId":"1_1@811c06548ae3a13",
-            "clientAppId":"1_1@10b8d040e1d0f78",
-            "clientIp":"xx.xxx.xxx.xx",
-            "elapsed":1,
-            "httpStatusCode":"",
-            "kind":"sr",
-            "orgId":"1",
-            "parentSpanId":"0.1",
-            "resultCode":"0",
-            "serverAppId":"1_1@811c06548ae3a13",
-            "serverIp":"xx.xxx.xxx.xx",
-            "spanId":"0.1.1",
-            "timestamp":1657511940232,
-            "traceId":"ea1bef7f5a16575119402287990d0018"
-        }
-    },
-    "traceId":"ea1bef7f5a16575119402287990d0018"
-}
-```
-```json
-{
-    "empty":false,
-    "labels":[
-        "68e51083e737a3f7d9c9e7ab34b1f6a7"
-    ],
-    "spanIndex":{
-        "0.1":{
-            "appId":"1_1@10b8d040e1d0f78",
-            "clientAppId":"1_1@a48b12f3ecc3d60",
-            "clientIp":"xx.xxx.xxx.xx",
-            "elapsed":14,
-            "httpStatusCode":"200",
-            "kind":"sr",
-            "orgId":"1",
-            "parentSpanId":"0",
-            "resultCode":"0",
-            "serverAppId":"1_1@10b8d040e1d0f78",
-            "serverIp":"xx.xxx.xxx.xx",
-            "spanId":"0.1",
-            "timestamp":1657511940232,
-            "traceId":"ea1bef7f5a16575119402287990d0018"
-        }
-    },
-    "traceId":"ea1bef7f5a16575119402287990d0018"
-}
-```
-```json
-{
-    "empty":false,
-    "labels":[
-        "e118372a87c3d7af811e08c6eea829c6"
-    ],
-    "spanIndex":{
-        "0":{
-            "appId":"1_1@a48b12f3ecc3d60",
-            "clientAppId":"",
-            "clientIp":"",
-            "elapsed":20,
-            "httpStatusCode":"200",
-            "kind":"sr",
-            "orgId":"1",
-            "parentSpanId":"",
-            "resultCode":"0",
-            "serverAppId":"1_1@a48b12f3ecc3d60",
-            "serverIp":"xx.xxx.xxx.xx",
-            "spanId":"0",
-            "timestamp":1657511940228,
-            "traceId":"ea1bef7f5a16575119402287990d0018"
-        }
-    },
-    "traceId":"ea1bef7f5a16575119402287990d0018"
-}
-```
-```json
-{
-    "empty":false,
-    "labels":[
-        "68e51083e737a3f7d9c9e7ab34b1f6a7"
-    ],
-    "spanIndex":{
-        "0.1.2":{
-            "appId":"1_1@811c06548ae3a13",
-            "clientAppId":"1_1@10b8d040e1d0f78",
-            "clientIp":"xx.xxx.xxx.xx",
-            "elapsed":5,
-            "httpStatusCode":"",
-            "kind":"sr",
-            "orgId":"1",
-            "parentSpanId":"0.1",
-            "resultCode":"0",
-            "serverAppId":"1_1@811c06548ae3a13",
-            "serverIp":"xx.xxx.xxx.xx",
-            "spanId":"0.1.2",
-            "timestamp":1657511940234,
-            "traceId":"ea1bef7f5a16575119402287990d0018"
-        }
-    },
-    "traceId":"ea1bef7f5a16575119402287990d0018"
-}
-```
-
-The data above has been simplified: unnecessary fields were removed and only one traceId appears, whereas in production many traceIds coexist; the processing principle is the same. To obtain the call chain, the data must first be grouped by traceId;
-within a group all records then share the same traceId, and the spans must be sorted in call order before being emitted. The output thus contains all spans of one request and clearly shows how long each span
-took, and if an error occurred, where it occurred. The following shows how RocketMQ Streams processes this data.
-
-
-## Setting up the cluster
-
-### Starting RocketMQ
-
-- Linux/Unix/Mac
-- 64bit JDK 1.8+;
-- Maven 3.2.x or later;
-- Start [RocketMQ](https://rocketmq.apache.org/docs/quick-start/);
-
-### Installing RocketMQ Streams locally
-
-```shell
-git clone https://github.com/apache/rocketmq-streams.git
-cd rocketmq-streams
-mvn clean -DskipTests install -U
-```
-
-### Building the streaming program
-
-- Create a new IDE project
-- Add the POM dependency
-```xml
- <dependencies>
-    <dependency>
-        <groupId>org.apache.rocketmq</groupId>
-        <artifactId>rocketmq-streams-clients</artifactId>
-          <!-- replace with the locally installed rocketmq-streams version -->
-        <version>${version}</version>
-    </dependency>
-</dependencies>
-
-<build>
-    <plugins>
-        <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-shade-plugin</artifactId>
-            <version>3.2.1</version>
-            <executions>
-                <execution>
-                    <phase>package</phase>
-                    <goals>
-                        <goal>shade</goal>
-                    </goals>
-                    <configuration>
-                        <minimizeJar>false</minimizeJar>
-                        <shadedArtifactAttached>true</shadedArtifactAttached>
-                        <artifactSet>
-                            <includes>
-                                <include>org.apache.rocketmq:rocketmq-streams-clients</include>
-                            </includes>
-                        </artifactSet>
-                    </configuration>
-                </execution>
-            </executions>
-        </plugin>
-    </plugins>
-</build>
-```
-
-
-
-- Write the streaming program
-```java
-public class UserTest {
-    public static void main(String[] args) {
-        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
-
-      source.fromRocketmq(
-                      "traceTopic",
-                      "traceGroup",
-                      true,
-                      "127.0.0.1:9876")
-              .map((message -> {
-                JSONObject spanIndex = (JSONObject) ((JSONObject) message).get("spanIndex");
-                JSONObject span = (JSONObject) spanIndex.getInnerMap().values().stream().findFirst().get();
-
-                Object spanId = span.get("spanId");
-                Object parentSpanId = span.get("parentSpanId");
-
-                ((JSONObject) message).put("spanId", spanId);
-                ((JSONObject) message).put("parentSpanId", parentSpanId);
-
-                return message;
-              }))
-              .window(SessionWindow.of(Time.seconds(5)))
-              .groupBy("traceId")
-              // Within one traceId, gather the spanIndex fields of the raw records into a new traceList field, so that one traceId holds all the spans of that trace.
-              .addUDAF(new TestUDAF(), "traceList", "spanIndex")
-              .setLocalStorageOnly(true)
-              .toDataStream()
-              .forEach(new ForEachFunction<JSONObject>() {
-                  @Override
-                  public void foreach(JSONObject data) {
-                      // Sort the spans within one trace so that their order matches the order in which the request happened
-                      JSONArray traceList = (JSONArray) data.get("traceList");
-                      HashMap<String, Object> totalMap = new HashMap<>();
-  
-                      Iterator<Object> iterator = traceList.stream().iterator();
-                      while (iterator.hasNext()) {
-                          String next = (String) iterator.next();
-                          JSONObject spanItem = JSON.parseObject(next);
-    
-                          Set<String> keySet = spanItem.keySet();
-                          for (String key : keySet) {
-                            totalMap.put(key, spanItem);
-                          }
-                      }
-  
-                      ArrayList<String> temp = new ArrayList<>(totalMap.keySet());
-                      Collections.sort(temp);
-  
-                      traceList.clear();
-                      for (String sortKey : temp) {
-                          traceList.add(totalMap.get(sortKey));
-                      }
-                  }
-              })
-              .toPrint(1)
-              .with(WindowStrategy.highPerformance())
-              .start();
-    }
-}
-```
-
-```java
-public class TestUDAF implements IAccumulator<List<String>, TestUDAF.Trace> {
-    public static class Trace {
-        public List<String> result = new ArrayList<>();
-    }
-
-    @Override
-    public Trace createAccumulator() {
-        return new Trace();
-    }
-
-    @Override
-    public List<String> getValue(Trace accumulator) {
-        return accumulator.result;
-    }
-
-    @Override
-    public void accumulate(Trace accumulator, Object... parameters) {
-        if (parameters == null || parameters.length == 0) {
-            return;
-        }
-        if (parameters.length != 1) {
-            throw new IllegalArgumentException("parameters length must be one");
-        }
-
-        JSONObject param = (JSONObject) parameters[0];
-        String result = param.toJSONString();
-
-        if (accumulator == null) {
-            accumulator = new Trace();
-        }
-
-        accumulator.result.add(result);
-    }
-
-    @Override
-    public void merge(Trace accumulator, Iterable<Trace> its) {
-
-    }
-
-    @Override
-    public void retract(Trace accumulator, String... parameters) {
-
-    }
-    
-}
-```
-
-- Output:
-  Each output record contains one trace, i.e. the data produced by one request, with its spans ordered by when they occurred.
-```json
-{
-    "traceId":"ea1bef7f5a16575119402287990d0018",
-    "start_time":"2022-10-08 15:56:22",
-    "traceList":[
-        {
-            "0":{
-                "traceId":"ea1bef7f5a16575119402287990d0018",
-                "kind":"sr",
-                "resultCode":"0",
-                "clientAppId":"",
-                "parentSpanId":"",
-                "orgId":"1",
-                "elapsed":20,
-                "spanId":"0",
-                "appId":"1_1@a48b12f3ecc3d60",
-                "clientIp":"",
-                "serverAppId":"1_1@a48b12f3ecc3d60",
-                "serverIp":"xx.xxx.xxx.xx",
-                "httpStatusCode":"200",
-                "timestamp":1657511940228
-            }
-        },
-        {
-            "0.1":{
-                "traceId":"ea1bef7f5a16575119402287990d0018",
-                "kind":"sr",
-                "resultCode":"0",
-                "clientAppId":"1_1@a48b12f3ecc3d60",
-                "parentSpanId":"0",
-                "orgId":"1",
-                "elapsed":14,
-                "spanId":"0.1",
-                "appId":"1_1@10b8d040e1d0f78",
-                "clientIp":"xx.xxx.xxx.xx",
-                "serverAppId":"1_1@10b8d040e1d0f78",
-                "serverIp":"xx.xxx.xxx.xx",
-                "httpStatusCode":"200",
-                "timestamp":1657511940232
-            }
-        },
-        {
-            "0.1.1":{
-                "traceId":"ea1bef7f5a16575119402287990d0018",
-                "kind":"sr",
-                "resultCode":"0",
-                "clientAppId":"1_1@10b8d040e1d0f78",
-                "parentSpanId":"0.1",
-                "orgId":"1",
-                "elapsed":1,
-                "spanId":"0.1.1",
-                "appId":"1_1@811c06548ae3a13",
-                "clientIp":"xx.xxx.xxx.xx",
-                "serverAppId":"1_1@811c06548ae3a13",
-                "serverIp":"xx.xxx.xxx.xx",
-                "httpStatusCode":"",
-                "timestamp":1657511940232
-            }
-        },
-        {
-            "0.1.2":{
-                "traceId":"ea1bef7f5a16575119402287990d0018",
-                "kind":"sr",
-                "resultCode":"0",
-                "clientAppId":"1_1@10b8d040e1d0f78",
-                "parentSpanId":"0.1",
-                "orgId":"1",
-                "elapsed":5,
-                "spanId":"0.1.2",
-                "appId":"1_1@811c06548ae3a13",
-                "clientIp":"xx.xxx.xxx.xx",
-                "serverAppId":"1_1@811c06548ae3a13",
-                "serverIp":"xx.xxx.xxx.xx",
-                "httpStatusCode":"",
-                "timestamp":1657511940234
-            }
-        }
-    ],
-    "fire_time":"2022-10-08 15:56:29",
-    "end_time":"2022-10-08 15:56:29"
-}
-```
\ No newline at end of file
diff --git a/docs/picture/33rocketmq-streams/stage.png b/docs/picture/33rocketmq-streams/stage.png
deleted file mode 100644
index 317fe46b3..000000000
Binary files a/docs/picture/33rocketmq-streams/stage.png and /dev/null differ
diff --git "a/docs/picture/33rocketmq-streams/\346\200\273\344\275\223-1.png" "b/docs/picture/33rocketmq-streams/\346\200\273\344\275\223-1.png"
new file mode 100644
index 000000000..79132fc66
Binary files /dev/null and "b/docs/picture/33rocketmq-streams/\346\200\273\344\275\223-1.png" differ
diff --git "a/docs/picture/33rocketmq-streams/\346\200\273\344\275\223-2.png" "b/docs/picture/33rocketmq-streams/\346\200\273\344\275\223-2.png"
new file mode 100644
index 000000000..f7184ab8e
Binary files /dev/null and "b/docs/picture/33rocketmq-streams/\346\200\273\344\275\223-2.png" differ
diff --git "a/docs/picture/33rocketmq-streams/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png" "b/docs/picture/33rocketmq-streams/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png"
deleted file mode 100644
index 5eba9cec2..000000000
Binary files "a/docs/picture/33rocketmq-streams/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png" and /dev/null differ
diff --git "a/docs/picture/33rocketmq-streams/\351\242\206\345\237\237\346\250\241\345\236\213-1.png" "b/docs/picture/33rocketmq-streams/\351\242\206\345\237\237\346\250\241\345\236\213-1.png"
new file mode 100644
index 000000000..408f5bd0b
Binary files /dev/null and "b/docs/picture/33rocketmq-streams/\351\242\206\345\237\237\346\250\241\345\236\213-1.png" differ
diff --git "a/docs/picture/33rocketmq-streams/\351\242\206\345\237\237\346\250\241\345\236\213-2.png" "b/docs/picture/33rocketmq-streams/\351\242\206\345\237\237\346\250\241\345\236\213-2.png"
new file mode 100644
index 000000000..07bc742f1
Binary files /dev/null and "b/docs/picture/33rocketmq-streams/\351\242\206\345\237\237\346\250\241\345\236\213-2.png" differ