You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2022/06/21 03:55:04 UTC
[rocketmq-streams] 05/16: merge from upstream/snapshot-1.0.3
This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch snapshot-1.0.4
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit a539688121c9a3909af0818b9a01f5347548c256
Author: 维章 <un...@gmail.com>
AuthorDate: Mon May 23 16:09:07 2022 +0800
merge from upstream/snapshot-1.0.3
---
NOTICE | 2 +-
README.md | 16 +--
docs/README.md | 142 +++++++++++++++++++++
docs/SUMMARY.md | 8 ++
...225\264\344\275\223\346\236\266\346\236\204.md" | 33 +++++
.../2.\346\236\204\345\273\272DataStream.md" | 73 +++++++++++
.../3.\345\220\257\345\212\250DataStream.md" | 53 ++++++++
...265\201\350\275\254\350\277\207\347\250\213.md" | 63 +++++++++
...256\227\345\255\220\350\247\243\346\236\220.md" | 55 ++++++++
...256\236\347\216\260\345\256\271\351\224\231.md" | 0
"docs/images/Pipeline\347\261\273\345\233\276.png" | Bin 0 -> 44207 bytes
docs/images/img.png | Bin 38684 -> 0 bytes
docs/images/img_1.png | Bin 43711 -> 0 bytes
docs/images/img_2.png | Bin 103151 -> 0 bytes
docs/images/window.png | Bin 0 -> 241692 bytes
...75\223\346\236\266\346\236\204\345\233\276.png" | Bin 0 -> 60493 bytes
...00\273\344\275\223\350\277\207\347\250\213.png" | Bin 0 -> 44252 bytes
.../\346\211\251\345\256\271\345\211\215.png" | Bin 0 -> 56733 bytes
...12\266\346\200\201\347\256\227\345\255\220.png" | Bin 0 -> 35766 bytes
"docs/images/\347\212\266\346\200\201.png" | Bin 0 -> 47527 bytes
"docs/images/\347\274\251\345\256\271.png" | Bin 0 -> 51087 bytes
quick_start.md => docs/quick_start/README.md | 0
quick_start.md | 92 +++++++++----
.../window/operator/impl/SessionOperator.java | 8 ++
.../streams/window/operator/join/JoinWindow.java | 9 ++
stream_sink.md | 10 +-
stream_source.md | 2 +-
27 files changed, 524 insertions(+), 42 deletions(-)
diff --git a/NOTICE b/NOTICE
index 086ee9fa..a347efb1 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
Apache RocketMQ
-Copyright 2016-2021 The Apache Software Foundation
+Copyright 2016-2022 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
diff --git a/README.md b/README.md
index 51d9a6cf..f2f475e7 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,11 @@
-# Summary
-
+# RocketMQ Streams
+[![Build Status](https://app.travis-ci.com/apache/rocketmq-streams.svg?branch=main)](https://app.travis-ci.com/apache/rocketmq-streams)
+[![CodeCov](https://codecov.io/gh/apache/rocketmq-stream/branch/main/graph/badge.svg)](https://app.codecov.io/gh/apache/rocketmq-streams)
+[![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://github.com/apache/rocketmq-streams/releases)
+[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
+[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Average time to resolve an issue")
+[![Percentage of issues still open](http://isitmaintained.com/badge/open/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open")
+[![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social)](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
## [中文文档](./README-chinese.md)
@@ -115,9 +121,3 @@ source
.start();
```
-=======
-* [Quick Start](quick\_start.md)
-* [创建实时任务数据源](stream\_source.md)
-* [创建实时任务数据输出](stream\_sink.md)
-* [数据处理逻辑](stream\_transform.md)
->>>>>>> 1cd2dd0291dbcab033e6773021ddca13ce819f82
diff --git a/docs/README.md b/docs/README.md
new file mode 100644
index 00000000..9d5e464e
--- /dev/null
+++ b/docs/README.md
@@ -0,0 +1,142 @@
+[![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://github.com/apache/rocketmq-streams/releases)
+[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
+[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Average time to resolve an issue")
+[![Percentage of issues still open](http://isitmaintained.com/badge/open/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open")
+[![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social)](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
+
+# Features
+
+* 轻量级部署:可以单独部署,也支持集群部署
+* 多种类型的数据输入以及输出,source 支持 rocketmq , sink 支持db, rocketmq 等
+
+# DataStream Example
+
+```java
+import org.apache.rocketmq.streams.client.transform.DataStream;
+
+DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
+
+ source
+ .fromFile("~/admin/data/text.txt",false)
+ .map(message->message)
+ .toPrint(1)
+ .start();
+```
+
+# Maven Repository
+
+```xml
+
+<dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-clients</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+</dependency>
+```
+
+# Core API
+
+rocketmq-stream 实现了一系列高级的API,可以让用户很方便的编写流计算的程序,实现自己的业务需求;
+
+## StreamBuilder
+
+StreamBuilder 用于构建流任务的源;
+
++ [dataStream(nameSpaceName,pipelineName)]() 返回DataStreamSource实例,用于分段编程实现流计算任务;
+
+## DataStream API
+
+### Source
+
+DataStreamSource 是分段式编程的源头类,用于对接各种数据源, 从各大消息队列中获取数据;
+
++ ```fromFile``` 从文件中读取数据, 该方法包含俩个参数
+ + ```filePath``` 文件路径,必填参数
+ + ```isJsonData``` 是否json数据, 非必填参数, 默认为```true```
+
+
++ ```fromRocketmq``` 从rocketmq中获取数据,包含四个参数
+ + ```topic``` rocketmq消息队列的topic名称,必填参数
+ + ```groupName``` 消费者组的名称,必填参数
+ + ```isJson``` 是否json格式,非必填参数
+ + ```tags``` rocketmq消费的tags值,用于过滤消息,非必填参数
+
++ ```fromMqtt``` 从满足MQTT协议的终端读取数据, 满足边缘计算的场景,其中包含9个参数
+ + ```url``` mqtt broker的地址,必填参数
+ + ```clientId``` 客户端ID, 必填参数,相同的clientId有负载的作用
+ + ```topic``` topic信息, 必填参数
+ + ```username``` 用户名, 非必填,在mqtt端添加鉴权机制时使用
+ + ```password``` 密码,非必填参数,在mqtt端添加鉴权机制时使用
+ + ```cleanSession``` 是否清理session信息, 非必填,默认为true
+ + ```connectionTimeout``` 连接超时信息, 非必填,默认是10s
+ + ```aliveInterval``` 判断连接是否活跃的间隔时间,非必填,默认是60s
+ + ```automaticReconnect``` 连接断开后自动重连机制,非必填,默认是true
+
+
++ ```from``` 自定义的数据源, 通过实现ISource接口实现自己的数据源
+
+### transform
+
+transform 允许在流计算过程中对输入源的数据进行修改,进行下一步的操作;DataStream API中包括```DataStream```,```JoinStream```, ```SplitStream```,```WindowStream```等多个transform类;
+
+DataStream实现了一系列常见的流计算算子
+
++ ```map``` 通过将源的每个记录传递给函数func来返回一个新的DataStream
++ ```flatmap``` 与map类似,一个输入项对应0个或者多个输出项
++ ```filter``` 只选择func返回true的源DStream的记录来返回一个新的DStream
++ ```forEach``` 对每个记录执行一次函数func, 返回一个新的DataStream
++ ```selectFields``` 对每个记录返回对应的字段值,返回一个新的DataStream
++ ```operate``` 对每个记录执行一次自定义的函数,返回一个新的DataStream
++ ```script``` 针对每个记录的字段执行一段脚本,返回新的字段,生成一个新的DataStream
++ ```toPrint``` 将结果在控制台打印,生成新的DataStream实例
++ ```toFile``` 将结果保存为文件,生成一个新的DataStream实例
++ ```toMqtt``` 将结果输出到满足mqtt协议的设备中,生成一个新的DataStream实例
++ ```toDB``` 将结果保存到数据库
++ ```toRocketmq``` 将结果输出到rocketmq
++ ```to``` 将结果经过自定义的ISink接口输出到指定的存储
++ ```window``` 在窗口内进行相关的统计分析,一般会与```groupBy```连用, ```window()```用来定义窗口的大小, ```groupBy()```用来定义统计分析的主key,可以指定多个
+ + ```count``` 在窗口内计数
+ + ```min``` 获取窗口内统计值的最小值
+ + ```max``` 获取窗口内统计值得最大值
+ + ```avg``` 获取窗口内统计值的平均值
+ + ```sum``` 获取窗口内统计值的加和值
+ + ```reduce``` 在窗口内进行自定义的汇总运算
++ ```join``` 根据条件将俩个流进行内关联
++ ```leftJoin``` 根据条件将俩个流的数据进行左关联
++ ```dimJoin``` 根据条件将流与维表进行内关联,维表的数据可以来自于文件,也可以来自于数据库
++ ```dimLeftJoin``` 根据条件将流与维表进行左关联,维表的数据可以来自于文件,也可以来自于数据库
++ ```union``` 将俩个流进行合并
++ ```split``` 将一个数据流按照标签进行拆分,分为不同的数据流供下游进行分析计算
++ ```with``` with算子用来指定计算过程中的相关策略,包括checkpoint的存储策略,state的存储策略等
+
+#### Strategy
+
+策略机制主要用来控制计算引擎运行过程中的底层逻辑,如checkpoint,state的存储方式等,后续还会增加对窗口、双流join等的控制;所有的控制策略通过```with```算子传入,可以同时传入多个策略类型;
+
+```java
+//指定checkpoint的存储策略
+source
+ .fromRocketmq("TSG_META_INFO","")
+ .map(message->message+"--")
+ .toPrint(1)
+ .with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L))
+ .start();
+```
+
+# 运行
+
+Rocketmq-Streams 作为典型的java应用,既可以集成在业务系统里运行,也可以作为一个独立的jar包来运行;
+
+首先对应用的源码进行编译
+
+```shell
+mvn -Prelease-all -DskipTests clean install -U
+```
+
+然后直接通过java指令来运行
+
+```shell
+ java -jar jarName mainClass
+```
+
+更多详细的案例可以看[这里](docs/SUMMARY.md)
\ No newline at end of file
diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md
new file mode 100644
index 00000000..950f99bd
--- /dev/null
+++ b/docs/SUMMARY.md
@@ -0,0 +1,8 @@
+# Summary
+
+* [Introduction](README.md)
+* [Quick Start](quick_start/README.md)
+* [创建实时任务数据源](stream_source/README.md)
+* [创建实时任务数据输出](stream_sink/README.md)
+* [数据处理逻辑](stream_transform/README.md)
+
diff --git "a/docs/design/1.RocketMQ-streams\346\225\264\344\275\223\346\236\266\346\236\204.md" "b/docs/design/1.RocketMQ-streams\346\225\264\344\275\223\346\236\266\346\236\204.md"
new file mode 100644
index 00000000..8e564a8c
--- /dev/null
+++ "b/docs/design/1.RocketMQ-streams\346\225\264\344\275\223\346\236\266\346\236\204.md"
@@ -0,0 +1,33 @@
+### 总体架构
+
+![img.png](../images/总体架构图.png)
+
+数据从RocketMQ中被RocketMQ-streams消费,经过处理最终被写回到RocketMQ。
+如果流处理任务中含有算子groupBy,则需要将数据按照Key进行分组,将分组数据写入shuffle topic。后续算子从
+shuffle topic消费。如果还涉及count之类有状态算子,那么计算时需要读写状态,在窗口触发之后将计算结果写出。
+
+
+### 任务并行度模型
+
+![img_2.png](../images/扩容前.png)
+
+计算实例实质上是依赖了Rocket-streams SDK的client,因此,计算实例消费的MQ依赖RocketMQ rebalance分配,
+计算实例总个数也不能大于消费总MQ个数,否则将有部分计算实例处于等待状态,消费不到数据。
+
+一个计算实例可以消费多个MQ,一个实例内也只有一张计算拓扑图。
+
+### 状态
+![img_3.png](../images/状态.png)
+
+对于有状态算子,他的状态本地依赖RocksDB加速读取,远程依赖Mysql做持久化。允许流计算任务时,可以只依赖本地存储
+RocksDB, 只需要将setLocalStorageOnly设置成true即可。这种情况下可能存在状态丢失。
+
+
+
+### 扩缩容
+
+![img.png](../images/缩容.png)
+
+当计算实例从3个缩容到2个,借助于RocketMQ的rebalance,MQ会在计算实例之间重新分配。
+Instance1上消费的MQ2和MQ3被分配到Instance2和Instance3上,这两个MQ的状态数据也需要迁移到Instance2
+和Instance3上,这也暗示,状态数据是根据源数据分片保存的;扩容则是刚好相反的过程。
diff --git "a/docs/design/2.\346\236\204\345\273\272DataStream.md" "b/docs/design/2.\346\236\204\345\273\272DataStream.md"
new file mode 100644
index 00000000..af12e27a
--- /dev/null
+++ "b/docs/design/2.\346\236\204\345\273\272DataStream.md"
@@ -0,0 +1,73 @@
+DataStreamSource中有一个PipelineBuilder,在后续构建过程中,这个PipelineBuilder会一直向后流传,
+将构建过程中产生的source、stage添加进来;最后在start的时候,真正利用PipelineBuilder构建出拓扑图。
+
+### source类型
+ - 设置source的namespace、configureName;
+ - 将source保存到PipelineBuilder中;
+ - 将source作为source节点保存到PipelineBuilder中的ChainPipeline中;
+
+### ChainStage类型
+
+所有的其他运算,包括map,filter,script,window都会先构建出ChainStage,然后以ChainStage的身份进入
+PipelineBuilder,参加后续构建。
+
+在DataStream中一个典型的添加新算子,过程如下所示:
+```java
+
+public DataStream script(String script) {
+ //将用户定义的cript转化成ChainStage
+ // ChainStage<?> stage = this.mainPipelineBuilder.createStage(new ScriptOperator(script));
+ //将ChainStage添加到PipelineBuilder中,构建拓扑。
+ this.mainPipelineBuilder.setTopologyStages(currentChainStage, stage);
+ //将PipelineBuilder构建成DataStream,向后传递,后续还可以用该PipelineBuilder构建拓扑
+ return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, stage);
+}
+
+```
+
+### 创建ChainStage
+
+PipelineBuilder创建,创建过程中会设置label,并将这个ChainStage添加到PipelineBuilder持有ChainPipeline中
+
+- 把ChainStage添加到pipeline中
+ 在构建过程中,所有的添加算子都使用一个共同的PipelineBuilder实例,PipelineBuilder结构如图所示,他持有
+ 一个ChainPipeline实例,ChainPipeline实例中含有一个ISource和多个stages,还有一个label与stage的映射关系,
+ 以及用于寻找下个stage的label。
+ ![img.png](../images/Pipeline类图.png)
+在createStage过程中,将chainStage加入到Pipeline中。
+
+在setTopologyStages 过程中将label加入到Pipeline中;
+
+### 设置拓扑
+```java
+public void setTopologyStages(ChainStage currentChainStage, List<ChainStage> nextStages) {
+ if (isBreak) {
+ return;
+ }
+ if (nextStages == null) {
+ return;
+ }
+ List<String> lableNames = new ArrayList<>();
+ for (ChainStage stage : nextStages) {
+ lableNames.add(stage.getLabel());
+ }
+
+ if (currentChainStage == null) {
+ this.pipeline.setChannelNextStageLabel(lableNames);
+ } else {
+ currentChainStage.setNextStageLabels(lableNames);
+ for (ChainStage stage : nextStages) {
+ stage.getPrevStageLabels().add(currentChainStage.getLabel());
+ }
+ }
+ }
+```
+
+如果是首个ChainStage,则设置下一跳的label;如果不是首个,需要将下个stage的label设置进入当前stage。
+同时,下个stage也需要设置前一个stage的label标签。形成双向链表的结构。
+
+
+
+
+
+
diff --git "a/docs/design/3.\345\220\257\345\212\250DataStream.md" "b/docs/design/3.\345\220\257\345\212\250DataStream.md"
new file mode 100644
index 00000000..91f91f2f
--- /dev/null
+++ "b/docs/design/3.\345\220\257\345\212\250DataStream.md"
@@ -0,0 +1,53 @@
+### Start流程
+
+流式计算在运行时可以拉起多个相同实例进行扩容,所以不能直接启动上述已经构建好的拓扑图,需要将上述构建好的拓扑
+图保存起来,需要扩容时,直接拿出算子的副本,实例化启动即可。
+
+### 统一管理点
+
+- 加载统一管理点IConfigurableService;
+
+ 三种方式存储:Memory, db, file
+
+- PipelineBuilder的build方法,将构建构成中保存起来的IConfigurable,source和statge都是IConfigurable,
+ 保存到IConfigurableService中;
+
+- IConfigurableService的refreshConfigurable方法;
+
+ 1.主要做的事可以概括:从统一管理点加载出组件,赋值,init,在调用后置方法doProcessAfterRefreshConfigurable。
+
+ 2.ChainPipeline的后置方法比较特殊,会调用pipeline中各个组件的后置方法,如果这个组件是普通UDFChainStage,
+ 那么将会反序列化,实例成StageBuilder。如果是WindowChainStage,会讲用户数据接收的window实例化出来。
+
+ 3.从IConfigurable中加载实例副本出来;
+
+ 4.将实例副本赋值;
+
+ 5.初始化实例副本,实例都是AbstractConfigurable的继承类,调用他的的init方法。比如在初始化rocketmqSource
+ 的时候,就会在此时调用init方法,先于启动方法调用;
+
+ 6.调用IConfigurable的doProcessAfterRefreshConfigurable方法,目前只有ChainPipeline会调用,
+ (典型的是ChainPipeline),会在此方法中构建label与stage映射的stageMap;设置source;再调用
+ ChainPipeline中各个stage的doProcessAfterRefreshConfigurable方法;
+
+ 7.这里ChainPipeline的stage都是UDFChainStage类似。UDFChainStage的
+ doProcessAfterRefreshConfigurable方法会将之前序列化好的StageBuilder反序列化,成为StageBuilder实例。
+
+ 8.如果这个stage是window类型的WindowChainStage,ChainPipeline调用各个stage的
+ doProcessAfterRefreshConfigurable。这里会将用于数据接收的window实例化赋值;
+
+ 9.OutputChainStage此时会从统一管理点IConfigurableService查询出sink实例,并赋值给自己sink字段;
+
+
+### ChainPipeline的启动
+```java
+pipeline.startChannel();
+```
+
+将ChainPipeline作为整个数据接收的入口,并启动source;
+
+当source有数据进来时,ChainPipeline将会收到数据;具体方法是ChainPipeline的doMessageInner方法;
+
+该方法将数据封装承AbstractContext后,向后传递;
+
+
diff --git "a/docs/design/4.\346\225\260\346\215\256\347\232\204\346\265\201\350\275\254\350\277\207\347\250\213.md" "b/docs/design/4.\346\225\260\346\215\256\347\232\204\346\265\201\350\275\254\350\277\207\347\250\213.md"
new file mode 100644
index 00000000..1eb791cb
--- /dev/null
+++ "b/docs/design/4.\346\225\260\346\215\256\347\232\204\346\265\201\350\275\254\350\277\207\347\250\213.md"
@@ -0,0 +1,63 @@
+###总体过程
+
+![img_1.png](../images/总体过程.png)
+
+数据流转整体过程如图所示,黑色箭头线是数据流,橙色为控制流。数据的整体流向是从source中接收到,经过
+AbstractSource判断是否发出系统消息,在进入ChainPipeline,ChainPipeline根据之间构建好的处理拓扑图,使用
+深度优先策略找出下一个处理节点stage,交给Pipeline。Pipeline发现如果是系统消息则对stage执行特殊的控制逻辑,
+如果不是,则用stage来处理具体数据。
+
+### 无window算子执行流程
+- source从RocketMQ中消费数据,进入RocketMQSource的父类AbstractSource;
+- AbstractSource启动控制流,判断是否数据来自新分片,如果是,首先向下游传递一条NewSplitMessage消息,等待系
+ 统消息处理完成返回后,才能继续处理该数据。
+- NewSplitMessage进入Pipeline,如果是系统消息,stage执行该类系统消息对应的控制操作。如果不是系统消息则用
+stage处理数据;
+- Pipeline执行完成后,返回到ChainPipeline,选择下一个stage继续执行;
+- 遍历stage直到结束。
+
+### 含有window算子执行流程
+
+![img_2.png](../images/有状态算子.png)
+
+- 数据流和控制流在上述流程一致,即先进入source,然后由AbstractSource判断是否发出发出系统消息,再进入
+ ChainPipeline按照已经构建好的拓扑图执行。
+- 不同的是,如果是window算子,那么这条数据在执行具体计算之前需要先按照groupBy分组,在执行算子,例如count。
+分组操作需要借助于shuffle topic完成,即写入shuffle topic之前先按照groupBy的值,计算数据写入目的
+ MessageQueue,相同groupBy值的数据将被写入一个MessageQueue中。这样shuffle数据被读取时,
+ groupBy值相同的数据总会被一个client处理,达到按照groupBy分组处理的效果。
+
+- ShuffleChannel会自动订阅、消费shuffle topic。数据会经过shuffle并在ShuffleChannel中再次被消费到。
+- 判断是否是系统消息,如果是,执行该种类系统消息对应的控制流操作。
+- 如果不是系统消息,触发window中算子计算,比如算子是count,就对某个key出现的次数加1;count算子用到的状
+ 态会在接收到NewSplitMessage类型系统消息时提前加载好。计算结束后的状态保存到RocksDB或者mysql中。
+
+- window到时间后,将计算结果输出到下游stage继续计算,并清理RocksDB、Mysql中对应的状态。
+
+
+### 系统消息
+
+#### NewSplitMessage
+当发现数据来自新分片(MessageQueue)时,由AbstractSource产生并向下游拓扑传递。
+
+作用于window算子,使其提前加载该分片对应的状态数据到内存,使得状态数据对该分片数据进行计算时,能使用
+到对应的状态,得出正确的结果。
+
+#### CheckPointMessage
+
+##### 产生时机:
+- 消费分片移除时;
+- RocketMQ-streams向broker提交消费offset时;
+- 处理完一批次消息后;
+
+##### 作用
+- 作用于各个缓存,例如将数据写入shuffle topic之前的WindowCache,使缓存中数据写出到下游。
+- 作用于sink,将sink中缓存而未写出的数据写出;
+- 将有状态算子的状态flush到存储;
+
+#### RemoveSplitMessage
+比较RocketMQ client触发rebalance前后消费的分片,如果某个分片不在被消费,需要将该分片移除,在移除该分配时发出
+RemoveSplitMessage类型消息。
+
+作用于window算子,将RocksDB中状态清除;
+
diff --git "a/docs/design/5.Window\347\256\227\345\255\220\350\247\243\346\236\220.md" "b/docs/design/5.Window\347\256\227\345\255\220\350\247\243\346\236\220.md"
new file mode 100644
index 00000000..4e8be748
--- /dev/null
+++ "b/docs/design/5.Window\347\256\227\345\255\220\350\247\243\346\236\220.md"
@@ -0,0 +1,55 @@
+### window算子初始化
+window的实例化和初始化时机,与普通无状态算子一样,在构建DataStream阶段以stage形式加入pipeline。在启动
+DataStream阶段完成window的初始化。
+
+![img.png](../images/window.png)
+
+- 给window初始化WindowStorage用户状态存储;
+
+ WindowStorage包括localStorage存储和remoteStorage存储;localStorage使用RocksDB,
+ remoteStorage使用mysql;
+
+- 向window添加一个WindowCache的匿名实例,用于存储写入shuffle topic之前数据;
+- 向window添加SQLCache,作为写入Mysql之前的缓存;
+- 向window添加ShuffleChannel,作为写出shuffle和接收来自shufffle topic数据的通道;
+
+
+### ShuffleChannel写出shuffle数据
+AbstractShuffleWindow的doMessage方法,将数据写入shuffleChannel
+```java
+public AbstractContext<IMessage> doMessage(IMessage message, AbstractContext context) {
+ shuffleChannel.startChannel();
+ return super.doMessage(message, context);
+}
+```
+
+- shuffleChannel.startChannel
+启动shuffleChannel中的consumer,从shuffletopic中消费数据;如果有消费到数据,将由
+ shuffleChannel的doMessage处理。
+
+- AbstractWindow.doMessage方法
+
+对于一条消息来说,window 首先需要检查是否有窗口实例,如果没有则创建。如果窗口实例已经超过最大的watermark,
+数据丢弃,否则进行消息积累 消息会先经历batchAdd 然后flush加入到windowCache中;windowCache定时触发,加入到
+shuffleMsgCache中,shuffleMsgCache中定时发出,用shuffleMsgCache中的producer写出到rocketmq。
+
+### ShuffleChannel接收到shuffle数据
+ShuffleChannel#doMessage方法;
+
+将shuffle消息加入到shuffleCache中
+
+最终进入ShuffleCache#batchInsert中
+
+WindowOperator#shuffleCalculate中
+
+实际窗口计算:WindowValue#calculate
+
+计算后并不会马上触发窗口,窗口需要定时出发
+
+### window触发
+ WindowFireSource#startSource启动定时任务,1s检查一次窗口是否触发WindowFireSource#fireWindowInstance
+WindowOperator#fireWindowInstance
+
+windowFireSource.executeMessage
+
+windowFireSource.executeMessage这个方法里面会执行pipeline的下个节点
\ No newline at end of file
diff --git "a/docs/design/6.RocketMQ-streams\345\246\202\344\275\225\345\256\236\347\216\260\345\256\271\351\224\231.md" "b/docs/design/6.RocketMQ-streams\345\246\202\344\275\225\345\256\236\347\216\260\345\256\271\351\224\231.md"
new file mode 100644
index 00000000..e69de29b
diff --git "a/docs/images/Pipeline\347\261\273\345\233\276.png" "b/docs/images/Pipeline\347\261\273\345\233\276.png"
new file mode 100644
index 00000000..dafe81a1
Binary files /dev/null and "b/docs/images/Pipeline\347\261\273\345\233\276.png" differ
diff --git a/docs/images/img.png b/docs/images/img.png
deleted file mode 100644
index b814adf7..00000000
Binary files a/docs/images/img.png and /dev/null differ
diff --git a/docs/images/img_1.png b/docs/images/img_1.png
deleted file mode 100644
index 16a45cc1..00000000
Binary files a/docs/images/img_1.png and /dev/null differ
diff --git a/docs/images/img_2.png b/docs/images/img_2.png
deleted file mode 100644
index 0b75ab05..00000000
Binary files a/docs/images/img_2.png and /dev/null differ
diff --git a/docs/images/window.png b/docs/images/window.png
new file mode 100644
index 00000000..30ba8945
Binary files /dev/null and b/docs/images/window.png differ
diff --git "a/docs/images/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png" "b/docs/images/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png"
new file mode 100644
index 00000000..5eba9cec
Binary files /dev/null and "b/docs/images/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png" differ
diff --git "a/docs/images/\346\200\273\344\275\223\350\277\207\347\250\213.png" "b/docs/images/\346\200\273\344\275\223\350\277\207\347\250\213.png"
new file mode 100644
index 00000000..7a68947e
Binary files /dev/null and "b/docs/images/\346\200\273\344\275\223\350\277\207\347\250\213.png" differ
diff --git "a/docs/images/\346\211\251\345\256\271\345\211\215.png" "b/docs/images/\346\211\251\345\256\271\345\211\215.png"
new file mode 100644
index 00000000..5232b762
Binary files /dev/null and "b/docs/images/\346\211\251\345\256\271\345\211\215.png" differ
diff --git "a/docs/images/\346\234\211\347\212\266\346\200\201\347\256\227\345\255\220.png" "b/docs/images/\346\234\211\347\212\266\346\200\201\347\256\227\345\255\220.png"
new file mode 100644
index 00000000..a9ec479c
Binary files /dev/null and "b/docs/images/\346\234\211\347\212\266\346\200\201\347\256\227\345\255\220.png" differ
diff --git "a/docs/images/\347\212\266\346\200\201.png" "b/docs/images/\347\212\266\346\200\201.png"
new file mode 100644
index 00000000..e2fd9b2e
Binary files /dev/null and "b/docs/images/\347\212\266\346\200\201.png" differ
diff --git "a/docs/images/\347\274\251\345\256\271.png" "b/docs/images/\347\274\251\345\256\271.png"
new file mode 100644
index 00000000..05dcee41
Binary files /dev/null and "b/docs/images/\347\274\251\345\256\271.png" differ
diff --git a/quick_start.md b/docs/quick_start/README.md
similarity index 100%
copy from quick_start.md
copy to docs/quick_start/README.md
diff --git a/quick_start.md b/quick_start.md
index a60dbb95..adcb529d 100644
--- a/quick_start.md
+++ b/quick_start.md
@@ -1,46 +1,84 @@
-# 快速开发
+## rocketmq-streams 快速搭建
+---
-## 引入相关的jar包
+### 前言
-```xml
+本文档主要介绍如何基于rocketmq-streams快速搭建流处理任务,搭建过程中某些例子会用到rocketmq,可以参考[rocketmq搭建文档](https://rocketmq.apache.org/docs/quick-start/)
-<dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-streams-clients</artifactId>
-</dependency>
+### 1、源码构建
-```
+#### 1.1、构建环境
-## 开发实时应用程序
+- JDK 1.8 and above
+- Maven 3.2 and above
-```java
+#### 1.2、构建Rocketmq-streams
-public class RocketmqExample {
+```shell
+git clone https://github.com/apache/rocketmq-streams.git
+cd rocketmq-streams
+mvn clean -DskipTests install -U
- public static void main(String[] args) {
+```
- DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline");
+### 2、基于rocketmq-streams创建应用
- dataStream
- .fromFile("data.csv", false) //构建实时任务的数据源
- .map(message -> message.split(",")) //构建实时任务处理的逻辑过程
- .toPrint(1) //构建实时任务的输出
- .start(); //启动实时任务
- }
-}
+#### 2.1、pom依赖
+```xml
+
+<dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-clients</artifactId>
+</dependency>
```
-## 运行
+#### 2.2、shade clients依赖包
-打包
+```xml
-```shell
-mvn -Prelease-all -DskipTests clean install -U
+<build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.2.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <minimizeJar>false</minimizeJar>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <artifactSet>
+ <includes>
+ <include>org.apache.rocketmq:rocketmq-streams-clients</include>
+ </includes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+</build>
```
-运行
+#### 2.3、编写业务代码
+
+快速编写一个统计页面点击次数的小程序:Please see the [rocketmq-streams-examples](rocketmq-streams-examples/README.md)
+
+#### 2.4、运行
+
+- 前提:在从rocketmq中读取数据做流处理时,需要运行topic在rocketmq中自动创建,因为做groupBy操作时,需要用到rocketmq作为shuffle数据的读写目的地。
+- 命令:
-```shell
- java -jar jarName mainClass
```
+ java -jar XXXX-shade.jar \
+ -Dlog4j.level=ERROR \
+ -Dlog4j.home=/logs \
+ -Xms1024m \
+ -Xmx1024m
+```
+
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
index 0c10a9bf..14e1ffd3 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
@@ -524,4 +524,12 @@ public class SessionOperator extends WindowOperator {
}
return numer;
}
+
+ public int getSessionTimeOut() {
+ return sessionTimeOut;
+ }
+
+ public void setSessionTimeOut(int sessionTimeOut) {
+ this.sessionTimeOut = sessionTimeOut;
+ }
}
\ No newline at end of file
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
index f356a286..103f40e3 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
@@ -60,6 +60,7 @@ public class JoinWindow extends AbstractShuffleWindow {
protected String joinType;//join类型,值为INNER,LEFT
protected String expression;//条件表达式。在存在非等值比较时使用
+ protected String rightDependentTableName;
@Override
protected int doFireWindowInstance(WindowInstance instance) {
@@ -557,4 +558,12 @@ public class JoinWindow extends AbstractShuffleWindow {
public void setExpression(String expression) {
this.expression = expression;
}
+
+ public String getRightDependentTableName() {
+ return rightDependentTableName;
+ }
+
+ public void setRightDependentTableName(String rightDependentTableName) {
+ this.rightDependentTableName = rightDependentTableName;
+ }
}
diff --git a/stream_sink.md b/stream_sink.md
index d7718865..a30aae59 100644
--- a/stream_sink.md
+++ b/stream_sink.md
@@ -54,7 +54,7 @@
String topic=.....; //rocketmq 的topic
String namesrvAddress=......; //rocketmq的nameserver
- DataStream dataStream=dataStream.toRocketmq(topic,namesrvAddress);
+ DataStream dataStream=dataStreamSource.toRocketmq(topic,namesrvAddress);
```
@@ -65,7 +65,7 @@
String topic=.....; //rocketmq 的topic
String groupName=.....; // rocketmq的消费组
String namesrvAddress=......; //rocketmq的nameserver
- DataStream dataStream=dataStream.toRocketmq(topic,groupName,namesrvAddress);
+ DataStream dataStream=dataStreamSource.toRocketmq(topic,groupName,namesrvAddress);
```
@@ -77,7 +77,7 @@
String groupName=.....; // rocketmq的消费组
String namesrvAddress=......; //rocketmq的nameserver
String tags=......; // rocketmq的tag信息
- DataStream dataStream=dataStream.toRocketmq(topic,tags,groupName,namesrvAddress);
+ DataStream dataStream=dataStreamSource.toRocketmq(topic,tags,groupName,namesrvAddress);
```
## kafka
@@ -95,7 +95,7 @@
String url=......;
String clientId=......;
String topic=......;
- DataStream dataStream=dataStream.toMqtt(url,cliientId,topic);
+ DataStream dataStream=dataStreamSource.toMqtt(url,cliientId,topic);
```
@@ -108,7 +108,7 @@
String topic=......;
String username=......;
String password=......;
- DataStream dataStream=dataStream.toMqtt(url,cliientId,topic,username,password);
+ DataStream dataStream=dataStreamSource.toMqtt(url,cliientId,topic,username,password);
```
diff --git a/stream_source.md b/stream_source.md
index 8ae7b7e4..77fb96c4 100644
--- a/stream_source.md
+++ b/stream_source.md
@@ -54,7 +54,7 @@
String groupName = .....; // rocketmq的消费组
String namesrvAddress = ......; //rocketmq的nameserver
Boolean isJsonData = true; //是否json
- String tags = ......; //rocketmq的tag信息
+ String tags = ......; // rocketmq的tag信息
DataStream dataStream = dataStreamSource.fromRocketmq(topic, groupName, tags, isJsonData, namesrvAddress);
```