You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2022/06/21 03:55:04 UTC

[rocketmq-streams] 05/16: merge from upstream/snapshot-1.0.3

This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch snapshot-1.0.4
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git

commit a539688121c9a3909af0818b9a01f5347548c256
Author: 维章 <un...@gmail.com>
AuthorDate: Mon May 23 16:09:07 2022 +0800

    merge from upstream/snapshot-1.0.3
---
 NOTICE                                             |   2 +-
 README.md                                          |  16 +--
 docs/README.md                                     | 142 +++++++++++++++++++++
 docs/SUMMARY.md                                    |   8 ++
 ...225\264\344\275\223\346\236\266\346\236\204.md" |  33 +++++
 .../2.\346\236\204\345\273\272DataStream.md"       |  73 +++++++++++
 .../3.\345\220\257\345\212\250DataStream.md"       |  53 ++++++++
 ...265\201\350\275\254\350\277\207\347\250\213.md" |  63 +++++++++
 ...256\227\345\255\220\350\247\243\346\236\220.md" |  55 ++++++++
 ...256\236\347\216\260\345\256\271\351\224\231.md" |   0
 "docs/images/Pipeline\347\261\273\345\233\276.png" | Bin 0 -> 44207 bytes
 docs/images/img.png                                | Bin 38684 -> 0 bytes
 docs/images/img_1.png                              | Bin 43711 -> 0 bytes
 docs/images/img_2.png                              | Bin 103151 -> 0 bytes
 docs/images/window.png                             | Bin 0 -> 241692 bytes
 ...75\223\346\236\266\346\236\204\345\233\276.png" | Bin 0 -> 60493 bytes
 ...00\273\344\275\223\350\277\207\347\250\213.png" | Bin 0 -> 44252 bytes
 .../\346\211\251\345\256\271\345\211\215.png"      | Bin 0 -> 56733 bytes
 ...12\266\346\200\201\347\256\227\345\255\220.png" | Bin 0 -> 35766 bytes
 "docs/images/\347\212\266\346\200\201.png"         | Bin 0 -> 47527 bytes
 "docs/images/\347\274\251\345\256\271.png"         | Bin 0 -> 51087 bytes
 quick_start.md => docs/quick_start/README.md       |   0
 quick_start.md                                     |  92 +++++++++----
 .../window/operator/impl/SessionOperator.java      |   8 ++
 .../streams/window/operator/join/JoinWindow.java   |   9 ++
 stream_sink.md                                     |  10 +-
 stream_source.md                                   |   2 +-
 27 files changed, 524 insertions(+), 42 deletions(-)

diff --git a/NOTICE b/NOTICE
index 086ee9fa..a347efb1 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
 Apache RocketMQ
-Copyright 2016-2021 The Apache Software Foundation
+Copyright 2016-2022 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
diff --git a/README.md b/README.md
index 51d9a6cf..f2f475e7 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,11 @@
-# Summary
-
+# RocketMQ Streams 
+[![Build Status](https://app.travis-ci.com/apache/rocketmq-streams.svg?branch=main)](https://app.travis-ci.com/apache/rocketmq-streams)
+[![CodeCov](https://codecov.io/gh/apache/rocketmq-stream/branch/main/graph/badge.svg)](https://app.codecov.io/gh/apache/rocketmq-streams) 
+[![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://github.com/apache/rocketmq-streams/releases)
+[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
+[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Average time to resolve an issue")
+[![Percentage of issues still open](http://isitmaintained.com/badge/open/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open")
+[![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social)](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
 
 ## [中文文档](./README-chinese.md)
 
@@ -115,9 +121,3 @@ source
     .start();
 ```
 
-=======
-* [Quick Start](quick\_start.md)
-* [创建实时任务数据源](stream\_source.md)
-* [创建实时任务数据输出](stream\_sink.md)
-* [数据处理逻辑](stream\_transform.md)
->>>>>>> 1cd2dd0291dbcab033e6773021ddca13ce819f82
diff --git a/docs/README.md b/docs/README.md
new file mode 100644
index 00000000..9d5e464e
--- /dev/null
+++ b/docs/README.md
@@ -0,0 +1,142 @@
+[![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://github.com/apache/rocketmq-streams/releases)
+[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
+[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Average time to resolve an issue")
+[![Percentage of issues still open](http://isitmaintained.com/badge/open/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open")
+[![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social)](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
+
+# Features
+
+* 轻量级部署:可以单独部署,也支持集群部署
+* 多种类型的数据输入以及输出,source 支持 rocketmq , sink 支持db, rocketmq 等
+
+# DataStream Example
+
+```java
+import org.apache.rocketmq.streams.client.transform.DataStream;
+
+DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
+
+    source
+    .fromFile("~/admin/data/text.txt",false)
+    .map(message->message)
+    .toPrint(1)
+    .start();
+```
+
+# Maven Repository
+
+```xml
+
+<dependency>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-streams-clients</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+</dependency>
+```
+
+# Core API
+
+rocketmq-stream 实现了一系列高级的API,可以让用户很方便的编写流计算的程序,实现自己的业务需求;
+
+## StreamBuilder
+
+StreamBuilder 用于构建流任务的源;
+
++ [dataStream(nameSpaceName,pipelineName)]() 返回DataStreamSource实例,用于分段编程实现流计算任务;
+
+## DataStream API
+
+### Source
+
+DataStreamSource 是分段式编程的源头类,用于对接各种数据源, 从各大消息队列中获取数据;
+
++ ```fromFile```  从文件中读取数据, 该方法包含俩个参数
+    + ```filePath``` 文件路径,必填参数
+    + ```isJsonData```  是否json数据, 非必填参数, 默认为```true```
+
+
++ ```fromRocketmq``` 从rocketmq中获取数据,包含四个参数
+    + ```topic``` rocketmq消息队列的topic名称,必填参数
+    + ```groupName``` 消费者组的名称,必填参数
+    + ```isJson``` 是否json格式,非必填参数
+    + ```tags``` rocketmq消费的tags值,用于过滤消息,非必填参数
+
++ ```fromMqtt``` 从满足MQTT协议的终端读取数据, 满足边缘计算的场景,其中包含9个参数
+    + ```url```  mqtt broker的地址,必填参数
+    + ```clientId``` 客户端ID, 必填参数,相同的clientId有负载的作用
+    + ```topic``` topic信息, 必填参数
+    + ```username``` 用户名, 非必填,在mqtt端添加鉴权机制时使用
+    + ```password``` 密码,非必填参数,在mqtt端添加鉴权机制时使用
+    + ```cleanSession``` 是否清理session信息, 非必填,默认为true
+    + ```connectionTimeout``` 连接超时信息, 非必填,默认是10s
+    + ```aliveInterval```  判断连接是否活跃的间隔时间,非必填,默认是60s
+    + ```automaticReconnect``` 连接断开后自动重连机制,非必填,默认是true
+
+
++ ```from``` 自定义的数据源, 通过实现ISource接口实现自己的数据源
+
+### transform
+
+transform 允许在流计算过程中对输入源的数据进行修改,进行下一步的操作;DataStream API中包括```DataStream```,```JoinStream```, ```SplitStream```,```WindowStream```等多个transform类;
+
+DataStream实现了一系列常见的流计算算子
+
++ ```map``` 通过将源的每个记录传递给函数func来返回一个新的DataStream
++ ```flatmap``` 与map类似,一个输入项对应0个或者多个输出项
++ ```filter``` 只选择func返回true的源DStream的记录来返回一个新的DStream
++ ```forEach``` 对每个记录执行一次函数func, 返回一个新的DataStream
++ ```selectFields``` 对每个记录返回对应的字段值,返回一个新的DataStream
++ ```operate```  对每个记录执行一次自定义的函数,返回一个新的DataStream
++ ```script```  针对每个记录的字段执行一段脚本,返回新的字段,生成一个新的DataStream
++ ```toPrint``` 将结果在控制台打印,生成新的DataStream实例
++ ```toFile``` 将结果保存为文件,生成一个新的DataStream实例
++ ```toMqtt``` 将结果输出到满足mqtt协议的设备中,生成一个新的DataStream实例
++ ```toDB``` 将结果保存到数据库
++ ```toRocketmq``` 将结果输出到rocketmq
++ ```to``` 将结果经过自定义的ISink接口输出到指定的存储
++ ```window``` 在窗口内进行相关的统计分析,一般会与```groupBy```连用, ```window()```用来定义窗口的大小, ```groupBy()```用来定义统计分析的主key,可以指定多个
+    + ```count``` 在窗口内计数
+    + ```min``` 获取窗口内统计值的最小值
+    + ```max``` 获取窗口内统计值得最大值
+    + ```avg``` 获取窗口内统计值的平均值
+    + ```sum``` 获取窗口内统计值的加和值
+    + ```reduce``` 在窗口内进行自定义的汇总运算
++ ```join``` 根据条件将俩个流进行内关联
++ ```leftJoin``` 根据条件将俩个流的数据进行左关联
++ ```dimJoin``` 根据条件将流与维表进行内关联,维表的数据可以来自于文件,也可以来自于数据库
++ ```dimLeftJoin``` 根据条件将流与维表进行左关联,维表的数据可以来自于文件,也可以来自于数据库
++ ```union``` 将俩个流进行合并
++ ```split``` 将一个数据流按照标签进行拆分,分为不同的数据流供下游进行分析计算
++ ```with``` with算子用来指定计算过程中的相关策略,包括checkpoint的存储策略,state的存储策略等
+
+#### Strategy
+
+策略机制主要用来控制计算引擎运行过程中的底层逻辑,如checkpoint,state的存储方式等,后续还会增加对窗口、双流join等的控制;所有的控制策略通过```with```算子传入,可以同时传入多个策略类型;
+
+```java
+//指定checkpoint的存储策略
+source
+    .fromRocketmq("TSG_META_INFO","")
+    .map(message->message+"--")
+    .toPrint(1)
+    .with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L))
+    .start();
+```
+
+# 运行
+
+Rocketmq-Streams 作为典型的java应用,既可以集成在业务系统里运行,也可以作为一个独立的jar包来运行;
+
+首先对应用的源码进行编译
+
+```shell
+mvn -Prelease-all -DskipTests clean install -U
+```
+
+然后直接通过java指令来运行
+
+```shell
+ java -jar jarName mainClass
+```
+
+更多详细的案例可以看[这里](docs/SUMMARY.md)
\ No newline at end of file
diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md
new file mode 100644
index 00000000..950f99bd
--- /dev/null
+++ b/docs/SUMMARY.md
@@ -0,0 +1,8 @@
+# Summary
+
+* [Introduction](README.md)
+* [Quick Start](quick_start/README.md)
+* [创建实时任务数据源](stream_source/README.md)
+* [创建实时任务数据输出](stream_sink/README.md)
+* [数据处理逻辑](stream_transform/README.md)
+
diff --git "a/docs/design/1.RocketMQ-streams\346\225\264\344\275\223\346\236\266\346\236\204.md" "b/docs/design/1.RocketMQ-streams\346\225\264\344\275\223\346\236\266\346\236\204.md"
new file mode 100644
index 00000000..8e564a8c
--- /dev/null
+++ "b/docs/design/1.RocketMQ-streams\346\225\264\344\275\223\346\236\266\346\236\204.md"
@@ -0,0 +1,33 @@
+### 总体架构
+
+![img.png](../images/总体架构图.png)
+
+数据从RocketMQ中被RocketMQ-streams消费,经过处理最终被写回到RocketMQ。
+如果流处理任务中含有算子groupBy,则需要将数据按照Key进行分组,将分组数据写入shuffle topic。后续算子从
+shuffle topic消费。如果还涉及count之类有状态算子,那么计算时需要读写状态,在窗口触发之后将计算结果写出。
+
+
+### 任务并行度模型
+
+![img_2.png](../images/扩容前.png)
+
+计算实例实质上是依赖了Rocket-streams SDK的client,因此,计算实例消费的MQ依赖RocketMQ rebalance分配,
+计算实例总个数也不能大于消费总MQ个数,否则将有部分计算实例处于等待状态,消费不到数据。
+
+一个计算实例可以消费多个MQ,一个实例内也只有一张计算拓扑图。
+
+### 状态
+![img_3.png](../images/状态.png)
+
+对于有状态算子,他的状态本地依赖RocksDB加速读取,远程依赖Mysql做持久化。允许流计算任务时,可以只依赖本地存储
+RocksDB, 只需要将setLocalStorageOnly设置成true即可。这种情况下可能存在状态丢失。
+
+
+
+### 扩缩容
+
+![img.png](../images/缩容.png)
+
+当计算实例从3个缩容到2个,借助于RocketMQ的rebalance,MQ会在计算实例之间重新分配。
+Instance1上消费的MQ2和MQ3被分配到Instance2和Instance3上,这两个MQ的状态数据也需要迁移到Instance2
+和Instance3上,这也暗示,状态数据是根据源数据分片保存的;扩容则是刚好相反的过程。
diff --git "a/docs/design/2.\346\236\204\345\273\272DataStream.md" "b/docs/design/2.\346\236\204\345\273\272DataStream.md"
new file mode 100644
index 00000000..af12e27a
--- /dev/null
+++ "b/docs/design/2.\346\236\204\345\273\272DataStream.md"
@@ -0,0 +1,73 @@
+DataStreamSource中有一个PipelineBuilder,在后续构建过程中,这个PipelineBuilder会一直向后流传,
+将构建过程中产生的source、stage添加进来;最后在start的时候,真正利用PipelineBuilder构建出拓扑图。
+
+### source类型
+ - 设置source的namespace、configureName;
+ - 将source保存到PipelineBuilder中;
+ - 将source作为source节点保存到PipelineBuilder中的ChainPipeline中;
+
+### ChainStage类型
+
+所有的其他运算,包括map,filter,script,window都会先构建出ChainStage,然后以ChainStage的身份进入
+PipelineBuilder,参加后续构建。
+
+在DataStream中一个典型的添加新算子,过程如下所示:
+```java
+
+public DataStream script(String script) {
+     //将用户定义的cript转化成ChainStage
+     // ChainStage<?> stage = this.mainPipelineBuilder.createStage(new ScriptOperator(script));
+     //将ChainStage添加到PipelineBuilder中,构建拓扑。
+     this.mainPipelineBuilder.setTopologyStages(currentChainStage, stage);
+     //将PipelineBuilder构建成DataStream,向后传递,后续还可以用该PipelineBuilder构建拓扑
+     return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, stage);
+}
+
+```
+
+### 创建ChainStage
+
+PipelineBuilder创建,创建过程中会设置label,并将这个ChainStage添加到PipelineBuilder持有ChainPipeline中
+
+- 把ChainStage添加到pipeline中
+  在构建过程中,所有的添加算子都使用一个共同的PipelineBuilder实例,PipelineBuilder结构如图所示,他持有
+  一个ChainPipeline实例,ChainPipeline实例中含有一个ISource和多个stages,还有一个label与stage的映射关系,
+  以及用于寻找下个stage的label。 
+  ![img.png](../images/Pipeline类图.png)
+在createStage过程中,将chainStage加入到Pipeline中。
+  
+在setTopologyStages 过程中将label加入到Pipeline中;
+
+### 设置拓扑
+```java
+public void setTopologyStages(ChainStage currentChainStage, List<ChainStage> nextStages) {
+        if (isBreak) {
+            return;
+        }
+        if (nextStages == null) {
+            return;
+        }
+        List<String> lableNames = new ArrayList<>();
+        for (ChainStage stage : nextStages) {
+            lableNames.add(stage.getLabel());
+        }
+
+        if (currentChainStage == null) {
+            this.pipeline.setChannelNextStageLabel(lableNames);
+        } else {
+            currentChainStage.setNextStageLabels(lableNames);
+            for (ChainStage stage : nextStages) {
+                stage.getPrevStageLabels().add(currentChainStage.getLabel());
+            }
+        }
+    }
+```
+
+如果是首个ChainStage,则设置下一跳的label;如果不是首个,需要将下个stage的label设置进入当前stage。
+同时,下个stage也需要设置前一个stage的label标签。形成双向链表的结构。
+
+
+
+
+
+
diff --git "a/docs/design/3.\345\220\257\345\212\250DataStream.md" "b/docs/design/3.\345\220\257\345\212\250DataStream.md"
new file mode 100644
index 00000000..91f91f2f
--- /dev/null
+++ "b/docs/design/3.\345\220\257\345\212\250DataStream.md"
@@ -0,0 +1,53 @@
+### Start流程
+
+流式计算在运行时可以拉起多个相同实例进行扩容,所以不能直接启动上述已经构建好的拓扑图,需要将上述构建好的拓扑
+图保存起来,需要扩容时,直接拿出算子的副本,实例化启动即可。 
+
+### 统一管理点
+
+- 加载统一管理点IConfigurableService;
+
+    三种方式存储:Memory, db, file
+  
+- PipelineBuilder的build方法,将构建构成中保存起来的IConfigurable,source和statge都是IConfigurable,
+  保存到IConfigurableService中;
+  
+- IConfigurableService的refreshConfigurable方法;
+
+  1.主要做的事可以概括:从统一管理点加载出组件,赋值,init,在调用后置方法doProcessAfterRefreshConfigurable。
+    
+  2.ChainPipeline的后置方法比较特殊,会调用pipeline中各个组件的后置方法,如果这个组件是普通UDFChainStage,
+  那么将会反序列化,实例成StageBuilder。如果是WindowChainStage,会讲用户数据接收的window实例化出来。
+  
+  3.从IConfigurable中加载实例副本出来;
+
+  4.将实例副本赋值;
+
+  5.初始化实例副本,实例都是AbstractConfigurable的继承类,调用他的的init方法。比如在初始化rocketmqSource
+  的时候,就会在此时调用init方法,先于启动方法调用;
+  
+  6.调用IConfigurable的doProcessAfterRefreshConfigurable方法,目前只有ChainPipeline会调用,
+  (典型的是ChainPipeline),会在此方法中构建label与stage映射的stageMap;设置source;再调用
+  ChainPipeline中各个stage的doProcessAfterRefreshConfigurable方法;
+  
+  7.这里ChainPipeline的stage都是UDFChainStage类似。UDFChainStage的
+  doProcessAfterRefreshConfigurable方法会将之前序列化好的StageBuilder反序列化,成为StageBuilder实例。
+  
+  8.如果这个stage是window类型的WindowChainStage,ChainPipeline调用各个stage的
+  doProcessAfterRefreshConfigurable。这里会将用于数据接收的window实例化赋值;
+  
+  9.OutputChainStage此时会从统一管理点IConfigurableService查询出sink实例,并赋值给自己sink字段;
+
+
+### ChainPipeline的启动
+```java
+pipeline.startChannel();
+```
+
+将ChainPipeline作为整个数据接收的入口,并启动source;
+
+当source有数据进来时,ChainPipeline将会收到数据;具体方法是ChainPipeline的doMessageInner方法;
+
+该方法将数据封装承AbstractContext后,向后传递;
+
+
diff --git "a/docs/design/4.\346\225\260\346\215\256\347\232\204\346\265\201\350\275\254\350\277\207\347\250\213.md" "b/docs/design/4.\346\225\260\346\215\256\347\232\204\346\265\201\350\275\254\350\277\207\347\250\213.md"
new file mode 100644
index 00000000..1eb791cb
--- /dev/null
+++ "b/docs/design/4.\346\225\260\346\215\256\347\232\204\346\265\201\350\275\254\350\277\207\347\250\213.md"
@@ -0,0 +1,63 @@
+###总体过程
+
+![img_1.png](../images/总体过程.png)
+
+数据流转整体过程如图所示,黑色箭头线是数据流,橙色为控制流。数据的整体流向是从source中接收到,经过
+AbstractSource判断是否发出系统消息,在进入ChainPipeline,ChainPipeline根据之间构建好的处理拓扑图,使用
+深度优先策略找出下一个处理节点stage,交给Pipeline。Pipeline发现如果是系统消息则对stage执行特殊的控制逻辑,
+如果不是,则用stage来处理具体数据。
+
+### 无window算子执行流程
+- source从RocketMQ中消费数据,进入RocketMQSource的父类AbstractSource;
+- AbstractSource启动控制流,判断是否数据来自新分片,如果是,首先向下游传递一条NewSplitMessage消息,等待系
+  统消息处理完成返回后,才能继续处理该数据。
+- NewSplitMessage进入Pipeline,如果是系统消息,stage执行该类系统消息对应的控制操作。如果不是系统消息则用
+stage处理数据;
+- Pipeline执行完成后,返回到ChainPipeline,选择下一个stage继续执行;
+- 遍历stage直到结束。  
+  
+### 含有window算子执行流程
+
+![img_2.png](../images/有状态算子.png)
+
+- 数据流和控制流在上述流程一致,即先进入source,然后由AbstractSource判断是否发出发出系统消息,再进入
+  ChainPipeline按照已经构建好的拓扑图执行。
+- 不同的是,如果是window算子,那么这条数据在执行具体计算之前需要先按照groupBy分组,在执行算子,例如count。
+分组操作需要借助于shuffle topic完成,即写入shuffle topic之前先按照groupBy的值,计算数据写入目的
+  MessageQueue,相同groupBy值的数据将被写入一个MessageQueue中。这样shuffle数据被读取时,
+  groupBy值相同的数据总会被一个client处理,达到按照groupBy分组处理的效果。
+  
+- ShuffleChannel会自动订阅、消费shuffle topic。数据会经过shuffle并在ShuffleChannel中再次被消费到。
+- 判断是否是系统消息,如果是,执行该种类系统消息对应的控制流操作。
+- 如果不是系统消息,触发window中算子计算,比如算子是count,就对某个key出现的次数加1;count算子用到的状
+  态会在接收到NewSplitMessage类型系统消息时提前加载好。计算结束后的状态保存到RocksDB或者mysql中。
+
+- window到时间后,将计算结果输出到下游stage继续计算,并清理RocksDB、Mysql中对应的状态。  
+
+
+### 系统消息
+
+#### NewSplitMessage
+当发现数据来自新分片(MessageQueue)时,由AbstractSource产生并向下游拓扑传递。
+
+作用于window算子,使其提前加载该分片对应的状态数据到内存,使得状态数据对该分片数据进行计算时,能使用
+到对应的状态,得出正确的结果。
+
+#### CheckPointMessage
+
+##### 产生时机:
+- 消费分片移除时;
+- RocketMQ-streams向broker提交消费offset时;
+- 处理完一批次消息后;
+
+##### 作用
+- 作用于各个缓存,例如将数据写入shuffle topic之前的WindowCache,使缓存中数据写出到下游。
+- 作用于sink,将sink中缓存而未写出的数据写出;
+- 将有状态算子的状态flush到存储;
+
+#### RemoveSplitMessage
+比较RocketMQ client触发rebalance前后消费的分片,如果某个分片不在被消费,需要将该分片移除,在移除该分配时发出
+RemoveSplitMessage类型消息。
+
+作用于window算子,将RocksDB中状态清除;
+
diff --git "a/docs/design/5.Window\347\256\227\345\255\220\350\247\243\346\236\220.md" "b/docs/design/5.Window\347\256\227\345\255\220\350\247\243\346\236\220.md"
new file mode 100644
index 00000000..4e8be748
--- /dev/null
+++ "b/docs/design/5.Window\347\256\227\345\255\220\350\247\243\346\236\220.md"
@@ -0,0 +1,55 @@
+### window算子初始化
+window的实例化和初始化时机,与普通无状态算子一样,在构建DataStream阶段以stage形式加入pipeline。在启动
+DataStream阶段完成window的初始化。
+
+![img.png](../images/window.png)
+
+- 给window初始化WindowStorage用户状态存储;
+  
+  WindowStorage包括localStorage存储和remoteStorage存储;localStorage使用RocksDB,
+  remoteStorage使用mysql;
+
+- 向window添加一个WindowCache的匿名实例,用于存储写入shuffle topic之前数据;
+- 向window添加SQLCache,作为写入Mysql之前的缓存;
+- 向window添加ShuffleChannel,作为写出shuffle和接收来自shufffle topic数据的通道;
+    
+
+### ShuffleChannel写出shuffle数据
+AbstractShuffleWindow的doMessage方法,将数据写入shuffleChannel
+```java
+public AbstractContext<IMessage> doMessage(IMessage message, AbstractContext context) {
+        shuffleChannel.startChannel();
+        return super.doMessage(message, context);
+}
+```
+
+- shuffleChannel.startChannel
+启动shuffleChannel中的consumer,从shuffletopic中消费数据;如果有消费到数据,将由
+  shuffleChannel的doMessage处理。
+
+- AbstractWindow.doMessage方法
+
+对于一条消息来说,window 首先需要检查是否有窗口实例,如果没有则创建。如果窗口实例已经超过最大的watermark,
+数据丢弃,否则进行消息积累 消息会先经历batchAdd 然后flush加入到windowCache中;windowCache定时触发,加入到
+shuffleMsgCache中,shuffleMsgCache中定时发出,用shuffleMsgCache中的producer写出到rocketmq。
+
+### ShuffleChannel接收到shuffle数据
+ShuffleChannel#doMessage方法;
+
+将shuffle消息加入到shuffleCache中
+
+最终进入ShuffleCache#batchInsert中
+
+WindowOperator#shuffleCalculate中
+
+实际窗口计算:WindowValue#calculate
+
+计算后并不会马上触发窗口,窗口需要定时出发
+
+### window触发
+  WindowFireSource#startSource启动定时任务,1s检查一次窗口是否触发WindowFireSource#fireWindowInstance
+WindowOperator#fireWindowInstance
+
+windowFireSource.executeMessage 
+
+windowFireSource.executeMessage这个方法里面会执行pipeline的下个节点
\ No newline at end of file
diff --git "a/docs/design/6.RocketMQ-streams\345\246\202\344\275\225\345\256\236\347\216\260\345\256\271\351\224\231.md" "b/docs/design/6.RocketMQ-streams\345\246\202\344\275\225\345\256\236\347\216\260\345\256\271\351\224\231.md"
new file mode 100644
index 00000000..e69de29b
diff --git "a/docs/images/Pipeline\347\261\273\345\233\276.png" "b/docs/images/Pipeline\347\261\273\345\233\276.png"
new file mode 100644
index 00000000..dafe81a1
Binary files /dev/null and "b/docs/images/Pipeline\347\261\273\345\233\276.png" differ
diff --git a/docs/images/img.png b/docs/images/img.png
deleted file mode 100644
index b814adf7..00000000
Binary files a/docs/images/img.png and /dev/null differ
diff --git a/docs/images/img_1.png b/docs/images/img_1.png
deleted file mode 100644
index 16a45cc1..00000000
Binary files a/docs/images/img_1.png and /dev/null differ
diff --git a/docs/images/img_2.png b/docs/images/img_2.png
deleted file mode 100644
index 0b75ab05..00000000
Binary files a/docs/images/img_2.png and /dev/null differ
diff --git a/docs/images/window.png b/docs/images/window.png
new file mode 100644
index 00000000..30ba8945
Binary files /dev/null and b/docs/images/window.png differ
diff --git "a/docs/images/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png" "b/docs/images/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png"
new file mode 100644
index 00000000..5eba9cec
Binary files /dev/null and "b/docs/images/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png" differ
diff --git "a/docs/images/\346\200\273\344\275\223\350\277\207\347\250\213.png" "b/docs/images/\346\200\273\344\275\223\350\277\207\347\250\213.png"
new file mode 100644
index 00000000..7a68947e
Binary files /dev/null and "b/docs/images/\346\200\273\344\275\223\350\277\207\347\250\213.png" differ
diff --git "a/docs/images/\346\211\251\345\256\271\345\211\215.png" "b/docs/images/\346\211\251\345\256\271\345\211\215.png"
new file mode 100644
index 00000000..5232b762
Binary files /dev/null and "b/docs/images/\346\211\251\345\256\271\345\211\215.png" differ
diff --git "a/docs/images/\346\234\211\347\212\266\346\200\201\347\256\227\345\255\220.png" "b/docs/images/\346\234\211\347\212\266\346\200\201\347\256\227\345\255\220.png"
new file mode 100644
index 00000000..a9ec479c
Binary files /dev/null and "b/docs/images/\346\234\211\347\212\266\346\200\201\347\256\227\345\255\220.png" differ
diff --git "a/docs/images/\347\212\266\346\200\201.png" "b/docs/images/\347\212\266\346\200\201.png"
new file mode 100644
index 00000000..e2fd9b2e
Binary files /dev/null and "b/docs/images/\347\212\266\346\200\201.png" differ
diff --git "a/docs/images/\347\274\251\345\256\271.png" "b/docs/images/\347\274\251\345\256\271.png"
new file mode 100644
index 00000000..05dcee41
Binary files /dev/null and "b/docs/images/\347\274\251\345\256\271.png" differ
diff --git a/quick_start.md b/docs/quick_start/README.md
similarity index 100%
copy from quick_start.md
copy to docs/quick_start/README.md
diff --git a/quick_start.md b/quick_start.md
index a60dbb95..adcb529d 100644
--- a/quick_start.md
+++ b/quick_start.md
@@ -1,46 +1,84 @@
-# 快速开发
+## rocketmq-streams 快速搭建
+---
 
-## 引入相关的jar包
+### 前言
 
-```xml
+本文档主要介绍如何基于rocketmq-streams快速搭建流处理任务,搭建过程中某些例子会用到rocketmq,可以参考[rocketmq搭建文档](https://rocketmq.apache.org/docs/quick-start/)
 
-<dependency>
-    <groupId>org.apache.rocketmq</groupId>
-    <artifactId>rocketmq-streams-clients</artifactId>
-</dependency>
+### 1、源码构建
 
-```
+#### 1.1、构建环境
 
-## 开发实时应用程序
+- JDK 1.8 and above
+- Maven 3.2 and above
 
-```java
+#### 1.2、构建Rocketmq-streams
 
-public class RocketmqExample {
+```shell
+git clone https://github.com/apache/rocketmq-streams.git
+cd rocketmq-streams
+mvn clean -DskipTests  install -U
 
-    public static void main(String[] args) {
+```
 
-        DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline");
+### 2、基于rocketmq-streams创建应用
 
-        dataStream
-            .fromFile("data.csv", false)   //构建实时任务的数据源
-            .map(message -> message.split(","))   //构建实时任务处理的逻辑过程
-            .toPrint(1)   //构建实时任务的输出
-            .start();    //启动实时任务
-    }
-}
+#### 2.1、pom依赖
 
+```xml
+
+<dependency>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-streams-clients</artifactId>
+</dependency>
 ```
 
-## 运行
+#### 2.2、shade clients依赖包
 
-打包
+```xml
 
-```shell
-mvn -Prelease-all -DskipTests clean install -U
+<build>
+    <plugins>
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+            <version>3.2.1</version>
+            <executions>
+                <execution>
+                    <phase>package</phase>
+                    <goals>
+                        <goal>shade</goal>
+                    </goals>
+                    <configuration>
+                        <minimizeJar>false</minimizeJar>
+                        <shadedArtifactAttached>true</shadedArtifactAttached>
+                        <artifactSet>
+                            <includes>
+                                <include>org.apache.rocketmq:rocketmq-streams-clients</include>
+                            </includes>
+                        </artifactSet>
+                    </configuration>
+                </execution>
+            </executions>
+        </plugin>
+    </plugins>
+</build>
 ```
 
-运行
+#### 2.3、编写业务代码
+
+快速编写一个统计页面点击次数的小程序:Please see the [rocketmq-streams-examples](rocketmq-streams-examples/README.md)
+
+#### 2.4、运行
+
+- 前提:在从rocketmq中读取数据做流处理时,需要运行topic在rocketmq中自动创建,因为做groupBy操作时,需要用到rocketmq作为shuffle数据的读写目的地。
+- 命令:
 
-```shell
- java -jar jarName mainClass
 ```
+   java -jar XXXX-shade.jar \ 
+        -Dlog4j.level=ERROR \
+        -Dlog4j.home=/logs  \
+        -Xms1024m \
+        -Xmx1024m 
+```
+
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
index 0c10a9bf..14e1ffd3 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
@@ -524,4 +524,12 @@ public class SessionOperator extends WindowOperator {
         }
         return numer;
     }
+
+    public int getSessionTimeOut() {
+        return sessionTimeOut;
+    }
+
+    public void setSessionTimeOut(int sessionTimeOut) {
+        this.sessionTimeOut = sessionTimeOut;
+    }
 }
\ No newline at end of file
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
index f356a286..103f40e3 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
@@ -60,6 +60,7 @@ public class JoinWindow extends AbstractShuffleWindow {
 
     protected String joinType;//join类型,值为INNER,LEFT
     protected String expression;//条件表达式。在存在非等值比较时使用
+    protected String rightDependentTableName;
 
     @Override
     protected int doFireWindowInstance(WindowInstance instance) {
@@ -557,4 +558,12 @@ public class JoinWindow extends AbstractShuffleWindow {
     public void setExpression(String expression) {
         this.expression = expression;
     }
+
+    public String getRightDependentTableName() {
+        return rightDependentTableName;
+    }
+
+    public void setRightDependentTableName(String rightDependentTableName) {
+        this.rightDependentTableName = rightDependentTableName;
+    }
 }
diff --git a/stream_sink.md b/stream_sink.md
index d7718865..a30aae59 100644
--- a/stream_sink.md
+++ b/stream_sink.md
@@ -54,7 +54,7 @@
 
     String topic=.....; //rocketmq 的topic
     String namesrvAddress=......; //rocketmq的nameserver
-    DataStream dataStream=dataStream.toRocketmq(topic,namesrvAddress);
+    DataStream dataStream=dataStreamSource.toRocketmq(topic,namesrvAddress);
 
 ```
 
@@ -65,7 +65,7 @@
     String topic=.....; //rocketmq 的topic
     String groupName=.....; // rocketmq的消费组
     String namesrvAddress=......; //rocketmq的nameserver
-    DataStream dataStream=dataStream.toRocketmq(topic,groupName,namesrvAddress);
+    DataStream dataStream=dataStreamSource.toRocketmq(topic,groupName,namesrvAddress);
 
 ```
 
@@ -77,7 +77,7 @@
     String groupName=.....; // rocketmq的消费组
     String namesrvAddress=......; //rocketmq的nameserver
     String tags=......; // rocketmq的tag信息
-    DataStream dataStream=dataStream.toRocketmq(topic,tags,groupName,namesrvAddress);
+    DataStream dataStream=dataStreamSource.toRocketmq(topic,tags,groupName,namesrvAddress);
 
 ```
 ## kafka
@@ -95,7 +95,7 @@
     String url=......;
     String clientId=......;
     String topic=......;
-    DataStream dataStream=dataStream.toMqtt(url,cliientId,topic);
+    DataStream dataStream=dataStreamSource.toMqtt(url,cliientId,topic);
 
 ```
 
@@ -108,7 +108,7 @@
     String topic=......;
     String username=......;
     String password=......;
-    DataStream dataStream=dataStream.toMqtt(url,cliientId,topic,username,password);
+    DataStream dataStream=dataStreamSource.toMqtt(url,cliientId,topic,username,password);
 
 ```
 
diff --git a/stream_source.md b/stream_source.md
index 8ae7b7e4..77fb96c4 100644
--- a/stream_source.md
+++ b/stream_source.md
@@ -54,7 +54,7 @@
     String groupName = .....; // rocketmq的消费组
     String namesrvAddress = ......; //rocketmq的nameserver
     Boolean isJsonData = true; //是否json     
-    String tags = ......; //rocketmq的tag信息
+    String tags = ......; // rocketmq的tag信息
     DataStream dataStream = dataStreamSource.fromRocketmq(topic, groupName, tags, isJsonData, namesrvAddress);
 
 ```