Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/05/05 12:12:02 UTC

[rocketmq-streams] branch main_develop updated: Snapshot 1.0.2 (#148)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main_develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main_develop by this push:
     new a740bdd  Snapshot 1.0.2 (#148)
a740bdd is described below

commit a740bdd6f4bceb389152c54f6257bb5e3badf810
Author: Jason J J Cheng <ju...@alibaba-inc.com>
AuthorDate: Thu May 5 20:11:57 2022 +0800

    Snapshot 1.0.2 (#148)
    
    * GitBook: [#2] No subject
    
    * GitBook: [#3] No subject
    
    * Add UDAF support to the SDK
    
    * Add UDAF support to the SDK
    
    * Add UDAF support to the SDK
    
    * Add UDAF support to the SDK
---
 README-chinese.md                                  | 150 -----------------
 README.md                                          | 146 +---------------
 SUMMARY.md                                         |   7 +
 docs/README.md                                     | 150 -----------------
 docs/SUMMARY.md                                    |   8 -
 docs/quick_start/README.md                         |  46 ------
 pom.xml                                            |  13 +-
 quick_start.md                                     |  92 +++--------
 .../streams/db/sink/AbstractMultiTableSink.java    |  12 +-
 .../streams/db/sink/DynamicMultipleDBSink.java     |  11 +-
 .../streams/db/sink/SelfMultiTableSink.java        |   2 +-
 .../streams/db/sink/SplitBySerialNumber.java       |   2 +-
 .../streams/db/sink/SplitByTimeMultiTableSink.java |   2 +-
 .../rocketmq/streams/es/sink/ESChannelBuilder.java |  65 --------
 .../rocketmq/streams/es/sink/ESSinkBuilder.java    |   1 +
 .../streams/es/sink/ESSinkOnlyChannel.java         |  33 ++++
 .../streams/kafka/KafkaChannelBuilder.java         |  13 +-
 .../rocketmq/streams/kafka/source/KafkaSource.java |   2 +-
 .../rocketmq/streams/mqtt/source/PahoSource.java   |  17 +-
 rocketmq-streams-channel-rocketmq/pom.xml          |  20 ---
 .../apache/rocketmq/streams/debug/DebugWriter.java |  92 ++++++-----
 .../apache/rocketmq/streams/sink/RocketMQSink.java |  44 ++---
 .../rocketmq/streams/syslog/SyslogChannel.java     |  35 ++--
 .../streams/syslog/SyslogChannelBuilder.java       |   4 +
 .../streams/syslog/SyslogChannelManager.java       |   6 +-
 .../rocketmq/streams/syslog/SyslogServer.java      |  35 ++--
 .../rocketmq/streams/syslog/SyslogClient.java      |  22 ++-
 rocketmq-streams-clients/pom.xml                   |   8 +
 .../streams/client/transform/WindowStream.java     |  17 +-
 rocketmq-streams-commons/pom.xml                   |   4 +-
 .../streams/common/channel/AbstractChannel.java    |  11 +-
 .../AbstractSupportShuffleChannelBuilder.java      |   2 +-
 .../channel/builder/IShuffleChannelBuilder.java    |   4 +-
 .../streams/common/channel/impl/view/ViewSink.java |  25 ---
 .../streams/common/channel/sink/AbstractSink.java  |   6 +-
 .../streams/common/channel/sink/ISink.java         |   2 +-
 .../impl/AbstractMultiSplitMessageCache.java       |  19 ++-
 .../common/channel/source/AbstractSource.java      |   3 +-
 .../channel/source/AbstractUnreliableSource.java   |   2 +-
 .../common/configurable/BasedConfigurable.java     |   4 +-
 .../streams/common/context/AbstractContext.java    |   2 +
 .../common/monitor/ConsoleMonitorManager.java      |  23 +--
 .../streams/common/monitor/DataSyncConstants.java  |  10 ++
 .../rocketmq/streams/common/monitor/IMonitor.java  |  16 --
 .../common/monitor/TopologyFilterMonitor.java      |  16 --
 .../service/impl/RocketMQMonitorDataSyncImpl.java  |  32 +++-
 .../streams/common/schedule/ScheduleManager.java   |   2 +-
 .../common/threadpool/ThreadPoolFactory.java       |  51 +++++-
 .../AbstractMutilPipelineChainPipline.java         |  67 +++++---
 .../streams/common/topology/model/Pipeline.java    |   8 +-
 .../topology/stages/AbstractWindowStage.java       |   5 +-
 .../common/topology/stages/ViewChainStage.java     |  10 +-
 .../streams/common/topology/task/StreamsTask.java  |  44 +----
 .../streams/common/utils/ContantsUtil.java         |  33 ++--
 .../rocketmq/streams/common/utils/FileUtil.java    |  38 ++++-
 .../service/impl/FileConfigureService.java         |   4 +-
 .../intelligence/AbstractIntelligenceCache.java    |   6 +-
 rocketmq-streams-examples/README.md                | 184 ---------------------
 .../function/expression/CompareFunction.java       |  16 +-
 .../optimization/homologous/HomologousCompute.java |   2 +-
 .../schedule/job/ConfigurableExecutorJob.java      |  30 ++--
 .../function/impl/distinct/DistinctFunction.java   |   1 -
 .../function/impl/json/JsonCreatorFunction.java    |   4 +-
 .../function/impl/json/UDTFFieldNameFunction.java  |  46 ++++++
 .../script/function/impl/parser/GrokFunction.java  |   4 +-
 .../function/impl/parser/Paser2JsonFunction.java   |  17 +-
 .../function/impl/parser/PaserBySplitFunction.java |  44 ++---
 .../function/impl/parser/RegexParserFunction.java  |  24 +--
 .../script/operator/impl/AggregationScript.java    |   4 +
 .../streams/script/service/IAccumulator.java       |   4 +-
 .../script/service/udf/SimpleUDAFScript.java       |  27 ++-
 .../streams/script/function/FunctionTest.java      |  17 ++
 .../rocketmq/streams/window/model/WindowCache.java |  55 +++---
 .../window/offset/WindowMaxValueManager.java       |   8 +-
 .../window/operator/AbstractShuffleWindow.java     |  31 ++--
 .../streams/window/operator/AbstractWindow.java    |   3 +-
 .../window/shuffle/AbstractSystemChannel.java      |  45 +++--
 .../streams/window/shuffle/ShuffleChannel.java     |   3 +-
 .../window/storage/AbstractWindowStorage.java      |  10 +-
 .../window/storage/ShufflePartitionManager.java    |   8 +-
 docs/stream_sink/README.md => stream_sink.md       |   2 +-
 docs/stream_source/README.md => stream_source.md   |   2 +-
 .../README.md => stream_transform.md               |   0
 83 files changed, 754 insertions(+), 1341 deletions(-)

diff --git a/README-chinese.md b/README-chinese.md
deleted file mode 100644
index da7d448..0000000
--- a/README-chinese.md
+++ /dev/null
@@ -1,150 +0,0 @@
-[![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://github.com/apache/rocketmq-streams/releases)
-[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
-[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Average time to resolve an issue")
-[![Percentage of issues still open](http://isitmaintained.com/badge/open/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open")
-[![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social)](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
-
-# Features
-
-* 轻量级部署:可以单独部署,也支持集群部署
-* 多种类型的数据输入以及输出,source 支持 rocketmq , sink 支持db, rocketmq 等
-
-# DataStream Example
-
-```java
-import org.apache.rocketmq.streams.client.transform.DataStream;
-
-DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
-
-    source
-    .fromFile("~/admin/data/text.txt",false)
-    .map(message->message)
-    .toPrint(1)
-    .start();
-```
-
-# Maven Repository
-
-```xml
-
-<dependency>
-    <groupId>org.apache.rocketmq</groupId>
-    <artifactId>rocketmq-streams-clients</artifactId>
-    <version>1.0.2-SNAPSHOT</version>
-</dependency>
-```
-
-# Core API
-
-rocketmq-stream 实现了一系列高级的API,可以让用户很方便的编写流计算的程序,实现自己的业务需求;
-
-## StreamBuilder
-
-StreamBuilder 用于构建流任务的源;
-
-+ [dataStream(nameSpaceName,pipelineName)]() 返回DataStreamSource实例,用于分段编程实现流计算任务;
-
-## DataStream API
-
-### Source
-
-DataStreamSource 是分段式编程的源头类,用于对接各种数据源, 从各大消息队列中获取数据;
-
-+ ```fromFile```  从文件中读取数据, 该方法包含俩个参数
-    + ```filePath``` 文件路径,必填参数
-    + ```isJsonData```  是否json数据, 非必填参数, 默认为```true```
-
-
-+ ```fromRocketmq``` 从rocketmq中获取数据,包含四个参数
-    + ```topic``` rocketmq消息队列的topic名称,必填参数
-    + ```groupName``` 消费者组的名称,必填参数
-    + ```isJson``` 是否json格式,非必填参数
-    + ```tags``` rocketmq消费的tags值,用于过滤消息,非必填参数
-
-+ ```fromKafka``` 从kafka中获取数据,包含5个参数
-    + ```bootstrapserver``` kafka的bootstrapserver 地址,包括ip和端口,多个值以逗号分隔, 必填参数
-    + ```topic``` kafka的topic, 必填参数
-    + ```groupName``` 消费组, 必填参数
-    + ```isJson``` 是否json格式,非必填参数,默认为true
-    + ```maxThread``` 客户端最大线程数,非必填参数,默认为1
-
-+ ```fromMqtt``` 从满足MQTT协议的终端读取数据, 满足边缘计算的场景,其中包含9个参数
-    + ```url```  mqtt broker的地址,必填参数
-    + ```clientId``` 客户端ID, 必填参数,相同的clientId有负载的作用
-    + ```topic``` topic信息, 必填参数
-    + ```username``` 用户名, 非必填,在mqtt端添加鉴权机制时使用
-    + ```password``` 密码,非必填参数,在mqtt端添加鉴权机制时使用
-    + ```cleanSession``` 是否清理session信息, 非必填,默认为true
-    + ```connectionTimeout``` 连接超时信息, 非必填,默认是10s
-    + ```aliveInterval```  判断连接是否活跃的间隔时间,非必填,默认是60s
-    + ```automaticReconnect``` 连接断开后自动重连机制,非必填,默认是true
-
-
-+ ```from``` 自定义的数据源, 通过实现ISource接口实现自己的数据源
-
-### transform
-
-transform 允许在流计算过程中对输入源的数据进行修改,进行下一步的操作;DataStream API中包括```DataStream```,```JoinStream```, ```SplitStream```,```WindowStream```等多个transform类;
-
-DataStream实现了一系列常见的流计算算子
-
-+ ```map``` 通过将源的每个记录传递给函数func来返回一个新的DataStream
-+ ```flatmap``` 与map类似,一个输入项对应0个或者多个输出项
-+ ```filter``` 只选择func返回true的源DStream的记录来返回一个新的DStream
-+ ```forEach``` 对每个记录执行一次函数func, 返回一个新的DataStream
-+ ```selectFields``` 对每个记录返回对应的字段值,返回一个新的DataStream
-+ ```operate```  对每个记录执行一次自定义的函数,返回一个新的DataStream
-+ ```script```  针对每个记录的字段执行一段脚本,返回新的字段,生成一个新的DataStream
-+ ```toPrint``` 将结果在控制台打印,生成新的DataStream实例
-+ ```toFile``` 将结果保存为文件,生成一个新的DataStream实例
-+ ```toMqtt``` 将结果输出到满足mqtt协议的设备中,生成一个新的DataStream实例
-+ ```toDB``` 将结果保存到数据库
-+ ```toRocketmq``` 将结果输出到rocketmq
-+ ```toKafka``` 将结果输出到kafka
-+ ```to``` 将结果经过自定义的ISink接口输出到指定的存储
-+ ```window``` 在窗口内进行相关的统计分析,一般会与```groupBy```连用, ```window()```用来定义窗口的大小, ```groupBy()```用来定义统计分析的主key,可以指定多个
-    + ```count``` 在窗口内计数
-    + ```min``` 获取窗口内统计值的最小值
-    + ```max``` 获取窗口内统计值得最大值
-    + ```avg``` 获取窗口内统计值的平均值
-    + ```sum``` 获取窗口内统计值的加和值
-    + ```reduce``` 在窗口内进行自定义的汇总运算
-+ ```join``` 根据条件将俩个流进行内关联
-+ ```leftJoin``` 根据条件将俩个流的数据进行左关联
-+ ```dimJoin``` 根据条件将流与维表进行内关联,维表的数据可以来自于文件,也可以来自于数据库
-+ ```dimLeftJoin``` 根据条件将流与维表进行左关联,维表的数据可以来自于文件,也可以来自于数据库
-+ ```union``` 将俩个流进行合并
-+ ```split``` 将一个数据流按照标签进行拆分,分为不同的数据流供下游进行分析计算
-+ ```with``` with算子用来指定计算过程中的相关策略,包括checkpoint的存储策略,state的存储策略等
-
-#### Strategy
-
-策略机制主要用来控制计算引擎运行过程中的底层逻辑,如checkpoint,state的存储方式等,后续还会增加对窗口、双流join等的控制;所有的控制策略通过```with```算子传入,可以同时传入多个策略类型;
-
-```java
-//指定checkpoint的存储策略
-source
-    .fromRocketmq("TSG_META_INFO","")
-    .map(message->message+"--")
-    .toPrint(1)
-    .with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L))
-    .start();
-```
-
-# 运行
-
-Rocketmq-Streams 作为典型的java应用,既可以集成在业务系统里运行,也可以作为一个独立的jar包来运行;
-
-首先对应用的源码进行编译
-
-```shell
-mvn -Prelease-all -DskipTests clean install -U
-```
-
-然后直接通过java指令来运行
-
-```shell
- java -jar jarName mainClass
-```
-
-更多详细的案例可以看[这里](docs/SUMMARY.md)
\ No newline at end of file
diff --git a/README.md b/README.md
index 23c4087..e420569 100644
--- a/README.md
+++ b/README.md
@@ -1,142 +1,6 @@
-# RocketMQ Streams [![Build Status](https://app.travis-ci.com/apache/rocketmq-streams.svg?branch=main)](https://app.travis-ci.com/apache/rocketmq-streams) [![CodeCov](https://codecov.io/gh/apache/rocketmq-stream/branch/main/graph/badge.svg)](https://app.codecov.io/gh/apache/rocketmq-streams)
+# Summary
 
-[![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://github.com/apache/rocketmq-streams/releases)
-[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
-[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Average time to resolve an issue")
-[![Percentage of issues still open](http://isitmaintained.com/badge/open/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open")
-[![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social)](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
-
-## [中文文档](./README-Chinese.md)
-
-## [Quick Start](./quick_start.md)
-
-## Features
-
-* Lightweight deployment: RocketMQ Streams can be deployed separately or in cluster mode.
-* Various types of data input and output: source supports RocketMQ while sink supports databases and RocketMQ, etc.
-
-## DataStream Example
-
-```java
-import org.apache.rocketmq.streams.client.transform.DataStream;
-
-DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
-    source
-    .fromFile("~/admin/data/text.txt",false)
-    .map(message->message)
-    .toPrint(1)
-    .start();
-```
-
-## Maven Repository
-
-```xml
-
-<dependency>
-    <groupId>org.apache.rocketmq</groupId>
-    <artifactId>rocketmq-streams-clients</artifactId>
-    <version>1.0.2-SNAPSHOT</version>
-</dependency>
-```
-
-# Core API
-
-RocketMQ Streams implements a series of advanced APIs, allowing users to write stream computing programs conveniently and achieve their own business requirements.
-
-## StreamBuilder
-
-StreamBuilder is used to build the source of stream tasks. It contains two methods: ```dataStream()``` and ```tableStream()```, which return two sources, DataStreamSource and TableStreamSource, respectively.
-
-+ [dataStream(nameSpaceName,pipelineName)]() returns an instance of DataStreamSource, used for segmented programming to achieve stream computing tasks.
-+ [tableStream(nameSpaceName,pipelineName)]() returns an instance of TableStreamSource, used for script programming to achieve stream computing tasks.
-
-## DataStream API
-
-### Source
-
-DataStreamSource is a source class of segmented programming, used to interface with various data sources and obtain data from major message queues.
-
-+ ```fromFile```: reads data from the file. This method contains two parameters:
-    + ```filePath```: specifies which file path to read. Required.
-    + ```isJsonData```: specifies whether data is in JSON format. Optional. Default value: ```true```.
-    + ```tags```: the tags for filtering messages used by the RocketMQ consumer. Optional.
-
-
-+ ```fromRocketmq```: obtains data from RocketMQ, including four parameters:
-    + ```topic```:  the topic name of RocketMQ. Required.
-    + ```groupName```: the name of the consumer group. Required.
-    + ```isJson```: specifies whether data is in JSON format. Optional.
-    + ```tags```: the tags for filtering messages used by the RocketMQ consumer. Optional.
-    
-+ ```fromKafka``` read data from the Kafka, including five parameters:
-  + ```bootstrapserver``` the Kafka bootstrap servers. Required.
-  + ```topic``` the topic name of Kafka. Required.
-  + ```groupName``` the name of the consumer group. Required.
-  + ```isJson``` specifies whether data is in JSON format. Optional.
-  + ```maxThread``` the number of the Kafka consumer max Threads.Optional.
-
-+ ```fromMqtt``` reads data from MQTT service, including nine parameters:
-  + ```url```  the broker of the MQTT service. Required.
-  + ```clientId``` the client id. Required
-  + ```topic``` the name of the MQTT topic. Required.
-  + ```username``` username. Optional. 
-  + ```password``` password. Optional. 
-  + ```cleanSession``` specifies Whether to clear the session during the restart. Optional.
-  + ```connectionTimeout``` the connection timeout. Optional.
-  + ```aliveInterval``` Survival time interval. Optional.
-  + ```automaticReconnect``` specifies Whether to reconnect. Optional.
-
-+ ```from```: custom data source. You can specify your own data source by implementing ISource interface.
-
-### transform
-
-transform allows the input source data to be modified during the stream calculation process for the next step; DataStream API includes ```DataStream```, ```JoinStream```, ```SplitStream```, ```WindowStream```, and many other transform classes.
-
-#### DataStream
-
-DataStream implements a series of common stream calculation operators as follows:
-
-+ ```map```: returns a new DataStream by passing each record of the source to the **func** function.
-+ ```flatmap```: similar to map. One input item corresponds to 0 or more output items.
-+ ```filter```: returns a new DataStream based on the record of the source DataStream only when the ** func** function returns **true**.
-+ ```forEach```: executes the **func** function once for each record and returns a new DataStream.
-+ ```selectFields```: returns the corresponding field value for each record, and returns a new DataStream.
-+ ```operate```: executes a custom function for each record and returns a new DataStream.
-+ ```script```: executes a script for each recorded field, returns new fields, and generates a new DataStream.
-+ ```toPrint```: prints the result on the console and generates a new DataStreamAction instance.
-+ ```toFile```: saves the result as a file and generates a new DataStreamAction instance.
-+ ```toDB```: saves the result to the database.
-+ ```toRocketmq```: outputs the result to RocketMQ.
-+ ```toKafka```: outputs the result to Kafka.
-+ ```to```: outputs the result to the specified storage through the custom ISink interface.
-+ ```window```: performs relevant statistical analysis in the window, generally used in conjunction with ```groupBy```. ```window()``` is used to define the size of the window, and ```groupBy( )``` used to define the main key of statistical analysis. You can specify multiple main keys:
-    + ```count```: counts in the window.
-    + ```min```: gets the minimum of the statistical value in the window.
-    + ```max```: gets the maximum of the statistical value in the window.
-    + ```avg```: gets the average of the statistical values in the window.
-    + ```sum```: gets the sum of the statistical values in the window.
-    + ```reduce```: performs custom summary calculations in the window.
-+ ```join```: associates the two streams or one stream and one physical table according to the conditions and merges them into a large stream for related calculations.
-    + ```dimJoin```  associate a stream with a physical table which can be a file or a db table, and all matching records are retained
-    + ```dimLeftJoin```  After a flow is associated with a physical table, all data of the flow is reserved and fields that do not match the physical table are left blank
-    + ```join```
-    + ```leftJoin```
-+ ```union```: merges the two streams.
-+ ```split```: splits a data stream into different data streams according to tags for downstream analysis and calculation.
-+ ```with```: specifies related strategies during the calculation, including Checkpoint and state storage strategies, etc.
-
-# Strategy
-
-The Strategy mechanism is mainly used to control the underlying logic during the operation of the computing engine, such as the storage methods of Checkpoint and state etc. Subsequent controls for windows, dual-stream joins, and so on will be added. All control strategies are transmitted through the ```with``` operator. Multiple policy types can be transmitted at the same time.
-
-```java
-//Specify the storage strategy for Checkpoint.
-source
-    .fromRocketmq("TSG_META_INFO","")
-    .map(message->message+"--")
-    .toPrint(1)
-    .with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L))
-    .start();
-```
-
-——————————————
+* [Quick Start](quick\_start.md)
+* [创建实时任务数据源](stream\_source.md)
+* [创建实时任务数据输出](stream\_sink.md)
+* [数据处理逻辑](stream\_transform.md)
diff --git a/SUMMARY.md b/SUMMARY.md
new file mode 100644
index 0000000..2c99143
--- /dev/null
+++ b/SUMMARY.md
@@ -0,0 +1,7 @@
+# Table of contents
+
+* [Summary](README.md)
+* [快速开发](quick\_start.md)
+* [stream\_source](stream\_source.md)
+* [stream\_sink](stream\_sink.md)
+* [数据处理逻辑](stream\_transform.md)
diff --git a/docs/README.md b/docs/README.md
deleted file mode 100644
index 3067952..0000000
--- a/docs/README.md
+++ /dev/null
@@ -1,150 +0,0 @@
-[![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://github.com/apache/rocketmq-streams/releases)
-[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
-[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Average time to resolve an issue")
-[![Percentage of issues still open](http://isitmaintained.com/badge/open/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open")
-[![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social)](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
-
-# Features
-
-* 轻量级部署:可以单独部署,也支持集群部署
-* 多种类型的数据输入以及输出,source 支持 rocketmq , sink 支持db, rocketmq 等
-
-# DataStream Example
-
-```java
-import org.apache.rocketmq.streams.client.transform.DataStream;
-
-DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
-
-    source
-    .fromFile("~/admin/data/text.txt",false)
-    .map(message->message)
-    .toPrint(1)
-    .start();
-```
-
-# Maven Repository
-
-```xml
-
-<dependency>
-    <groupId>org.apache.rocketmq</groupId>
-    <artifactId>rocketmq-streams-clients</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
-</dependency>
-```
-
-# Core API
-
-rocketmq-stream 实现了一系列高级的API,可以让用户很方便的编写流计算的程序,实现自己的业务需求;
-
-## StreamBuilder
-
-StreamBuilder 用于构建流任务的源;
-
-+ [dataStream(nameSpaceName,pipelineName)]() 返回DataStreamSource实例,用于分段编程实现流计算任务;
-
-## DataStream API
-
-### Source
-
-DataStreamSource 是分段式编程的源头类,用于对接各种数据源, 从各大消息队列中获取数据;
-
-+ ```fromFile```  从文件中读取数据, 该方法包含俩个参数
-    + ```filePath``` 文件路径,必填参数
-    + ```isJsonData```  是否json数据, 非必填参数, 默认为```true```
-
-
-+ ```fromRocketmq``` 从rocketmq中获取数据,包含四个参数
-    + ```topic``` rocketmq消息队列的topic名称,必填参数
-    + ```groupName``` 消费者组的名称,必填参数
-    + ```isJson``` 是否json格式,非必填参数
-    + ```tags``` rocketmq消费的tags值,用于过滤消息,非必填参数
-  
-+ ```fromKafka``` 从kafka中获取数据,包含5个参数
-  + ```bootstrapserver``` kafka的bootstrapserver 地址,包括ip和端口,多个值以逗号分隔, 必填参数
-  + ```topic``` kafka的topic, 必填参数
-  + ```groupName``` 消费组, 必填参数
-  + ```isJson``` 是否json格式,非必填参数,默认为true
-  + ```maxThread``` 客户端最大线程数,非必填参数,默认为1
-
-+ ```fromMqtt``` 从满足MQTT协议的终端读取数据, 满足边缘计算的场景,其中包含9个参数
-    + ```url```  mqtt broker的地址,必填参数
-    + ```clientId``` 客户端ID, 必填参数,相同的clientId有负载的作用
-    + ```topic``` topic信息, 必填参数
-    + ```username``` 用户名, 非必填,在mqtt端添加鉴权机制时使用
-    + ```password``` 密码,非必填参数,在mqtt端添加鉴权机制时使用
-    + ```cleanSession``` 是否清理session信息, 非必填,默认为true
-    + ```connectionTimeout``` 连接超时信息, 非必填,默认是10s
-    + ```aliveInterval```  判断连接是否活跃的间隔时间,非必填,默认是60s
-    + ```automaticReconnect``` 连接断开后自动重连机制,非必填,默认是true
-
-
-+ ```from``` 自定义的数据源, 通过实现ISource接口实现自己的数据源
-
-### transform
-
-transform 允许在流计算过程中对输入源的数据进行修改,进行下一步的操作;DataStream API中包括```DataStream```,```JoinStream```, ```SplitStream```,```WindowStream```等多个transform类;
-
-DataStream实现了一系列常见的流计算算子
-
-+ ```map``` 通过将源的每个记录传递给函数func来返回一个新的DataStream
-+ ```flatmap``` 与map类似,一个输入项对应0个或者多个输出项
-+ ```filter``` 只选择func返回true的源DStream的记录来返回一个新的DStream
-+ ```forEach``` 对每个记录执行一次函数func, 返回一个新的DataStream
-+ ```selectFields``` 对每个记录返回对应的字段值,返回一个新的DataStream
-+ ```operate```  对每个记录执行一次自定义的函数,返回一个新的DataStream
-+ ```script```  针对每个记录的字段执行一段脚本,返回新的字段,生成一个新的DataStream
-+ ```toPrint``` 将结果在控制台打印,生成新的DataStream实例
-+ ```toFile``` 将结果保存为文件,生成一个新的DataStream实例
-+ ```toMqtt``` 将结果输出到满足mqtt协议的设备中,生成一个新的DataStream实例
-+ ```toDB``` 将结果保存到数据库
-+ ```toRocketmq``` 将结果输出到rocketmq
-+ ```toKafka``` 将结果输出到kafka
-+ ```to``` 将结果经过自定义的ISink接口输出到指定的存储
-+ ```window``` 在窗口内进行相关的统计分析,一般会与```groupBy```连用, ```window()```用来定义窗口的大小, ```groupBy()```用来定义统计分析的主key,可以指定多个
-    + ```count``` 在窗口内计数
-    + ```min``` 获取窗口内统计值的最小值
-    + ```max``` 获取窗口内统计值得最大值
-    + ```avg``` 获取窗口内统计值的平均值
-    + ```sum``` 获取窗口内统计值的加和值
-    + ```reduce``` 在窗口内进行自定义的汇总运算
-+ ```join``` 根据条件将俩个流进行内关联
-+ ```leftJoin``` 根据条件将俩个流的数据进行左关联
-+ ```dimJoin``` 根据条件将流与维表进行内关联,维表的数据可以来自于文件,也可以来自于数据库
-+ ```dimLeftJoin``` 根据条件将流与维表进行左关联,维表的数据可以来自于文件,也可以来自于数据库
-+ ```union``` 将俩个流进行合并
-+ ```split``` 将一个数据流按照标签进行拆分,分为不同的数据流供下游进行分析计算
-+ ```with``` with算子用来指定计算过程中的相关策略,包括checkpoint的存储策略,state的存储策略等
-
-#### Strategy
-
-策略机制主要用来控制计算引擎运行过程中的底层逻辑,如checkpoint,state的存储方式等,后续还会增加对窗口、双流join等的控制;所有的控制策略通过```with```算子传入,可以同时传入多个策略类型;
-
-```java
-//指定checkpoint的存储策略
-source
-    .fromRocketmq("TSG_META_INFO","")
-    .map(message->message+"--")
-    .toPrint(1)
-    .with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L))
-    .start();
-```
-
-# 运行
-
-Rocketmq-Streams 作为典型的java应用,既可以集成在业务系统里运行,也可以作为一个独立的jar包来运行;
-
-首先对应用的源码进行编译
-
-```shell
-mvn -Prelease-all -DskipTests clean install -U
-```
-
-然后直接通过java指令来运行
-
-```shell
- java -jar jarName mainClass
-```
-
-更多详细的案例可以看[这里](docs/SUMMARY.md)
\ No newline at end of file
diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md
deleted file mode 100644
index 950f99b..0000000
--- a/docs/SUMMARY.md
+++ /dev/null
@@ -1,8 +0,0 @@
-# Summary
-
-* [Introduction](README.md)
-* [Quick Start](quick_start/README.md)
-* [创建实时任务数据源](stream_source/README.md)
-* [创建实时任务数据输出](stream_sink/README.md)
-* [数据处理逻辑](stream_transform/README.md)
-
diff --git a/docs/quick_start/README.md b/docs/quick_start/README.md
deleted file mode 100644
index a60dbb9..0000000
--- a/docs/quick_start/README.md
+++ /dev/null
@@ -1,46 +0,0 @@
-# 快速开发
-
-## 引入相关的jar包
-
-```xml
-
-<dependency>
-    <groupId>org.apache.rocketmq</groupId>
-    <artifactId>rocketmq-streams-clients</artifactId>
-</dependency>
-
-```
-
-## 开发实时应用程序
-
-```java
-
-public class RocketmqExample {
-
-    public static void main(String[] args) {
-
-        DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline");
-
-        dataStream
-            .fromFile("data.csv", false)   //构建实时任务的数据源
-            .map(message -> message.split(","))   //构建实时任务处理的逻辑过程
-            .toPrint(1)   //构建实时任务的输出
-            .start();    //启动实时任务
-    }
-}
-
-```
-
-## 运行
-
-打包
-
-```shell
-mvn -Prelease-all -DskipTests clean install -U
-```
-
-运行
-
-```shell
- java -jar jarName mainClass
-```
diff --git a/pom.xml b/pom.xml
index 6929c67..632f7c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,7 +71,6 @@
         <java.version>1.8</java.version>
         <java.encoding>UTF-8</java.encoding>
         <project.build.sourceEncoding>${java.encoding}</project.build.sourceEncoding>
-        <log4j.version>1.2.17</log4j.version>
         <commons-logging.version>1.1</commons-logging.version>
         <spring.version>5.1.14.RELEASE</spring.version>
         <auto-service.version>1.0-rc5</auto-service.version>
@@ -100,6 +99,7 @@
         <elasticsearch.version>7.4.0</elasticsearch.version>
         <kafka.version>1.1.0</kafka.version>
         <paho.version>1.2.2</paho.version>
+        <slf4j-log4j12.version>1.7.36</slf4j-log4j12.version>
     </properties>
 
 
@@ -125,10 +125,7 @@
                         <exclude>build_without_test.sh</exclude>
                         <exclude>NOTICE</exclude>
                         <exclude>LICENSE</exclude>
-                        <exclude>README.md</exclude>
-                        <exclude>README-chinese.md</exclude>
-                        <exclude>QUICKSTART.md</exclude>
-                        <exclude>quick_start.md</exclude>
+                        <exclude>*.md</exclude>
                         <exclude>.github/**</exclude>
                         <exclude>*/target/**</exclude>
                         <exclude>*/*.iml</exclude>
@@ -399,9 +396,9 @@
             </dependency>
 
             <dependency>
-                <groupId>log4j</groupId>
-                <artifactId>log4j</artifactId>
-                <version>${log4j.version}</version>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-log4j12</artifactId>
+                <version>${slf4j-log4j12.version}</version>
             </dependency>
 
             <dependency>
diff --git a/quick_start.md b/quick_start.md
index adcb529..a60dbb9 100644
--- a/quick_start.md
+++ b/quick_start.md
@@ -1,29 +1,6 @@
-## rocketmq-streams 快速搭建
----
+# 快速开发
 
-### 前言
-
-本文档主要介绍如何基于rocketmq-streams快速搭建流处理任务,搭建过程中某些例子会用到rocketmq,可以参考[rocketmq搭建文档](https://rocketmq.apache.org/docs/quick-start/)
-
-### 1、源码构建
-
-#### 1.1、构建环境
-
-- JDK 1.8 and above
-- Maven 3.2 and above
-
-#### 1.2、构建Rocketmq-streams
-
-```shell
-git clone https://github.com/apache/rocketmq-streams.git
-cd rocketmq-streams
-mvn clean -DskipTests  install -U
-
-```
-
-### 2、基于rocketmq-streams创建应用
-
-#### 2.1、pom依赖
+## 引入相关的jar包
 
 ```xml
 
@@ -31,54 +8,39 @@ mvn clean -DskipTests  install -U
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-streams-clients</artifactId>
 </dependency>
-```
 
-#### 2.2、shade clients依赖包
+```
 
-```xml
+## 开发实时应用程序
 
-<build>
-    <plugins>
-        <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-shade-plugin</artifactId>
-            <version>3.2.1</version>
-            <executions>
-                <execution>
-                    <phase>package</phase>
-                    <goals>
-                        <goal>shade</goal>
-                    </goals>
-                    <configuration>
-                        <minimizeJar>false</minimizeJar>
-                        <shadedArtifactAttached>true</shadedArtifactAttached>
-                        <artifactSet>
-                            <includes>
-                                <include>org.apache.rocketmq:rocketmq-streams-clients</include>
-                            </includes>
-                        </artifactSet>
-                    </configuration>
-                </execution>
-            </executions>
-        </plugin>
-    </plugins>
-</build>
-```
+```java
 
-#### 2.3、编写业务代码
+public class RocketmqExample {
 
-快速编写一个统计页面点击次数的小程序:Please see the [rocketmq-streams-examples](rocketmq-streams-examples/README.md)
+    public static void main(String[] args) {
 
-#### 2.4、运行
+        DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline");
 
-- 前提:在从rocketmq中读取数据做流处理时,需要运行topic在rocketmq中自动创建,因为做groupBy操作时,需要用到rocketmq作为shuffle数据的读写目的地。
-- 命令:
+        dataStream
+            .fromFile("data.csv", false)   //构建实时任务的数据源
+            .map(message -> message.split(","))   //构建实时任务处理的逻辑过程
+            .toPrint(1)   //构建实时任务的输出
+            .start();    //启动实时任务
+    }
+}
 
 ```
-   java -jar XXXX-shade.jar \ 
-        -Dlog4j.level=ERROR \
-        -Dlog4j.home=/logs  \
-        -Xms1024m \
-        -Xmx1024m 
+
+## 运行
+
+打包
+
+```shell
+mvn -Prelease-all -DskipTests clean install -U
 ```
 
+运行
+
+```shell
+ java -jar jarName mainClass
+```
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/AbstractMultiTableSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/AbstractMultiTableSink.java
index a46bb16..ef9380d 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/AbstractMultiTableSink.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/AbstractMultiTableSink.java
@@ -34,7 +34,7 @@ public abstract class AbstractMultiTableSink extends EnhanceDBSink {
     protected transient AtomicLong messageCount = new AtomicLong(0);
     protected transient MultiTableSplitFunction<IMessage> multiTableSplitFunction;
 
-    public AbstractMultiTableSink(){
+    public AbstractMultiTableSink() {
     }
 
     public AbstractMultiTableSink(String url, String userName, String password) {
@@ -44,16 +44,16 @@ public abstract class AbstractMultiTableSink extends EnhanceDBSink {
     }
 
     @Override
-    protected boolean initConfigurable(){
+    protected boolean initConfigurable() {
         Iterator<EnhanceDBSink> it = tableSinks.values().iterator();
-        while(it.hasNext()){
+        while (it.hasNext()) {
             it.next().initConfigurable();
         }
         return true;
     }
 
     @Override
-    public boolean batchAdd(IMessage message, ISplit split) {
+    public boolean batchAdd(IMessage message, ISplit<?, ?> split) {
 
         EnhanceDBSink sink = getOrCreateDBSink(split.getQueueId());
         boolean success = sink.batchAdd(message, split);
@@ -68,7 +68,7 @@ public abstract class AbstractMultiTableSink extends EnhanceDBSink {
 
     @Override
     public boolean batchAdd(IMessage message) {
-        ISplit split = getSplitFromMessage(message);
+        ISplit<?, ?> split = getSplitFromMessage(message);
         return batchAdd(message, split);
     }
 
@@ -142,7 +142,7 @@ public abstract class AbstractMultiTableSink extends EnhanceDBSink {
 
     protected abstract String createTableName(String splitId);
 
-    protected abstract ISplit getSplitFromMessage(IMessage message);
+    protected abstract ISplit<?, ?> getSplitFromMessage(IMessage message);
 
     protected class SingleDBSinkCache extends MessageCache<IMessage> {
 
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DynamicMultipleDBSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DynamicMultipleDBSink.java
index ebcf9d8..d57fe01 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DynamicMultipleDBSink.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DynamicMultipleDBSink.java
@@ -32,7 +32,7 @@ public class DynamicMultipleDBSink extends AbstractMultiTableSink implements IAf
     String logicTableName;
     String fieldName;
 
-    public DynamicMultipleDBSink(){
+    public DynamicMultipleDBSink() {
     }
 
     public String getLogicTableName() {
@@ -63,21 +63,21 @@ public class DynamicMultipleDBSink extends AbstractMultiTableSink implements IAf
     }
 
     @Override
-    protected ISplit getSplitFromMessage(IMessage message) {
+    protected ISplit<?, ?> getSplitFromMessage(IMessage message) {
         return this.multiTableSplitFunction.createSplit(message);
     }
 
-
     @Override
     public void doProcessAfterRefreshConfigurable(IConfigurableService configurableService) {
 
-        if(this.multiTableSplitFunction == null){
+        if (this.multiTableSplitFunction == null) {
 
             this.multiTableSplitFunction = new MultiTableSplitFunction<IMessage>() {
                 @Override
-                public ISplit createSplit(IMessage message) {
+                public ISplit<?, ?> createSplit(IMessage message) {
                     return new DynamicMultipleDBSplit(message.getMessageBody().getString(fieldName), logicTableName);
                 }
+
                 @Override
                 public String createTableFromSplitId(String splitId) {
                     return splitId;
@@ -86,6 +86,5 @@ public class DynamicMultipleDBSink extends AbstractMultiTableSink implements IAf
 
         }
 
-
     }
 }
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java
index dc5271a..709fb77 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java
@@ -41,7 +41,7 @@ public class SelfMultiTableSink extends AbstractMultiTableSink implements IAfter
     }
 
     @Override
-    protected ISplit getSplitFromMessage(IMessage message) {
+    protected ISplit<?, ?> getSplitFromMessage(IMessage message) {
         return multiTableSplitFunction.createSplit(message);
     }
 
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitBySerialNumber.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitBySerialNumber.java
index c2a49b7..5c3ca3e 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitBySerialNumber.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitBySerialNumber.java
@@ -30,7 +30,7 @@ public class SplitBySerialNumber extends AbstractMultiTableSink {
     }
 
     @Override
-    protected ISplit getSplitFromMessage(IMessage message) {
+    protected ISplit<?, ?> getSplitFromMessage(IMessage message) {
         return null;
     }
 }
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitByTimeMultiTableSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitByTimeMultiTableSink.java
index 87a2b3e..d40c59f 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitByTimeMultiTableSink.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitByTimeMultiTableSink.java
@@ -30,7 +30,7 @@ public class SplitByTimeMultiTableSink extends AbstractMultiTableSink {
     }
 
     @Override
-    protected ISplit getSplitFromMessage(IMessage message) {
+    protected ISplit<?, ?> getSplitFromMessage(IMessage message) {
         return null;
     }
 }
diff --git a/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESChannelBuilder.java b/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESChannelBuilder.java
deleted file mode 100644
index 2025606..0000000
--- a/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESChannelBuilder.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.es.sink;
-
-import com.alibaba.fastjson.JSONObject;
-import com.google.auto.service.AutoService;
-import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.builder.IShuffleChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.model.ServiceName;
-import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
-
-@AutoService(IChannelBuilder.class)
-@ServiceName(value = ESChannelBuilder.TYPE, aliasName = "ES")
-public class ESChannelBuilder implements IChannelBuilder  {
-    public static final String TYPE = "es";
-
-    protected JSONObject createFormatProperty(Properties properties) {
-        JSONObject formatProperties = new JSONObject();
-        for (Object object : properties.keySet()) {
-            String key = (String) object;
-            if ("type".equals(key)) {
-                continue;
-            }
-            formatProperties.put(key, properties.get(key));
-        }
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "host", "endPoint");
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "esIndex", "index");
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "esIndexType", "typeName");;
-
-        return formatProperties;
-    }
-
-    @Override public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
-        throw new RuntimeException("can not support source for ES");
-    }
-
-    @Override
-    public String getType() {
-        return TYPE;
-    }
-
-    @Override
-    public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
-        return (ISink) ConfigurableUtil.create(ESSinkOnlyChannel.class.getName(), namespace, name, createFormatProperty(properties), null);
-    }
-
-}
diff --git a/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkBuilder.java b/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkBuilder.java
index 2a28eb9..9ebd822 100644
--- a/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkBuilder.java
+++ b/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkBuilder.java
@@ -70,6 +70,7 @@ public class ESSinkBuilder extends AbstractSupportShuffleChannelBuilder {
         IChannelBuilder.formatPropertiesName(formatProperties, properties, "esIndexType", "esindextype");
         IChannelBuilder.formatPropertiesName(formatProperties, properties, "host", "endpoint");
         IChannelBuilder.formatPropertiesName(formatProperties, properties, "maxThread", "maxthread");
+        IChannelBuilder.formatPropertiesName(formatProperties, properties, "esMsgId", "es_msg_id");
         return formatProperties;
     }
 
diff --git a/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkOnlyChannel.java b/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkOnlyChannel.java
index 6abd9b1..c14063a 100644
--- a/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkOnlyChannel.java
+++ b/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkOnlyChannel.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.streams.es.sink;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -31,6 +33,7 @@ import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
 import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
 import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
@@ -66,6 +69,8 @@ public class ESSinkOnlyChannel extends AbstractSink {
 
     private String esIndexType = "log";
 
+    protected String esMsgId;
+
     private transient RestHighLevelClient client;
 
     public ESSinkOnlyChannel() {
@@ -131,8 +136,29 @@ public class ESSinkOnlyChannel extends AbstractSink {
         messages.forEach(message -> {
             IndexRequest indexRequest = new IndexRequest(esIndex);
             Object object = message.getMessageValue();
+            if(object!=null&&!(object instanceof Map)){
+                String str=object.toString();
+                if(str.startsWith("{")&&str.endsWith("}")){
+                    try {
+                        JSONObject jsonObject= JSON.parseObject(str);
+                        object=jsonObject;
+                    }catch (Exception e){
+                        LOG.warn("the sink msg is not json, convert error");
+                    }
+
+                }
+            }
             if (object instanceof Map) {
                 indexRequest.source((Map<String, ?>) object);
+                if(StringUtil.isNotEmpty(esMsgId)){
+                    Map map=(Map)object;
+                    Object msgId=map.get(esMsgId);
+                    if(msgId!=null){
+                        indexRequest.id(msgId.toString());
+                    }
+                }
+
+
             } else {
                 indexRequest.source(object.toString());
             }
@@ -238,4 +264,11 @@ public class ESSinkOnlyChannel extends AbstractSink {
         this.port = port;
     }
 
+    public String getEsMsgId() {
+        return esMsgId;
+    }
+
+    public void setEsMsgId(String esMsgId) {
+        this.esMsgId = esMsgId;
+    }
 }
diff --git a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/KafkaChannelBuilder.java b/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/KafkaChannelBuilder.java
index 7f79d8f..489bcff 100644
--- a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/KafkaChannelBuilder.java
+++ b/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/KafkaChannelBuilder.java
@@ -41,10 +41,8 @@ public class KafkaChannelBuilder extends AbstractSupportShuffleChannelBuilder {
      * @return
      */
     @Override
-    public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
-
-        KafkaSource kafkaSource = (KafkaSource) ConfigurableUtil.create(KafkaSource.class.getName(), namespace, name, createFormatProperty(properties), null);
-        return kafkaSource;
+    public ISource<?> createSource(String namespace, String name, Properties properties, MetaData metaData) {
+        return (KafkaSource) ConfigurableUtil.create(KafkaSource.class.getName(), namespace, name, createFormatProperty(properties), null);
     }
 
     @Override
@@ -53,7 +51,7 @@ public class KafkaChannelBuilder extends AbstractSupportShuffleChannelBuilder {
     }
 
     @Override
-    public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
+    public ISink<?> createSink(String namespace, String name, Properties properties, MetaData metaData) {
         return (KafkaSink) ConfigurableUtil.create(KafkaSink.class.getName(), namespace, name, createFormatProperty(properties), null);
     }
 
@@ -85,10 +83,9 @@ public class KafkaChannelBuilder extends AbstractSupportShuffleChannelBuilder {
     }
 
     @Override
-    public ISink createBySource(ISource pipelineSource) {
+    public ISink<?> createBySource(ISource<?> pipelineSource) {
         KafkaSource kafkaSource = (KafkaSource) pipelineSource;
         String topic = kafkaSource.getTopic();
-        KafkaSink sink = new KafkaSink(kafkaSource.getBootstrapServers(), topic);
-        return sink;
+        return new KafkaSink(kafkaSource.getBootstrapServers(), topic);
     }
 }
diff --git a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/source/KafkaSource.java b/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/source/KafkaSource.java
index d80abc2..a062a0f 100644
--- a/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/source/KafkaSource.java
+++ b/rocketmq-streams-channel-kafka/src/main/java/org/apache/rocketmq/streams/kafka/source/KafkaSource.java
@@ -77,7 +77,7 @@ public class KafkaSource extends AbstractSupportShuffleSource {
         props.put("key.serializer.encoding", getEncoding());
         props.put("value.serializer.encoding", getEncoding());
         this.props = props;
-        return true;
+        return super.initConfigurable();
     }
 
     @Override protected boolean startSource() {
diff --git a/rocketmq-streams-channel-mqtt/src/main/java/org/apache/rocketmq/streams/mqtt/source/PahoSource.java b/rocketmq-streams-channel-mqtt/src/main/java/org/apache/rocketmq/streams/mqtt/source/PahoSource.java
index eb538e9..11490ec 100644
--- a/rocketmq-streams-channel-mqtt/src/main/java/org/apache/rocketmq/streams/mqtt/source/PahoSource.java
+++ b/rocketmq-streams-channel-mqtt/src/main/java/org/apache/rocketmq/streams/mqtt/source/PahoSource.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.streams.mqtt.source;
 import com.alibaba.fastjson.JSONObject;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.rocketmq.streams.common.channel.source.AbstractBatchSource;
 import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
 import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
@@ -29,9 +28,13 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PahoSource extends AbstractSource {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(PahoSource.class);
+
     private String url;
     private String clientId;
     private String topic;
@@ -79,7 +82,7 @@ public class PahoSource extends AbstractSource {
             this.client.setCallback(new MqttCallback() {
 
                 @Override public void connectionLost(Throwable throwable) {
-                    System.out.println("Reconnecting to broker: " + url);
+                    LOGGER.info("Reconnecting to broker: " + url);
                     while (true) {
                         MqttConnectOptions connOpts = new MqttConnectOptions();
                         if (username != null && password != null) {
@@ -111,13 +114,13 @@ public class PahoSource extends AbstractSource {
                         try {
                             if (!client.isConnected()) {
                                 client.connect(connOpts);
-                                System.out.println("Reconnecting success");
+                                LOGGER.info("Reconnecting success");
                             }
                             client.subscribe(topic);
                             break;
                         } catch (MqttException e) {
                             try {
-                                System.err.println("Reconnecting err: " + e.getMessage());
+                                LOGGER.error("Reconnecting err: " + e.getMessage());
                                 e.printStackTrace();
                                 Thread.sleep(10000);
                             } catch (InterruptedException ex) {
@@ -134,7 +137,7 @@ public class PahoSource extends AbstractSource {
                 }
 
                 @Override public void deliveryComplete(IMqttDeliveryToken token) {
-                    System.out.println("deliveryComplete---------" + token.isComplete());
+                    LOGGER.info("deliveryComplete---------" + token.isComplete());
                 }
             });
 
@@ -165,10 +168,10 @@ public class PahoSource extends AbstractSource {
                 connOpts.setAutomaticReconnect(this.automaticReconnect);
             }
 
-            System.out.println("Connecting to broker: " + url);
+            LOGGER.info("Connecting to broker: " + url);
             if (!this.client.isConnected()) {
                 this.client.connect(connOpts);
-                System.out.println("Connected");
+                LOGGER.info("Connected");
             }
             this.client.subscribe(topic);
             return true;
diff --git a/rocketmq-streams-channel-rocketmq/pom.xml b/rocketmq-streams-channel-rocketmq/pom.xml
index 5203198..3b12c21 100644
--- a/rocketmq-streams-channel-rocketmq/pom.xml
+++ b/rocketmq-streams-channel-rocketmq/pom.xml
@@ -66,26 +66,6 @@
             <scope>test</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <version>1.7.26</version>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>jcl-over-slf4j</artifactId>
-            <version>1.7.26</version>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.26</version>
-        </dependency>
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-            <version>1.2.17</version>
-        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/debug/DebugWriter.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/debug/DebugWriter.java
index 2726dd8..1c622a8 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/debug/DebugWriter.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/debug/DebugWriter.java
@@ -28,90 +28,98 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.streams.common.utils.DateUtil;
 import org.apache.rocketmq.streams.common.utils.FileUtil;
 import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DebugWriter {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(DebugWriter.class);
 
-    protected String dir="/tmp/rocksmq-streams/mq";
-    protected static Map<String,DebugWriter> debugWriterMap=new HashMap<>();
-    public static DebugWriter getInstance(String topic){
-        DebugWriter debugWriter=debugWriterMap.get(topic);
-        if(debugWriter==null){
-            debugWriter=new DebugWriter();
-            debugWriterMap.put(topic,debugWriter);
+    protected String dir = "/tmp/rocksmq-streams/mq";
+    protected static Map<String, DebugWriter> debugWriterMap = new HashMap<>();
+
+    public static DebugWriter getInstance(String topic) {
+        DebugWriter debugWriter = debugWriterMap.get(topic);
+        if (debugWriter == null) {
+            debugWriter = new DebugWriter();
+            debugWriterMap.put(topic, debugWriter);
         }
         return debugWriter;
     }
 
-    public static boolean isOpenDebug(){
+    public static boolean isOpenDebug() {
         return false;
     }
 
-    public DebugWriter(){}
-    public DebugWriter(String dir){
-        this.dir=dir;
+    public DebugWriter() {
+    }
+
+    public DebugWriter(String dir) {
+        this.dir = dir;
     }
 
     /**
      * write offset 2 file
+     *
      * @param offsets
      */
-    public void writeSaveOffset(Map<MessageQueue, AtomicLong> offsets){
-        if(isOpenDebug()==false){
+    public void writeSaveOffset(Map<MessageQueue, AtomicLong> offsets) {
+        if (!isOpenDebug()) {
             return;
         }
-        String path=dir+"/offsets/offset.txt";
-        if(offsets==null||offsets.size()==0){
+        String path = dir + "/offsets/offset.txt";
+        if (offsets == null || offsets.size() == 0) {
             return;
         }
         Iterator<Map.Entry<MessageQueue, AtomicLong>> it = offsets.entrySet().iterator();
-        List<String> rows=new ArrayList<>();
-        while(it.hasNext()){
-            Map.Entry<MessageQueue, AtomicLong> entry=it.next();
-            String queueId=new RocketMQMessageQueue(entry.getKey()).getQueueId();
-            JSONObject msg=new JSONObject();
-            Long offset=entry.getValue().get();
-            msg.put(queueId,offset);
+        List<String> rows = new ArrayList<>();
+        while (it.hasNext()) {
+            Map.Entry<MessageQueue, AtomicLong> entry = it.next();
+            String queueId = new RocketMQMessageQueue(entry.getKey()).getQueueId();
+            JSONObject msg = new JSONObject();
+            Long offset = entry.getValue().get();
+            msg.put(queueId, offset);
             msg.put("saveTime", DateUtil.getCurrentTimeString());
             msg.put("queueId", queueId);
             rows.add(msg.toJSONString());
         }
-        FileUtil.write(path,rows,true);
+        FileUtil.write(path, rows, true);
     }
 
-    public void writeSaveOffset(MessageQueue messageQueue, AtomicLong offset){
-        if(isOpenDebug()==false){
+    public void writeSaveOffset(MessageQueue messageQueue, AtomicLong offset) {
+        if (!isOpenDebug()) {
             return;
         }
-        Map<MessageQueue, AtomicLong> offsets=new HashMap<>();
-        offsets.put(messageQueue,offset);
+        Map<MessageQueue, AtomicLong> offsets = new HashMap<>();
+        offsets.put(messageQueue, offset);
         writeSaveOffset(offsets);
     }
 
-
-    public void receiveFirstData(String queueId,Long offset){
-        if(isOpenDebug()==false){
+    public void receiveFirstData(String queueId, Long offset) {
+        if (!isOpenDebug()) {
             return;
         }
-        Map<String,Long> offsets=load();
-        Long saveOffset=offsets.get(queueId);
-        System.out.println("queueId is "+queueId+"current offset "+offset+"===="+saveOffset);
+        Map<String, Long> offsets = load();
+        Long saveOffset = offsets.get(queueId);
+        LOGGER.info("queueId is " + queueId + "current offset " + offset + "====" + saveOffset);
     }
+
     /**
      * load offsets
+     *
      * @return
      */
-    public Map<String,Long> load(){
-        if(isOpenDebug()==false){
+    public Map<String, Long> load() {
+        if (!isOpenDebug()) {
             return null;
         }
-        String path=dir+"/offsets/offset.txt";
-        List<String> lines=FileUtil.loadFileLine(path);
-        Map<String,Long> offsets=new HashMap<>();
-        for(String line:lines){
-            JSONObject row=JSONObject.parseObject(line);
-            String queueId=row.getString("queueId");
-            offsets.put(queueId,row.getLong(queueId));
+        String path = dir + "/offsets/offset.txt";
+        List<String> lines = FileUtil.loadFileLine(path);
+        Map<String, Long> offsets = new HashMap<>();
+        for (String line : lines) {
+            JSONObject row = JSONObject.parseObject(line);
+            String queueId = row.getString("queueId");
+            offsets.put(queueId, row.getLong(queueId));
         }
         return offsets;
     }
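
    Note: a minimal usage sketch (not part of this patch) of the DebugWriter shown above.
    The topic name and queue are illustrative, and isOpenDebug() currently returns false,
    so these calls are no-ops unless that flag is switched on:

        DebugWriter writer = DebugWriter.getInstance("example_topic");
        Map<MessageQueue, AtomicLong> offsets = new HashMap<>();
        offsets.put(new MessageQueue("example_topic", "broker-a", 0), new AtomicLong(42L));
        // appends one JSON line per queue ({"<queueId>": offset, "saveTime": ..., "queueId": ...})
        // to <dir>/offsets/offset.txt
        writer.writeSaveOffset(offsets);
        // reads the file back into a queueId -> offset map (returns null while debug is off)
        Map<String, Long> saved = writer.load();
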
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
index c7b97f1..2a6d9be 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
@@ -17,16 +17,14 @@
 
 package org.apache.rocketmq.streams.sink;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.TopicConfig;
@@ -40,10 +38,12 @@ import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.CommandUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RocketMQSink extends AbstractSupportShuffleSink {
 
-    private static final Log LOG = LogFactory.getLog(RocketMQSink.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQSink.class);
     @ENVDependence
     private String tags = "*";
 
@@ -90,8 +90,8 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
             return true;
         }
         if (StringUtil.isEmpty(topic)) {
-            if (LOG.isErrorEnabled()) {
-                LOG.error("topic is blank");
+            if (LOGGER.isErrorEnabled()) {
+                LOGGER.error("topic is blank");
             }
             return false;
         }
@@ -102,19 +102,15 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
             Map<String, MessageQueue> messageQueueMap = new HashMap<>();//if the message carries a queue id, keep a map from queueId to MessageQueue
             String defaultQueueId = "<null>";//messages without a queue id use the default
             for (IMessage msg : messages) {
-                ISplit<RocketMQMessageQueue, MessageQueue> channelQueue = getSplit(msg);
+                ISplit<RocketMQMessageQueue, MessageQueue> channelQueue = (ISplit<RocketMQMessageQueue, MessageQueue>) getSplit(msg);
                 String queueId = defaultQueueId;
                 if (channelQueue != null) {
                     queueId = channelQueue.getQueueId();
                     RocketMQMessageQueue metaqMessageQueue = (RocketMQMessageQueue) channelQueue;
                     messageQueueMap.put(queueId, metaqMessageQueue.getQueue());
                 }
-                List<Message> messageList = msgsByQueueId.get(queueId);
-                if (messageList == null) {
-                    messageList = new ArrayList<>();
-                    msgsByQueueId.put(queueId, messageList);
-                }
-                messageList.add(new Message(topic, tags, null, msg.getMessageBody().toJSONString().getBytes("UTF-8")));
+                List<Message> messageList = msgsByQueueId.computeIfAbsent(queueId, k -> new ArrayList<>());
+                messageList.add(new Message(topic, tags, null, msg.getMessageBody().toJSONString().getBytes(StandardCharsets.UTF_8)));
             }
             List<Message> messageList = msgsByQueueId.get(defaultQueueId);
             if (messageList != null) {
@@ -174,8 +170,8 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
                 producer.shutdown();
                 producer = null;
             } catch (Throwable t) {
-                if (LOG.isWarnEnabled()) {
-                    LOG.warn(t.getMessage(), t);
+                if (LOGGER.isWarnEnabled()) {
+                    LOGGER.warn(t.getMessage(), t);
                 }
             }
         }
@@ -195,7 +191,7 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
     @Override
     protected void createTopicIfNotExist(int splitNum) {
         if (StringUtil.isEmpty(topic)) {
-            LOG.error("Topic should be empty");
+            LOGGER.error("Topic should be empty");
             throw new RuntimeException("Topic should be empty");
         }
         DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
@@ -213,7 +209,7 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
                 CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
             for (String master : masterSet) {
                 defaultMQAdminExt.createAndUpdateTopicConfig(master, topicConfig);
-                LOG.info("Create topic to success: " + master);
+                LOGGER.info("Create topic to success: " + master);
             }
 
             if (this.order) {
@@ -228,10 +224,10 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
                 }
                 defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
                     orderConf.toString(), true);
-                System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", order, orderConf);
+                LOGGER.info("set cluster orderConf. isOrder={}, orderConf=[{}]", order, orderConf);
             }
         } catch (Exception e) {
-            LOG.error("Create topic error", e);
+            LOGGER.error("Create topic error", e);
             throw new RuntimeException("Create topic error " + topic, e);
         } finally {
             defaultMQAdminExt.shutdown();
@@ -241,10 +237,16 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
     @Override
     public List<ISplit<?, ?>> getSplitList() {
         initProducer();
-        List<ISplit<?, ?>> messageQueues;
+        List<ISplit<?, ?>> messageQueues = new ArrayList<>();
+        List<MessageQueue> metaqQueueSet = new ArrayList<>();
         try {
 
-            List<MessageQueue> metaqQueueSet = producer.fetchPublishMessageQueues(topic);
+            try {
+                metaqQueueSet = producer.fetchPublishMessageQueues(topic);
+            } catch (Exception e) {
+                producer.send(new Message(topic, "test", "test".getBytes(StandardCharsets.UTF_8)));
+                metaqQueueSet = producer.fetchPublishMessageQueues(topic);
+            }
             List<ISplit<?, ?>> queueList = new ArrayList<>();
             for (MessageQueue queue : metaqQueueSet) {
                 RocketMQMessageQueue rocketMQMessageQueue = new RocketMQMessageQueue(queue);
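
    Note: the batchInsert() change above groups outgoing messages per queue with
    computeIfAbsent instead of the manual get/null-check/put sequence. A standalone
    sketch of the same pattern (the routing rule here is hypothetical):

        Map<String, List<String>> byQueue = new HashMap<>();
        for (String payload : java.util.Arrays.asList("a", "b", "c")) {
            String queueId = (payload.hashCode() % 2 == 0) ? "q0" : "q1"; // hypothetical routing
            byQueue.computeIfAbsent(queueId, k -> new ArrayList<>()).add(payload);
        }
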
diff --git a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannel.java b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannel.java
index d771c82..7869244 100644
--- a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannel.java
+++ b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannel.java
@@ -53,7 +53,7 @@ public class SyslogChannel extends AbstractChannel implements ISyslogRouter {
     protected String keywords;
     protected String ipListStr;
 
-    private transient org.graylog2.syslog4j.SyslogIF syslog;
+    private transient org.graylog2.syslog4j.SyslogIF syslogClient;
 
     public SyslogChannel() {
     }
@@ -74,7 +74,7 @@ public class SyslogChannel extends AbstractChannel implements ISyslogRouter {
 
     @Override
     protected ISink createSink() {
-        return new AbstractSink() {
+        this.sink = new AbstractSink() {
             @Override
             protected boolean batchInsert(List<IMessage> messages) {
                 if (messages == null || !syslogClientInit) {
@@ -95,32 +95,36 @@ public class SyslogChannel extends AbstractChannel implements ISyslogRouter {
                             encode = abstractSource.getEncoding();
                         }
                         message = URLDecoder.decode(msg.getMessageValue().toString(), encode);
-                        syslog.getConfig().setLocalName(IPUtil.getLocalIP());
-                        syslog.getConfig().setSendLocalTimestamp(true);
-                        syslog.getConfig().setSendLocalName(true);//if this is false, the JSON data must contain no spaces
-                        //must be set for local testing, otherwise the source IP becomes 127.0.0.1; for a remote server this line must be commented out, otherwise the server value gets overwritten
-                        //syslog.getConfig().setHost(IPUtil.getLocalIP());
+                        syslogClient.getConfig().setLocalName(IPUtil.getLocalIP());
+                        syslogClient.getConfig().setSendLocalTimestamp(true);
+                        syslogClient.getConfig().setSendLocalName(true);//if this is false, the JSON data must contain no spaces
+                        if ("127.0.0.1".equals(syslogClient.getConfig().getHost())) {
+                            //must be set for local testing, otherwise the source IP becomes 127.0.0.1; for a remote server this line must be commented out, otherwise the server value gets overwritten
+                            syslogClient.getConfig().setHost(IPUtil.getLocalIP());
+                        }
 
                     } catch (Exception e) {
-                        LOG.error("syslog decode message error " + msg.getMessageValue().toString(), e);
+                        LOG.error("syslogClient decode message error " + msg.getMessageValue().toString(), e);
                     }
-                    syslog.log(level, message);
+                    syslogClient.log(level, message);
                 }
-                syslog.flush();
+                syslogClient.flush();
                 return true;
             }
         };
+        return this.sink;
     }
 
     @Override
     protected ISource createSource() {
-        return new AbstractUnreliableSource() {
+        this.source = new AbstractUnreliableSource() {
             @Override
             protected boolean startSource() {
                 SyslogChannelManager.start(protol);
                 return true;
             }
         };
+        return this.source;
     }
 
     @Override
@@ -138,13 +142,12 @@ public class SyslogChannel extends AbstractChannel implements ISyslogRouter {
                     ipList.add(value);
                 }
             }
-            syslog = Syslog.getInstance(protol);
-            SyslogConfigIF config = syslog.getConfig();
+            syslogClient = Syslog.getInstance(protol);
+            SyslogConfigIF config = syslogClient.getConfig();
             config.setHost(serverIp);
-            config.setPort(
-                protol == SyslogChannelManager.UDP ? SyslogChannelManager.udpPort : SyslogChannelManager.tcpPort);
+            config.setPort(protol.equals(SyslogChannelManager.UDP) ? SyslogChannelManager.udpPort : SyslogChannelManager.tcpPort);
         } catch (Throwable throwable) {
-            LOG.warn("syslog client init fail " + throwable);
+            LOG.warn("syslogClient client init fail " + throwable);
             syslogClientInit = false;
         }
 
diff --git a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannelBuilder.java b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannelBuilder.java
index 8b1ae92..60ddf52 100644
--- a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannelBuilder.java
+++ b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannelBuilder.java
@@ -41,6 +41,8 @@ public class SyslogChannelBuilder implements IChannelBuilder {
     public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
 
         SyslogChannel source = (SyslogChannel) ConfigurableUtil.create(SyslogChannel.class.getName(), namespace, name, createFormatProperty(properties), null);
+        source.createSource();
+        source.setType(ISource.TYPE);
         return source;
     }
 
@@ -52,6 +54,8 @@ public class SyslogChannelBuilder implements IChannelBuilder {
     @Override
     public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
         SyslogChannel sink = (SyslogChannel) ConfigurableUtil.create(SyslogChannel.class.getName(), namespace, name, createFormatProperty(properties), null);
+        sink.createSink();
+        sink.setType(ISink.TYPE);
         return sink;
     }
 
diff --git a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannelManager.java b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannelManager.java
index 19826df..0e1be82 100644
--- a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannelManager.java
+++ b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannelManager.java
@@ -28,8 +28,8 @@ public class SyslogChannelManager {
     public static final String TCP_PORT_PROPERTY_KEY = "dipper.syslog.server.tcp.port";//to change the port, add dipper.syslog.server.tcp.port=<new port> to the configuration file
     public static final String UDP_PORT_PROPERTY_KEY = "dipper.syslog.server.udp.port";//to change the port, add dipper.syslog.server.udp.port=<new port> to the configuration file
 
-    public transient static int tcpPort = 12345;//default syslog server port
-    public transient static int udpPort = 12346;//default syslog server port
+    public static int tcpPort = 12345;//default syslog server port
+    public static int udpPort = 12346;//default syslog server port
 
     private static AtomicBoolean tcpStart = new AtomicBoolean(false);//marks whether the TCP server has been started; it starts only once
     private static AtomicBoolean updStart = new AtomicBoolean(false);//marks whether the UDP server has been started; it starts only once
@@ -39,7 +39,7 @@ public class SyslogChannelManager {
     public static void registeTCP(SyslogChannel syslogRouter) {
         if (!TCP_CHANNEL.getRouters().contains(syslogRouter)) {
             TCP_CHANNEL.getRouters().add(syslogRouter);
-            if (tcpPort == 12345) {
+            if (tcpPort == 12345&&syslogRouter.getPort()>0) {
                 tcpPort = syslogRouter.getPort();
             }
         }
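
    Note: the two property keys above allow a deployment to override the default syslog
    server ports; a hedged configuration-file example (the port values are illustrative):

        dipper.syslog.server.tcp.port=1514
        dipper.syslog.server.udp.port=1515
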
diff --git a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogServer.java b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogServer.java
index 4169c43..b792052 100644
--- a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogServer.java
+++ b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogServer.java
@@ -82,13 +82,16 @@ public class SyslogServer extends AbstractUnreliableSource {
         SyslogServerSessionEventHandlerIF var5 = new SyslogServerEventHandler();
         var3.addEventHandler(var5);
         setSingleType(true);//single consumer
-        return true;
+        return super.initConfigurable();
     }
 
     @Override public boolean startSource() {
         setReceiver(new IStreamOperator() {
             @Override public Object doMessage(IMessage message, AbstractContext context) {
-                String hostAddress = message.getMessageBody().getString("_hostAddress");
+                String hostAddress = message.getMessageBody().getString("hostAddress");
+                if (hostAddress == null) {
+                    return null;
+                }
                 List<SyslogChannel> syslogChannels = cache.get(hostAddress);
                 LOG.info("receive syslog msg, ip is  " + hostAddress + " msg is " + message.getMessageBody());
                 boolean hasMatch = false;
@@ -118,7 +121,7 @@ public class SyslogServer extends AbstractUnreliableSource {
                 return message;
             }
         });
-        org.graylog2.syslog4j.server.SyslogServer.getThreadedInstance(protocol);
+        SyslogServerIF serverIF = org.graylog2.syslog4j.server.SyslogServer.getThreadedInstance(protocol);
         return true;
     }
 
@@ -194,7 +197,7 @@ public class SyslogServer extends AbstractUnreliableSource {
         }
 
         @Override public void event(Object var1, SyslogServerIF var2, SocketAddress var3, SyslogServerEventIF var4) {
-            JSONObject msg = new JSONObject();
+
             String hostAddress = null;
             if (InetSocketAddress.class.isInstance(var3)) {
                 InetSocketAddress address = (InetSocketAddress) var3;
@@ -230,19 +233,17 @@ public class SyslogServer extends AbstractUnreliableSource {
                 }
 
             }
-
-            msg.put("_facility", var4.getFacility());
-            msg.put("_hostName", hostName);
-            msg.put("_hostAddress", hostAddress);
-            msg.put("_level", var4.getLevel());
-            msg.put("_data", message);
-            msg.put("_date", DateUtil.format(date));
-            msg.put("_tag", tag);
-            msg.put("_pid", pid);
-            UserDefinedMessage userDefinedMessage = new UserDefinedMessage(message);
-            userDefinedMessage.putAll(msg);
-            userDefinedMessage.put(IMessage.DATA_KEY, message);
-            doReceiveMessage(userDefinedMessage);
+            JSONObject msg = new JSONObject();
+            msg.put("data",message);
+            msg.put("facility", var4.getFacility());
+            msg.put("hostName", hostName);
+            msg.put("hostAddress", hostAddress);
+            msg.put("level", var4.getLevel());
+            msg.put("log_time", DateUtil.format(date));
+            msg.put("tag", tag);
+            msg.put("pid", pid);
+
+            doReceiveMessage(msg);
 
         }
 
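    Note: the event handler above now emits a flat JSONObject whose field names drop the
    old leading underscore ("_data" becomes "data", "_date" becomes "log_time"). A hedged
    sketch of reading those fields downstream, mirroring the IStreamOperator wiring used
    in the test client further below (the syslogChannel variable is assumed):

        syslogChannel.start(new IStreamOperator() {
            @Override
            public Object doMessage(IMessage message, AbstractContext context) {
                JSONObject body = message.getMessageBody();
                String host = body.getString("hostAddress"); // matched against registered channel IPs
                String data = body.getString("data");        // raw syslog payload
                String logTime = body.getString("log_time"); // formatted event time
                return message;
            }
        });
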
diff --git a/rocketmq-streams-channel-syslog/src/test/java/org/apache/rocketmq/streams/syslog/SyslogClient.java b/rocketmq-streams-channel-syslog/src/test/java/org/apache/rocketmq/streams/syslog/SyslogClient.java
index 31b6e5c..d3c1229 100644
--- a/rocketmq-streams-channel-syslog/src/test/java/org/apache/rocketmq/streams/syslog/SyslogClient.java
+++ b/rocketmq-streams-channel-syslog/src/test/java/org/apache/rocketmq/streams/syslog/SyslogClient.java
@@ -36,27 +36,35 @@ public class SyslogClient {
         syslogChannel.start(new IStreamOperator() {
             @Override
             public Object doMessage(IMessage message, AbstractContext context) {
-                System.out.println(message.getMessageBody());
+                if (!message.getHeader().isSystemMessage()) {
+                    System.out.println(message.getMessageBody());
+                }
                 return null;
             }
         });
-        addData(syslogChannel);
+        System.out.println("start.....");
+        Thread.sleep(3000);
+        sendTestData();
         Thread.sleep(1000000000l);
     }
 
-    private void addData(IChannel channel) {
+
+    @Test
+    public void sendTestData() throws InterruptedException {
+        IChannel channel = createSyslogChannel();
         JSONObject msg = new JSONObject();
         msg.put("name", "chris");
-        //msg.put("host",IPUtil.getLocalIP());
+        msg.put("host",IPUtil.getLocalIP());
         channel.batchAdd(new Message(msg));
         channel.flush();
+        Thread.sleep(3000);
     }
 
     private SyslogChannel createSyslogChannel() {
-        SyslogChannel syslogChannel = new SyslogChannel();
-        syslogChannel.setUDPProtol();
+        SyslogChannel syslogChannel = new SyslogChannel(IPUtil.getLocalIP(), SyslogChannelManager.tcpPort);
+        syslogChannel.setTCPProtol();
         syslogChannel.addIps(IPUtil.getLocalIP());
-        syslogChannel.setServerIp("11.158.144.159");
+        System.out.println(IPUtil.getLocalIP());
         syslogChannel.init();
         return syslogChannel;
     }
diff --git a/rocketmq-streams-clients/pom.xml b/rocketmq-streams-clients/pom.xml
index 9444365..4c5d0d9 100644
--- a/rocketmq-streams-clients/pom.xml
+++ b/rocketmq-streams-clients/pom.xml
@@ -57,6 +57,14 @@
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-streams-filter</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-streams-channel-syslog</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-streams-channel-kafka</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-streams-dbinit</artifactId>
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/WindowStream.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/WindowStream.java
index 1ff3564..1547514 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/WindowStream.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/WindowStream.java
@@ -21,10 +21,16 @@ import java.util.Set;
 import org.apache.rocketmq.streams.client.transform.window.Time;
 import org.apache.rocketmq.streams.common.context.UserDefinedMessage;
 import org.apache.rocketmq.streams.common.functions.ReduceFunction;
+import org.apache.rocketmq.streams.common.model.NameCreator;
+import org.apache.rocketmq.streams.common.model.NameCreatorContext;
 import org.apache.rocketmq.streams.common.topology.ChainStage;
 import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
 import org.apache.rocketmq.streams.common.topology.stages.udf.IReducer;
 import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.script.operator.impl.AggregationScript;
+import org.apache.rocketmq.streams.script.service.IAccumulator;
+import org.apache.rocketmq.streams.script.service.udf.SimpleUDAFScript;
+import org.apache.rocketmq.streams.script.service.udf.UDAFScript;
 import org.apache.rocketmq.streams.window.operator.AbstractWindow;
 
 /**
@@ -90,10 +96,13 @@ public class WindowStream {
      * @return
      */
     public WindowStream count_distinct(String fieldName, String asName) {
-        String distinctName = "__" + fieldName + "_distinct_" + asName + "__";
-        String prefix = distinctName + "=distinct(" + fieldName + ")";
-        String suffix = asName + "=count(" + distinctName + ")";
-        window.getSelectMap().put(asName, prefix + ";" + suffix);
+        return count_distinct_2(fieldName, asName);
+    }
+
+    public WindowStream addUDAF(IAccumulator accumulator, String asName, String... fieldNames) {
+        AggregationScript.registUDAF(accumulator.getClass().getSimpleName(), accumulator.getClass());
+        String prefix = asName + "=" + accumulator.getClass().getSimpleName() + "(" + MapKeyUtil.createKeyBySign(",", fieldNames) + ")";
+        window.getSelectMap().put(asName, prefix);
         return this;
     }
 
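    Note: the new addUDAF hook registers the accumulator class with AggregationScript and
    appends "<asName>=<AccumulatorSimpleName>(field1,field2,...)" to the window's select
    map. A hedged usage sketch; the accumulator instance and field names are hypothetical,
    and windowStream is assumed to come from an upstream window(...) call:

        public WindowStream aggregateUserNames(WindowStream windowStream, IAccumulator nameListAccumulator) {
            // equivalent to adding "name_list=<accumulator class name>(user_name)" to the select map
            return windowStream.addUDAF(nameListAccumulator, "name_list", "user_name");
        }
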
diff --git a/rocketmq-streams-commons/pom.xml b/rocketmq-streams-commons/pom.xml
index 6c47abb..0e9f4cf 100755
--- a/rocketmq-streams-commons/pom.xml
+++ b/rocketmq-streams-commons/pom.xml
@@ -67,8 +67,8 @@
             <artifactId>commons-io</artifactId>
         </dependency>
         <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
         </dependency>
         <dependency>
             <groupId>com.google.code.gson</groupId>
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java
index 8b37668..3510707 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java
@@ -74,11 +74,16 @@ public abstract class AbstractChannel extends BasedConfigurable implements IChan
         create();
         if (sourceValue != null) {
             source = InstantiationUtil.deserializeObject(Base64Utils.decode(sourceValue));
-            source.init();
+            if (source != null) {
+                source.init();
+            }
+
         }
         if (sinkValue != null) {
             sink = InstantiationUtil.deserializeObject(Base64Utils.decode(sinkValue));
-            sink.init();
+            if (sink != null) {
+                sink.init();
+            }
         }
 
     }
@@ -245,4 +250,6 @@ public abstract class AbstractChannel extends BasedConfigurable implements IChan
 
     }
 
+
+
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java
index c2f1d45..cc149b8 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java
@@ -24,7 +24,7 @@ import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
 public abstract class AbstractSupportShuffleChannelBuilder implements IChannelBuilder, IShuffleChannelBuilder {
 
     @Override
-    public ISource copy(ISource pipelineSource) {
+    public ISource<?> copy(ISource<?> pipelineSource) {
         JSONObject jsonObject = JSONObject.parseObject(pipelineSource.toJson());
         return ConfigurableUtil.create(pipelineSource.getNameSpace(), pipelineSource.getConfigureName(), jsonObject, pipelineSource.getClass().getName());
     }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/IShuffleChannelBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/IShuffleChannelBuilder.java
index a6e64f4..a03684f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/IShuffleChannelBuilder.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/IShuffleChannelBuilder.java
@@ -27,7 +27,7 @@ public interface IShuffleChannelBuilder {
      * @param pipelineSource
      * @return
      */
-    ISource copy(ISource pipelineSource);
+    ISource<?> copy(ISource<?> pipelineSource);
 
     /**
      * create a shuffle sink based on the given pipeline source
@@ -35,5 +35,5 @@ public interface IShuffleChannelBuilder {
      * @param pipelineSource
      * @return
      */
-    ISink createBySource(ISource pipelineSource);
+    ISink<?> createBySource(ISource<?> pipelineSource);
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/view/ViewSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/view/ViewSink.java
index 6a0d1b6..82265d4 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/view/ViewSink.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/view/ViewSink.java
@@ -17,36 +17,11 @@
 
 package org.apache.rocketmq.streams.common.channel.impl.view;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
-import java.util.ServiceLoader;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.cache.compress.BitSetCache;
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
-import org.apache.rocketmq.streams.common.component.ComponentCreator;
-import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
-import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
-import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
-import org.apache.rocketmq.streams.common.context.AbstractContext;
-import org.apache.rocketmq.streams.common.context.Context;
 import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.optimization.IHomologousOptimization;
-import org.apache.rocketmq.streams.common.optimization.MessageGlobleTrace;
-import org.apache.rocketmq.streams.common.optimization.fingerprint.FingerprintCache;
-import org.apache.rocketmq.streams.common.optimization.fingerprint.FingerprintMetric;
-import org.apache.rocketmq.streams.common.threadpool.ThreadPoolFactory;
-import org.apache.rocketmq.streams.common.topology.ChainPipeline;
-import org.apache.rocketmq.streams.common.topology.model.Pipeline;
-import org.apache.rocketmq.streams.common.topology.task.TaskAssigner;
-import org.apache.rocketmq.streams.common.utils.CollectionUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
 
 public class ViewSink extends AbstractSink {
     private static final Log LOG = LogFactory.getLog(ViewSink.class);
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
index 79fb687..3b4c831 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
@@ -72,13 +72,13 @@ public abstract class AbstractSink extends BasedConfigurable implements ISink<Ab
     }
 
     @Override
-    public boolean batchAdd(IMessage message,  ISplit split) {
+    public boolean batchAdd(IMessage message,  ISplit<?,?> split) {
         message.getMessageBody().put(TARGET_QUEUE, split);
         return batchAdd(message);
     }
 
-    public ISplit getSplit(IMessage message) {
-        return (ISplit) message.getMessageBody().get(TARGET_QUEUE);
+    public ISplit<?,?> getSplit(IMessage message) {
+        return (ISplit<?,?>) message.getMessageBody().get(TARGET_QUEUE);
     }
 
     @Override
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/ISink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/ISink.java
index 83cbafc..fb3d055 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/ISink.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/ISink.java
@@ -40,7 +40,7 @@ public interface ISink<T extends ISink> extends IConfigurable, IStageBuilder<T>,
      * @param message
      * @return
      */
-    boolean batchAdd(IMessage message, ISplit split);
+    boolean batchAdd(IMessage message, ISplit<?,?> split);
 
     /**
      * infer the meta from the channel, or no meta is needed (e.g. for message queues)
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/AbstractMultiSplitMessageCache.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/AbstractMultiSplitMessageCache.java
index 837b58c..551fd74 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/AbstractMultiSplitMessageCache.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/AbstractMultiSplitMessageCache.java
@@ -29,6 +29,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
 import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.threadpool.ThreadPoolFactory;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
 
 public abstract class AbstractMultiSplitMessageCache<R> extends MessageCache<R> {
     protected ConcurrentHashMap<String, MessageCache<IMessage>> queueMessageCaches = new ConcurrentHashMap();
@@ -38,9 +40,11 @@ public abstract class AbstractMultiSplitMessageCache<R> extends MessageCache<R>
     public AbstractMultiSplitMessageCache(
         IMessageFlushCallBack<R> flushCallBack) {
         super(null);
-        this.executorService = new ThreadPoolExecutor(10, 10,
-            0L, TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<Runnable>());
+//        this.executorService = new ThreadPoolExecutor(10, 10,
+//            0L, TimeUnit.MILLISECONDS,
+//            new LinkedBlockingQueue<Runnable>(), new ThreadPoolFactory.DipperThreadFactory("AbstractMultiSplitMessageCache"));
+        this.executorService = ThreadPoolFactory.createThreadPool(10, 10, 0L, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(), "AbstractMultiSplitMessageCache");
         this.flushCallBack = new MessageFlushCallBack(flushCallBack);
     }
 
@@ -98,7 +102,11 @@ public abstract class AbstractMultiSplitMessageCache<R> extends MessageCache<R>
             return 0;
         }
         if (splitIds.size() == 1) {
-            IMessageCache cache = queueMessageCaches.get(splitIds.iterator().next());
+            String splitId = splitIds.iterator().next();
+            if (StringUtil.isEmpty(splitId)) {
+                return 0;
+            }
+            IMessageCache cache = queueMessageCaches.get(splitId);
             if (cache == null) {
                 return 0;
             }
@@ -108,6 +116,9 @@ public abstract class AbstractMultiSplitMessageCache<R> extends MessageCache<R>
         }
         CountDownLatch countDownLatch = new CountDownLatch(splitIds.size());
         for (String splitId : splitIds) {
+            if (StringUtil.isEmpty(splitId)) {
+                continue;
+            }
             executorService.execute(new Runnable() {
                 @Override public void run() {
                     IMessageCache cache = queueMessageCaches.get(splitId);
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
index 2ff6c20..3c79dd3 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
@@ -119,12 +119,13 @@ public abstract class AbstractSource extends BasedConfigurable implements ISourc
     /**
      * manages checkpoints
      */
-    protected transient CheckPointManager checkPointManager = new CheckPointManager();
+    protected transient CheckPointManager checkPointManager = null;
 
     @Override
     protected boolean initConfigurable() {
         hasStart = new AtomicBoolean(false);
         openMock = false;
+        checkPointManager = new CheckPointManager();
         return super.initConfigurable();
     }
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractUnreliableSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractUnreliableSource.java
index 608f108..4eab84f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractUnreliableSource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractUnreliableSource.java
@@ -40,7 +40,7 @@ import org.apache.rocketmq.streams.common.disruptor.DisruptorProducer;
 public abstract class AbstractUnreliableSource extends AbstractBatchSource {
     private static final Log LOG = LogFactory.getLog(AbstractUnreliableSource.class);
 
-    protected Boolean enableAsyncReceive = true;
+    protected Boolean enableAsyncReceive = false;
     protected boolean isSingleType = false;//whether there is only a single producer; true if so
 
     private transient ExecutorService cachedThreadPool = null;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configurable/BasedConfigurable.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configurable/BasedConfigurable.java
index 9543066..180e991 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configurable/BasedConfigurable.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configurable/BasedConfigurable.java
@@ -165,7 +165,7 @@ public class BasedConfigurable extends AbstractConfigurable {
             }
             DataType dataType = DataTypeUtil.createFieldDataType(this, field.getName());
             String fieldJsonStr = jsonObject.getString(field.getName());
-            fieldJsonStr = getENVParamter(field, fieldJsonStr);
+            fieldJsonStr = getENVParameter(field, fieldJsonStr);
             Object fieldValue = dataType.getData(fieldJsonStr);
             if (fieldValue != null) {
                 ReflectUtil.setBeanFieldValue(this, field.getName(), fieldValue);
@@ -183,7 +183,7 @@ public class BasedConfigurable extends AbstractConfigurable {
      * @param fieldValue
      * @return
      */
-    protected String getENVParamter(Field field, String fieldValue) {
+    protected String getENVParameter(Field field, String fieldValue) {
         ENVDependence dependence = field.getAnnotation(ENVDependence.class);
         if (dependence == null) {
             return fieldValue;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/AbstractContext.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/AbstractContext.java
index 95c2706..4e45b60 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/AbstractContext.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/AbstractContext.java
@@ -96,6 +96,7 @@ public abstract class AbstractContext<T extends IMessage> extends HashMap {
         this.isBreak = subContext.isBreak;
         this.quickFilterResult = subContext.quickFilterResult;
         this.homologousResult = subContext.homologousResult;
+        this.isContinue = subContext.isContinue;
     }
 
     public <C extends AbstractContext<T>> C syncSubContext(C subContext) {
@@ -109,6 +110,7 @@ public abstract class AbstractContext<T extends IMessage> extends HashMap {
         subContext.isBreak = isBreak;
         subContext.quickFilterResult = quickFilterResult;
         subContext.homologousResult = homologousResult;
+        subContext.isContinue = isContinue;
         return subContext;
     }
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/ConsoleMonitorManager.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/ConsoleMonitorManager.java
index 293041a..b5f8fd7 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/ConsoleMonitorManager.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/ConsoleMonitorManager.java
@@ -79,15 +79,17 @@ public class ConsoleMonitorManager {
                 try {
                     queryValidTraceIds();
                     Map<String, JobStage> jobStageMap = cache;
-                    cache = new ConcurrentHashMap();
+                    synchronized (this) {
+                        cache = new ConcurrentHashMap();
+                    }
                     long current = System.currentTimeMillis();
                     for (JobStage jobStage : jobStageMap.values()) {
                         jobStage.setMachineName("");
-                        JSONObject msgObj = (JSONObject) jobStage.getLastInputMsgObj().clone();
-                        String msg = msgObj.toJSONString();
-                        if (msg != null && !"".equalsIgnoreCase(msg)) {
-                            jobStage.setLastInputMsg(msg);
-                        }
+//                        jobStage.setLastInputMsgObj(new JSONObject());
+//                        String msg = msgObj.toJSONString();
+//                        if (msg != null && !"".equalsIgnoreCase(msg)){
+//                            jobStage.setLastInputMsg(msg);
+//                        }
                         jobStage.setInput(jobStage.getSafeInput().getAndSet(0));
                         jobStage.setOutput(jobStage.getSafeOutput().getAndSet(0));
 
@@ -123,7 +125,7 @@ public class ConsoleMonitorManager {
                     LOG.error("ConsoleMonitorManager report error!", e);
                 }
             }
-        }, 1, 20, TimeUnit.SECONDS);
+        }, 20, 30, TimeUnit.SECONDS);
     }
 
     public Set<String> getValidTraceIds() {
@@ -168,7 +170,7 @@ public class ConsoleMonitorManager {
 //        }
 
         jobStage.getSafeInput().incrementAndGet();
-        jobStage.setLastInputMsgObj(msg);
+//        jobStage.setLastInputMsgObj(msg);
         if (clientTime != 0) {
             jobStage.setLastInputMsgTime(new Date(clientTime));
         } else {
@@ -208,7 +210,8 @@ public class ConsoleMonitorManager {
         JSONObject msg = message.getMessageBody();
         JobStage jobStage = getJobStage(stage.getLabel());
         jobStage.getSafeInput().incrementAndGet();
-        jobStage.setLastInputMsgObj(msg);
+//        jobStage.setLastInputMsgObj(msg);
+//        jobStage.setLastInputMsg(msg.toJSONString());
         jobStage.setLastInputMsgTime(new Date());
 
         String traceId = message.getHeader().getTraceId();
@@ -262,7 +265,7 @@ public class ConsoleMonitorManager {
         }
     }
 
-    public JobStage getJobStage(String uniqKey) {
+    public synchronized JobStage getJobStage(String uniqKey) {
 //        String key = createKey(uniqKey);
         JobStage jobStage = cache.get(uniqKey);
         if (jobStage == null) {
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/DataSyncConstants.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/DataSyncConstants.java
index 5d91131..612e15a 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/DataSyncConstants.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/DataSyncConstants.java
@@ -23,11 +23,21 @@ public class DataSyncConstants {
      */
     public static final String RULE_UP_TOPIC = "dipper.console.topic.up";
 
+    /**
+     * the tag of the up topic used by the rocketmq-streams update module
+     */
+    public static final String RULE_UP_TAG = "dipper.console.topic.up.tags";
+
     /**
      * the down topic used by the rocketmq-streams update module
      */
     public static final String RULE_DOWN_TOPIC = "dipper.console.topic.down";
 
+    /**
+     * the tag of the down topic used by the rocketmq-streams update module
+     */
+    public static final String RULE_DOWN_TAG = "dipper.console.topic.down.tags";
+
     /**
      * the tag used by the rocketmq-streams update module
      */
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/IMonitor.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/IMonitor.java
index 6a8f96f..51baeed 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/IMonitor.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/IMonitor.java
@@ -1,19 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/TopologyFilterMonitor.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/TopologyFilterMonitor.java
index 320462a..ca8c498 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/TopologyFilterMonitor.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/TopologyFilterMonitor.java
@@ -1,19 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/service/impl/RocketMQMonitorDataSyncImpl.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/service/impl/RocketMQMonitorDataSyncImpl.java
index da30436..5a64b1d 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/service/impl/RocketMQMonitorDataSyncImpl.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/service/impl/RocketMQMonitorDataSyncImpl.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.streams.common.monitor.service.impl;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -36,7 +37,7 @@ import org.apache.rocketmq.streams.common.monitor.model.JobStage;
 import org.apache.rocketmq.streams.common.monitor.model.TraceIdsDO;
 import org.apache.rocketmq.streams.common.monitor.model.TraceMonitorDO;
 import org.apache.rocketmq.streams.common.monitor.service.MonitorDataSyncService;
-import org.apache.rocketmq.streams.common.utils.IPUtil;
+import org.apache.rocketmq.streams.common.utils.CompressUtil;
 import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
 
 public class RocketMQMonitorDataSyncImpl implements MonitorDataSyncService {
@@ -111,9 +112,10 @@ public class RocketMQMonitorDataSyncImpl implements MonitorDataSyncService {
 
         protected Long pullIntervalMs;
         protected String ruleUpTopic = ComponentCreator.getProperties().getProperty(DataSyncConstants.RULE_UP_TOPIC);
+        protected String ruleUpTag = ComponentCreator.getProperties().getProperty(DataSyncConstants.RULE_UP_TAG, "T_MSG_DIPPER_RULE");
         protected String ruleDownTopic = ComponentCreator.getProperties().getProperty(DataSyncConstants.RULE_DOWN_TOPIC);
-        protected String tags = "*";
-        public String CHARSET = "UTF-8";
+        protected String ruleDownTag = ComponentCreator.getProperties().getProperty(DataSyncConstants.RULE_DOWN_TAG, "T_MSG_DIPPER_RULE_PUSH");
+        public String CHARSET = "UTF-8";
 
         protected transient DefaultMQProducer producer;
 
@@ -126,17 +128,18 @@ public class RocketMQMonitorDataSyncImpl implements MonitorDataSyncService {
         protected DefaultMQPushConsumer initConsumer() {
             try {
 //                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(IPUtil.getLocalIdentification().replaceAll("\\.", "_"));
-                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jobconfig_comsumer");
+                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jobconfig_comsumer" + RuntimeUtil.getPid());
                 if (pullIntervalMs != null) {
                     consumer.setPullInterval(pullIntervalMs);
                 }
 //                consumer.setNamesrvAddr(this.namesrvAddr);
-                consumer.subscribe(ruleDownTopic, tags);
+                consumer.subscribe(ruleDownTopic, ruleDownTag);
                 consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
                     try {
                         int i = 0;
                         for (MessageExt msg : msgs) {
                             String ruleMsg = new String(msg.getBody(), CHARSET);
+                            LOG.info("receive message is :" + ruleMsg);
                             dealMessage(ruleMsg);
                         }
                     } catch (Exception e) {
@@ -168,9 +171,26 @@ public class RocketMQMonitorDataSyncImpl implements MonitorDataSyncService {
             sendMsg(msg, ruleUpTopic);
         }
 
+//        protected void sendMsg(byte[] msg) {
+//            sendMsg(msg, ruleUpTopic);
+//        }
+
         protected void sendMsg(JSONObject msg, String topic) {
             try {
-                Message message = new Message(topic, tags, null, msg.toJSONString().getBytes("UTF-8"));
+//                byte[] bytes = CompressUtil.gZip(msg.toJSONString());
+                byte[] bytes = msg.toJSONString().getBytes(StandardCharsets.UTF_8);
+                LOG.info("sendMsg is: " + msg.toJSONString() + "   topic is: " + topic + " tag is: " + ruleUpTag + "  byte length is " + bytes.length);
+                Message message = new Message(topic, ruleUpTag, null, bytes);
+                producer.send(message);
+            } catch (Exception e) {
+                LOG.error("updater sendMsg error: ", e);
+                e.printStackTrace();
+            }
+        }
+
+        protected void sendMsg(byte[] msg, String topic) {
+            try {
+                Message message = new Message(topic, ruleUpTag, null, msg);
                 producer.send(message);
             } catch (Exception e) {
                 LOG.error("updater sendMsg error: ", e);
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/schedule/ScheduleManager.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/schedule/ScheduleManager.java
index 8da230e..53c12b6 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/schedule/ScheduleManager.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/schedule/ScheduleManager.java
@@ -29,7 +29,7 @@ public class ScheduleManager {
     protected List<ScheduleTask> scheduleTasks = new ArrayList<>();
     protected AtomicBoolean isStart = new AtomicBoolean(false);
     protected ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
-    protected ExecutorService executorService = ThreadPoolFactory.createThreadPool(2, 50);
+    protected ExecutorService executorService = ThreadPoolFactory.createThreadPool(2, 50, "ScheduleManager");
     private static ScheduleManager scheduleManager = new ScheduleManager();
 
     public static ScheduleManager getInstance() {
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/threadpool/ThreadPoolFactory.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/threadpool/ThreadPoolFactory.java
index f194026..af1e88e 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/threadpool/ThreadPoolFactory.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/threadpool/ThreadPoolFactory.java
@@ -16,24 +16,59 @@
  */
 package org.apache.rocketmq.streams.common.threadpool;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class ThreadPoolFactory {
-    public static ExecutorService createThreadPool(int coreSize){
-        ExecutorService executorService= new ThreadPoolExecutor(coreSize, coreSize,
-            1000*60L, TimeUnit.MILLISECONDS,
-            new SynchronousQueue<Runnable>(),new ThreadPoolExecutor.CallerRunsPolicy());
+    public static ExecutorService createThreadPool(int coreSize, String poolNamePrefix) {
+        ExecutorService executorService = new ThreadPoolExecutor(coreSize, coreSize,
+            1000 * 60L, TimeUnit.MILLISECONDS,
+            new SynchronousQueue<Runnable>(), new DipperThreadFactory(poolNamePrefix), new ThreadPoolExecutor.CallerRunsPolicy());
         return executorService;
     }
 
+    public static ExecutorService createThreadPool(int min, int max, String poolNamePrefix) {
+        ExecutorService executorService = new ThreadPoolExecutor(min, max,
+            1000 * 60L, TimeUnit.MILLISECONDS,
+            new SynchronousQueue<Runnable>(), new DipperThreadFactory(poolNamePrefix), new ThreadPoolExecutor.CallerRunsPolicy());
+        return executorService;
+    }
 
-    public static ExecutorService createThreadPool(int min,int max){
-        ExecutorService executorService= new ThreadPoolExecutor(min, max,
-            1000*60L, TimeUnit.MILLISECONDS,
-            new SynchronousQueue<Runnable>(),new ThreadPoolExecutor.CallerRunsPolicy());
+    public static ExecutorService createThreadPool(int min, int max, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> workQueue, String poolNamePrefix) {
+        ExecutorService executorService = new ThreadPoolExecutor(min, max,
+            keepAliveTime, timeUnit,
+            workQueue, new DipperThreadFactory(poolNamePrefix), new ThreadPoolExecutor.CallerRunsPolicy());
         return executorService;
     }
+    public static class DipperThreadFactory implements ThreadFactory {
+        private final AtomicInteger poolNumber = new AtomicInteger(1);
+        private final ThreadGroup group;
+        private final AtomicInteger threadNumber = new AtomicInteger(1);
+        private final String namePrefix;
+
+        public DipperThreadFactory(String poolNamePrefix) {
+            SecurityManager s = System.getSecurityManager();
+            group = (s != null) ? s.getThreadGroup() :
+                Thread.currentThread().getThreadGroup();
+            namePrefix = poolNamePrefix + "-dipper-" +
+                poolNumber.getAndIncrement() +
+                "-thread-";
+        }
+
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(group, r,
+                namePrefix + threadNumber.getAndIncrement(),
+                0);
+            if (t.isDaemon())
+                t.setDaemon(false);
+            if (t.getPriority() != Thread.NORM_PRIORITY)
+                t.setPriority(Thread.NORM_PRIORITY);
+            return t;
+        }
+    }
 }
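
The new factory overloads only add a pool name prefix, so callers migrate by passing a name. A minimal sketch of how the renamed factory might be used (pool sizes and the "DemoStage" prefix are illustrative, not from this commit):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.streams.common.threadpool.ThreadPoolFactory;

public class NamedPoolSketch {
    public static void main(String[] args) throws InterruptedException {
        // "DemoStage" is an illustrative prefix; the diff itself passes names such as "ScheduleManager"
        ExecutorService pool = ThreadPoolFactory.createThreadPool(2, 4, "DemoStage");
        pool.execute(() ->
            // DipperThreadFactory names threads "<prefix>-dipper-<poolNo>-thread-<threadNo>"
            System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```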
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/AbstractMutilPipelineChainPipline.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/AbstractMutilPipelineChainPipline.java
index 1b556d0..1742cbe 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/AbstractMutilPipelineChainPipline.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/AbstractMutilPipelineChainPipline.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
 import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
 import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage;
@@ -31,9 +32,12 @@ import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.Context;
 import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.topology.model.AbstractStage;
 import org.apache.rocketmq.streams.common.topology.model.IStageHandle;
 import org.apache.rocketmq.streams.common.topology.model.Pipeline;
+import org.apache.rocketmq.streams.common.topology.stages.JoinChainStage;
 import org.apache.rocketmq.streams.common.topology.stages.UnionChainStage;
+import org.apache.rocketmq.streams.common.topology.stages.WindowChainStage;
 import org.apache.rocketmq.streams.common.utils.CollectionUtil;
 
 /**
@@ -47,7 +51,7 @@ public abstract class AbstractMutilPipelineChainPipline<T extends IMessage> exte
      */
     protected List<String> piplineNames = new ArrayList<>();
    //Each pipeline corresponds to one message source; the message header carries the source name, and messages are routed by that name
-    protected Map<String, String> piplineName2MsgSourceName;
+    protected Map<String, Set<String>> piplineName2MsgSourceName;
 
     /**
     * The object form of piplineNames
@@ -63,31 +67,34 @@ public abstract class AbstractMutilPipelineChainPipline<T extends IMessage> exte
             String msgSourceName = message.getHeader().getMsgRouteFromLable();
             if (piplines.size() > 0) {
                 List<IMessage> messages = new ArrayList<>();
-                Iterator<Entry<String, String>> it = piplineName2MsgSourceName.entrySet().iterator();
+                Iterator<Entry<String, Set<String>>> it = piplineName2MsgSourceName.entrySet().iterator();
                 while (it.hasNext()) {
-                    Entry<String, String> entry = it.next();
+                    Entry<String, Set<String>> entry = it.next();
                     String piplineName = entry.getKey();
-                    String value = entry.getValue();
-                    if (msgSourceName != null && msgSourceName.equals(value)) {//if the source label of the message matches the one in the map, forward the message to the corresponding pipeline
-                        ChainPipeline pipline = piplines.get(piplineName);
-                        IMessage copyMessage = message.deepCopy();
-                        //copyMessage.getMessageBody().put(ORI_MESSAGE_KEY,message.getMessageBody());
-                        // keep a copy of the original data so that later field changes do not affect it
-                        Context newContext = new Context(copyMessage);
-                        copyMessage.getHeader().setMsgRouteFromLable(msgSourceName);
-                        boolean needReturn = executePipline(pipline, copyMessage, newContext, msgSourceName);
-                        if (needReturn) {
-                            return message;
-                        }
-                        if (newContext.isContinue()) {
-                            if (newContext.isSplitModel()) {
-                                messages.addAll(newContext.getSplitMessages());
-                            } else {
-                                messages.add(copyMessage);
+                    Set<String> values = entry.getValue();
+                    for(String value:values){
+                        if (msgSourceName != null && msgSourceName.equals(value)) {//if the source label of the message matches the one in the map, forward the message to the corresponding pipeline
+                            ChainPipeline pipline = piplines.get(piplineName);
+                            IMessage copyMessage = message.deepCopy();
+                            //copyMessage.getMessageBody().put(ORI_MESSAGE_KEY,message.getMessageBody());
+                            // keep a copy of the original data so that later field changes do not affect it
+                            Context newContext = new Context(copyMessage);
+                            copyMessage.getHeader().setMsgRouteFromLable(msgSourceName);
+                            boolean needReturn = executePipline(pipline, copyMessage, newContext, msgSourceName);
+                            if (needReturn) {
+                                return message;
                             }
+                            if (newContext.isContinue()) {
+                                if (newContext.isSplitModel()) {
+                                    messages.addAll(newContext.getSplitMessages());
+                                } else {
+                                    messages.add(copyMessage);
+                                }
 
+                            }
                         }
                     }
+
                 }
                 for (IMessage msg : messages) {
                     msg.getHeader().setMsgRouteFromLable(msgSourceName);
@@ -174,6 +181,15 @@ public abstract class AbstractMutilPipelineChainPipline<T extends IMessage> exte
             if (chainPipline != null) {
                 piplineMap.put(chainPipline.getConfigureName(), chainPipline);
             }
+            List<AbstractStage<?>>  stages= chainPipline.getStages();
+            for(AbstractStage stage:stages){
+                 if(WindowChainStage.class.isInstance(stage)){
+                    ((WindowChainStage)stage).getWindow().setFireReceiver(getReceiverAfterCurrentNode());
+                }else if(JoinChainStage.class.isInstance(stage)){
+                     ((JoinChainStage)stage).getWindow().setFireReceiver(getReceiverAfterCurrentNode());
+                 }
+            }
+
         }
         this.piplines = piplineMap;
     }
@@ -192,15 +208,18 @@ public abstract class AbstractMutilPipelineChainPipline<T extends IMessage> exte
         return piplines;
     }
 
-    public Map<String, String> getPiplineName2MsgSourceName() {
+    public Map<String, Set<String>> getPiplineName2MsgSourceName() {
         return piplineName2MsgSourceName;
     }
 
+    public void setPiplineName2MsgSourceName(
+        Map<String, Set<String>> piplineName2MsgSourceName) {
+        this.piplineName2MsgSourceName = piplineName2MsgSourceName;
+    }
+
     public ChainPipeline getPipeline(String pipelineName){
         return this.piplines.get(pipelineName);
     }
 
-    public void setPiplineName2MsgSourceName(Map<String, String> piplineName2MsgSourceName) {
-        this.piplineName2MsgSourceName = piplineName2MsgSourceName;
-    }
+
 }
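
Since piplineName2MsgSourceName is now a Map<String, Set<String>>, one sub-pipeline can be fed by several source labels; the equals loop in the hunk above is equivalent to a contains() check. A small self-contained sketch of that routing decision (label and pipeline names are made up):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class RouteBySourceLabelSketch {
    public static void main(String[] args) {
        Map<String, Set<String>> piplineName2MsgSourceName = new HashMap<>();
        Set<String> labels = new HashSet<>();
        labels.add("left_branch");   // hypothetical source labels
        labels.add("right_branch");
        piplineName2MsgSourceName.put("join_sub_pipeline", labels);

        String msgSourceName = "right_branch"; // taken from the message header in the real code
        piplineName2MsgSourceName.forEach((pipelineName, sources) -> {
            if (msgSourceName != null && sources.contains(msgSourceName)) {
                System.out.println("forward a copy of the message to " + pipelineName);
            }
        });
    }
}
```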
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/Pipeline.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/Pipeline.java
index d98d71d..049fd6c 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/Pipeline.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/Pipeline.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
 import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
 import org.apache.rocketmq.streams.common.optimization.MessageGlobleTrace;
 import org.apache.rocketmq.streams.common.optimization.fingerprint.PreFingerprint;
+import org.apache.rocketmq.streams.common.topology.ChainPipeline;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 
 /**
@@ -176,7 +177,12 @@ public class Pipeline<T extends IMessage> extends BasedConfigurable implements I
             T lastMsg = null;
             for (T subT : oldSplits) {
                 context.closeSplitMode(subT);
-                subT.getHeader().setMsgRouteFromLable(t.getHeader().getMsgRouteFromLable());
+                if(ChainPipeline.class.isInstance(this)&&!((ChainPipeline)this).isTopology()&&StringUtil.isNotEmpty(this.msgSourceName)){
+                    subT.getHeader().setMsgRouteFromLable(t.getHeader().getMsgRouteFromLable());
+                }else {
+                    subT.getHeader().setMsgRouteFromLable(t.getHeader().getMsgRouteFromLable());
+                }
+
                 subT.getHeader().addLayerOffset(splitMessageOffset);
                 splitMessageOffset++;
                 boolean isContinue = doMessage(subT, stage, context);
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
index abe8158..4385855 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
 import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.topology.ChainPipeline;
 import org.apache.rocketmq.streams.common.topology.ChainStage;
 import org.apache.rocketmq.streams.common.topology.model.IWindow;
 
@@ -78,7 +79,9 @@ public abstract class AbstractWindowStage<T extends IMessage> extends ChainStage
     @Override
     public void doProcessAfterRefreshConfigurable(IConfigurableService configurableService) {
         window = configurableService.queryConfigurable(IWindow.TYPE, windowName);
-        window.setFireReceiver(getReceiverAfterCurrentNode());
+        if (((ChainPipeline)getPipeline()).isTopology()) {
+            window.setFireReceiver(getReceiverAfterCurrentNode());
+        }
         if (Boolean.TRUE.equals(Boolean.valueOf(ComponentCreator.getProperties().getProperty(ConfigureFileKey.DIPPER_RUNNING_STATUS, ConfigureFileKey.DIPPER_RUNNING_STATUS_DEFAULT)))) {
             window.windowInit();
         }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/ViewChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/ViewChainStage.java
index d8c7ad1..8335b90 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/ViewChainStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/ViewChainStage.java
@@ -71,13 +71,13 @@ public class ViewChainStage<T extends IMessage> extends OutputChainStage<T> impl
     /**
      * Homologous expression result cache
      */
-    protected int homologousRulesCaseSize = 2000000;
-    protected int homologousExpressionCaseSize = 2000000;
+    protected int homologousRulesCaseSize = 10000;
+    protected int homologousExpressionCaseSize = 10000;
 
     /**
      * Pre fingerprint filtering
      */
-    protected int preFingerprintCaseSize = 2000000;
+    protected int preFingerprintCaseSize = 10000;
     protected int parallelTasks = 4;
     /**
      * fingerprint cache
@@ -280,8 +280,8 @@ public class ViewChainStage<T extends IMessage> extends OutputChainStage<T> impl
             }
         }
 
-        if (this.parallelTasks > 0) {
-            executorService = ThreadPoolFactory.createThreadPool(this.parallelTasks);
+        if (this.parallelTasks > 0 && executorService == null) {
+            executorService = ThreadPoolFactory.createThreadPool(this.parallelTasks, "ViewChainStage-");
         }
     }
     @Override
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/task/StreamsTask.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/task/StreamsTask.java
index 09b3d44..38a6e7a 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/task/StreamsTask.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/task/StreamsTask.java
@@ -18,34 +18,15 @@ package org.apache.rocketmq.streams.common.topology.task;
 
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
-import java.util.ServiceLoader;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.cache.compress.BitSetCache;
-import org.apache.rocketmq.streams.common.component.ComponentCreator;
 import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
 import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
 import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
-import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
-import org.apache.rocketmq.streams.common.context.AbstractContext;
-import org.apache.rocketmq.streams.common.context.Context;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
-import org.apache.rocketmq.streams.common.optimization.IHomologousOptimization;
-import org.apache.rocketmq.streams.common.optimization.MessageGlobleTrace;
-import org.apache.rocketmq.streams.common.optimization.fingerprint.FingerprintCache;
-import org.apache.rocketmq.streams.common.optimization.fingerprint.FingerprintMetric;
-import org.apache.rocketmq.streams.common.threadpool.ThreadPoolFactory;
 import org.apache.rocketmq.streams.common.topology.ChainPipeline;
 import org.apache.rocketmq.streams.common.topology.model.Pipeline;
-import org.apache.rocketmq.streams.common.utils.CollectionUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
 
 /**
  * run one or multi pipeline's
@@ -56,11 +37,12 @@ public class StreamsTask extends BasedConfigurable implements IAfterConfigurable
     public static final String TYPE = "stream_task";
 
 
-
-
     /**
     * Task state; currently either started or stopped. The task is serialized and stored in the database
      */
+    public static final String STATE_STARTED = "started";
+    public static final String STATE_STOPPED = "stopped";
+
     protected String state = "stopped";
     /**
     * Task state within the current process
@@ -72,14 +54,6 @@ public class StreamsTask extends BasedConfigurable implements IAfterConfigurable
     protected transient List<ChainPipeline<?>> pipelines = new ArrayList<>();
     protected List<String> pipelineNames = new ArrayList<>();
 
-
-
-
-
-
-
-
-
     public StreamsTask() {
         setType(TYPE);
     }
@@ -98,7 +72,6 @@ public class StreamsTask extends BasedConfigurable implements IAfterConfigurable
         }
     }
 
-
     @Override public void doProcessAfterRefreshConfigurable(IConfigurableService configurableService) {
 
         List<ChainPipeline<?>> newPipelines = new ArrayList<>();
@@ -138,12 +111,6 @@ public class StreamsTask extends BasedConfigurable implements IAfterConfigurable
 
     }
 
-
-
-
-
-
-
     /**
      * start one pipeline
      *
@@ -154,9 +121,6 @@ public class StreamsTask extends BasedConfigurable implements IAfterConfigurable
         thread.start();
     }
 
-
-
-
     public List<ChainPipeline<?>> getPipelines() {
         return pipelines;
     }
@@ -170,8 +134,6 @@ public class StreamsTask extends BasedConfigurable implements IAfterConfigurable
         this.pipelineNames = pipelineNames;
     }
 
-
-
     public List<String> getPipelineNames() {
         return pipelineNames;
     }
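
With started/stopped promoted to the STATE_STARTED and STATE_STOPPED constants, callers can compare against them instead of raw strings. A minimal sketch, assuming StreamsTask keeps the usual getState()/setState() bean accessors (they are not shown in this hunk):

```java
import org.apache.rocketmq.streams.common.topology.task.StreamsTask;

public class TaskStateSketch {
    public static void main(String[] args) {
        StreamsTask task = new StreamsTask();
        // getState()/setState() are assumed accessors for the serialized state field
        if (StreamsTask.STATE_STOPPED.equals(task.getState())) {
            task.setState(StreamsTask.STATE_STARTED);
        }
        System.out.println(task.getState());
    }
}
```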
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ContantsUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ContantsUtil.java
index 7f0513a..261515d 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ContantsUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ContantsUtil.java
@@ -21,12 +21,13 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.rocketmq.streams.common.model.NameCreator;
 
 public class ContantsUtil {
    private static final List<String> CONSTANTS_SIGNS = new ArrayList<>();//characters handled with priority; stores the characters that require special treatment
    private static final Map<String, String> CONSTANTS_SIGNS_REPLACE = new HashMap<>();// mapping from special characters to replacement strings
    private static final Map<String, String> CONSTANTS_REPLACE_SIGNS = new HashMap<>();//mapping from replacement strings back to special characters
-
+    private static final NameCreator EscapesNameCreator=new NameCreator();
     static {
         CONSTANTS_SIGNS.add("\\\\");
         CONSTANTS_SIGNS.add("\\\"");
@@ -51,6 +52,23 @@ public class ContantsUtil {
         CONSTANTS_REPLACE_SIGNS.put("#######", "'");
     }
 
+    public static String replaceEscape(String str,Map<String,String> flag2Escapes){
+        int index=str.indexOf("\\");
+        if(index==-1){
+            return str;
+        }
+        String word=str.substring(index,index+1);
+        String flag= EscapesNameCreator.createName("escapes");
+        str=str.substring(0,index)+flag+str.substring(index+1);
+        return replaceEscape(str,flag2Escapes);
+    }
+    public static void main(String[] args) {
+        String str = "34432\"fs";
+        Map<String, String> replaceEscape = new HashMap<>(16);
+        String value = replaceEscape(str, replaceEscape);
+        System.out.println(value);
+    }
+
     /**
     * Replace special characters with replacement strings
      *
@@ -328,18 +346,7 @@ public class ContantsUtil {
         return true;
     }
 
-    public static void main(String[] args) {
-        String str = "splitarray('da''fsfds''ta','fdsdfs')";
-        Map<String, String> flag2ExpressionStr = new HashMap<>(16);
-        String value = doConstantReplace(str, flag2ExpressionStr, 1);
-        String[] values = value.split(",");
-        int i = 0;
-        for (String v : values) {
-            values[i] = restore(v, flag2ExpressionStr);
-            System.out.println(values[i]);
-        }
-        System.out.println(value);
-    }
+
 
     public static boolean containContant(String jsonValue) {
         if (isContant(jsonValue)) {
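
For context on what ContantsUtil does: quoted constants are swapped for generated flags and later restored, which is exactly what the removed main() exercised. A sketch of that round trip, reusing the doConstantReplace/restore calls from the deleted main():

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.streams.common.utils.ContantsUtil;

public class ConstantReplaceSketch {
    public static void main(String[] args) {
        String str = "splitarray('da''fsfds''ta','fdsdfs')";
        Map<String, String> flag2ExpressionStr = new HashMap<>(16);
        // quoted constants are replaced by generated flag names
        String masked = ContantsUtil.doConstantReplace(str, flag2ExpressionStr, 1);
        // each flag can then be mapped back to the original constant
        for (String part : masked.split(",")) {
            System.out.println(ContantsUtil.restore(part, flag2ExpressionStr));
        }
    }
}
```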
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/FileUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/FileUtil.java
index e4a00cc..bbdce31 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/FileUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/FileUtil.java
@@ -30,6 +30,7 @@ import java.io.OutputStream;
 import java.net.JarURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.net.URLConnection;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
@@ -957,8 +958,14 @@ public class FileUtil {
     }
 
     public static void main(String[] args) {
-        File target = new File("/Users/yd/Downloads/test.html");
-        downloadFile("https://zhuanlan.zhihu.com/p/60383884", target);
+        long begin = System.currentTimeMillis();
+//        File file = new File("/Users/yd/Documents/tert.jar");
+//        downloadFile("https://yundun-bigdata.oss-cn-qingdao.aliyuncs.com/download/dipper/linux64/1.0.0/rocketmq-stream-sql_20220225112207911.jar",
+//            file);
+        downloadNet("https://yundun-bigdata.oss-cn-qingdao.aliyuncs.com/download/dipper/linux64/1.0.0/rocketmq-stream-sql_20220225112207911.jar",
+            "/Users/yd/Documents/tert.jar");
+        long end = System.currentTimeMillis();
+        System.out.println("用时=====" + (end - begin));
     }
 
     /**
@@ -1014,4 +1021,31 @@ public class FileUtil {
 //        return dest_file+File.separator+fileName;
     }
 
+    public static void downloadNet(String packageUrl, String destPath)  {
+        // download a file over the network
+        int bytesum = 0;
+        int byteread = 0;
+
+
+
+        try {
+            URL url = new URL(packageUrl);
+            URLConnection conn = url.openConnection();
+            InputStream inStream = conn.getInputStream();
+            FileOutputStream fs = new FileOutputStream(destPath);
+
+            byte[] buffer = new byte[2048];
+            int length;
+            while ((byteread = inStream.read(buffer)) != -1) {
+                bytesum += byteread;
+                System.out.println(bytesum);
+                fs.write(buffer, 0 , byteread);
+            }
+        } catch (FileNotFoundException e) {
+            e.printStackTrace();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
 }
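
The new downloadNet streams a URL straight to disk but leaves the streams open; the same idea written with try-with-resources would look roughly like the sketch below (not part of the commit, just an illustration of the technique):

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;

public class DownloadSketch {
    // minimal variant of downloadNet: copy the URL content to destPath, closing both streams
    public static void download(String packageUrl, String destPath) throws IOException {
        URL url = new URL(packageUrl);
        try (InputStream in = url.openConnection().getInputStream();
             OutputStream out = new FileOutputStream(destPath)) {
            byte[] buffer = new byte[2048];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        }
    }
}
```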
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
index f7c2942..2d54a3e 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
@@ -165,7 +165,7 @@ public class FileConfigureService extends AbstractConfigurableService {
         String name = getColumnValue(values, 2, "name");
         String jsonValue = getColumnValue(values, 3, "json_value");
         try {
-            //jsonValue = AESUtil.aesDecrypt(jsonValue, ComponentCreator.getProperties().getProperty(ConfigureFileKey.SECRECY, ConfigureFileKey.SECRECY_DEFAULT));
+            jsonValue = AESUtil.aesDecrypt(jsonValue, ComponentCreator.getProperties().getProperty(ConfigureFileKey.SECRECY, ConfigureFileKey.SECRECY_DEFAULT));
         } catch (Exception e) {
             LOG.error("failed in decrypting the value, reason:\t" + e.getCause());
             throw new RuntimeException(e);
@@ -242,7 +242,7 @@ public class FileConfigureService extends AbstractConfigurableService {
         }
         String theSecretValue = null;
         try {
-            theSecretValue =configure.toJson();AESUtil.aesEncrypt(configure.toJson(), ComponentCreator.getProperties().getProperty(ConfigureFileKey.SECRECY, ConfigureFileKey.SECRECY_DEFAULT));
+            theSecretValue = AESUtil.aesEncrypt(configure.toJson(), ComponentCreator.getProperties().getProperty(ConfigureFileKey.SECRECY, ConfigureFileKey.SECRECY_DEFAULT));
         } catch (Exception e) {
             LOG.error("failed in encrypting the value, reason:\t" + e.getCause());
             throw new RuntimeException(e);
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AbstractIntelligenceCache.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AbstractIntelligenceCache.java
index eafd34a..def6f27 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AbstractIntelligenceCache.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AbstractIntelligenceCache.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
 import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
 import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
 import org.apache.rocketmq.streams.common.dboperator.IDBDriver;
+import org.apache.rocketmq.streams.common.threadpool.ThreadPoolFactory;
 import org.apache.rocketmq.streams.common.utils.NumberUtils;
 import org.apache.rocketmq.streams.common.utils.SQLUtil;
 import org.apache.rocketmq.streams.db.driver.JDBCDriver;
@@ -88,9 +89,8 @@ public abstract class AbstractIntelligenceCache extends BasedConfigurable implem
 
     public AbstractIntelligenceCache() {
         setType(TYPE);
-        executorService = new ThreadPoolExecutor(20, 20,
-            0L, TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<Runnable>(1000));
+        executorService = ThreadPoolFactory.createThreadPool(20, 20,0L, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(1000), "AbstractIntelligenceCache");
         scheduledExecutorService = new ScheduledThreadPoolExecutor(3);
     }
 
diff --git a/rocketmq-streams-examples/README.md b/rocketmq-streams-examples/README.md
deleted file mode 100644
index 16c7d38..0000000
--- a/rocketmq-streams-examples/README.md
+++ /dev/null
@@ -1,184 +0,0 @@
-## rocketmq-streams-examples
-
-
-### 1. File source example
-Reads the file line by line and prints every record.
-```java
-public class FileSourceExample {
-    public static void main(String[] args) {
-        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
-        source.fromFile("data.txt", false)
-                .map(message -> message)
-                .toPrint(1)
-                .start();
-    }
-}
-
-```
-
-
-### 2. Sum a field per group over time windows
-
-
-#### 2.1 Install Apache RocketMQ
-See the [Apache RocketMQ quick start guide](https://rocketmq.apache.org/docs/quick-start/).
-
-#### 2.2 Source data
-[Source data](./../rocketmq-streams-examples/src/main/resources/data.txt)
-```xml
-{"InFlow":"1","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"0"}
-{"InFlow":"2","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"1"}
-{"InFlow":"3","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"2"}
-{"InFlow":"4","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"3"}
-{"InFlow":"5","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"4"}
-{"InFlow":"6","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"5"}
-{"InFlow":"7","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"6"}
-{"InFlow":"8","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"7"}
-{"InFlow":"9","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"8"}
-{"InFlow":"10","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"9"}
-```
-
-#### 2.3 Code example
-
-[Code example](./../rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java)
-
-
-#### 2.4 Results
-In this example, rocketmq-streams consumes data from RocketMQ and groups it by the ProjectName and LogStore fields combined; records with the same values for both fields fall into one group.
-For each group it accumulates the sums of the InFlow and OutFlow fields.
-
-Part of the output for data.txt looks like this:
-
-```xml
-"InFlow":22,"total":4,"ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":18
-"InFlow":18,"total":3,"ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":15
-"InFlow":15,"total":3,"ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":12
-```
-The group "ProjectName":"ProjectName-0","LogStore":"LogStore-0" has 4 records, the group "ProjectName":"ProjectName-2","LogStore":"LogStore-2" has 3 records,
-and the group "ProjectName":"ProjectName-1","LogStore":"LogStore-1" has 3 records, 10 records in total. The results match the source data.
-
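
The linked RocketmqWindowTest is not reproduced in this README; a rough sketch of the grouping it describes is below. The sum()/count() aggregates and the package imports are assumptions about the client API, so the actual example may differ.

```java
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
import org.apache.rocketmq.streams.client.transform.window.Time;
import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;

public class GroupSumSketch {
    public static void main(String[] args) {
        DataStreamSource source = StreamBuilder.dataStream("ns-1", "pl-1");
        source.fromFile("data.txt", false)
            .map(message -> JSONObject.parseObject((String) message))
            .window(TumblingWindow.of(Time.seconds(10)))
            .groupBy("ProjectName", "LogStore")   // same group keys as described above
            .sum("InFlow")                        // assumed aggregate methods
            .sum("OutFlow")
            .count("total")
            .waterMark(1)
            .setLocalStorageOnly(true)
            .toDataSteam()                        // spelling follows the findMax example later in this file
            .toPrint(1)
            .start();
    }
}
```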
-### 3. Page click statistics
-#### 3.1 Data
-The raw data is [pageClickData.txt](./../rocketmq-streams-examples/src/main/resources/pageClickData.txt) under the resources directory.
-
-The first column is the user id, the second is the click time, and the last is the page URL.
-```xml
-{"userId":"1","eventTime":"1631700000000","method":"GET","url":"page-1"}
-{"userId":"2","eventTime":"1631700030000","method":"POST","url":"page-2"}
-{"userId":"3","eventTime":"1631700040000","method":"GET","url":"page-3"}
-{"userId":"1","eventTime":"1631700050000","method":"DELETE","url":"page-2"}
-{"userId":"1","eventTime":"1631700060000","method":"DELETE","url":"page-2"}
-{"userId":"2","eventTime":"1631700070000","method":"POST","url":"page-3"}
-{"userId":"3","eventTime":"1631700080000","method":"GET","url":"page-1"}
-{"userId":"1","eventTime":"1631700090000","method":"GET","url":"page-2"}
-{"userId":"2","eventTime":"1631700100000","method":"PUT","url":"page-3"}
-{"userId":"4","eventTime":"1631700120000","method":"POST","url":"page-1"}
-```
-
-#### 3.2 Count each user's page clicks within a time window
-[Code example](./../rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/UsersDimension.java)
-
-Result:
-```xml
-{"start_time":"2021-09-15 18:00:00","total":1,"windowInstanceId":"SPVGTV6DaXmxV5mGNzQixQ==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","userId":"2"}
-{"start_time":"2021-09-15 18:00:00","total":1,"windowInstanceId":"dzAZ104qjUAwzTE6gbKSPA==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","userId":"3"}
-{"start_time":"2021-09-15 18:00:00","total":2,"windowInstanceId":"wrTTyU5DiDkrAb6669Ig9w==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","userId":"1"}
-{"start_time":"2021-09-15 18:01:00","total":1,"windowInstanceId":"vabkmx14xHsJ7G7w16vwug==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","userId":"3"}
-{"start_time":"2021-09-15 18:01:00","total":2,"windowInstanceId":"YIgEKptN2Wf+Oq2m8sEcYw==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","userId":"2"}
-{"start_time":"2021-09-15 18:01:00","total":2,"windowInstanceId":"iYKnwMYAzXFJYbO1KvDnng==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","userId":"1"}
-{"start_time":"2021-09-15 18:02:00","total":1,"windowInstanceId":"HBojuU6/2F/6llkyefECxw==","offset":53892181100000001,"end_time":"2021-09-15 18:03:00","userId":"4"}
-```
-
-Within 18:00:00 - 18:01:00:
-
-|userId|clicks|
-|------|---|
-|   1  | 2 |
-|   2  | 1 |
-|   3  | 1 |
-
-Within 18:01:00 - 18:02:00:
-
-|userId|clicks|
-|------|---|
-|   1  | 2 |
-|   2  | 2 |
-|   3  | 1 |
-
-Within 18:02:00 - 18:03:00:
-
-|userId|clicks|
-|------|---|
-|   4  | 1 | 
-
-Checking against the source file, where eventTime is the time field, a quick inspection shows the results above match expectations.
-
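A rough sketch of the per-user count, in the same bare-method style as the findMax block in the next section; Time.minutes(...) and count(...) are assumed API, and the linked UsersDimension example may configure the event-time field differently.

```java
    public void countClicksPerUser() {
        DataStreamSource source = StreamBuilder.dataStream("ns-1", "pl-1");
        source.fromFile("pageClickData.txt", false)
            .map(message -> JSONObject.parseObject((String) message))
            .window(TumblingWindow.of(Time.minutes(1)))
            .groupBy("userId")
            .count("total")
            .waterMark(1)
            .setLocalStorageOnly(true)
            .toDataSteam()
            .toPrint(1)
            .start();
    }
```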
-#### 3.3 The most-clicked page within a time window
-[Code example](./../rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/PageDimension.java)
-
-Output:
-```xml
-{"start_time":"2021-09-15 18:00:00","total":1,"windowInstanceId":"wrTTyU5DiDkrAb6669Ig9w==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","url":"page-1"}
-{"start_time":"2021-09-15 18:00:00","total":2,"windowInstanceId":"seECZRcaQSRsET1rDc6ZAw==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","url":"page-2"}
-{"start_time":"2021-09-15 18:00:00","total":1,"windowInstanceId":"dzAZ104qjUAwzTE6gbKSPA==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","url":"page-3"}
-{"start_time":"2021-09-15 18:01:00","total":2,"windowInstanceId":"uCqvAeaLTYRnjQm8dCZOvw==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","url":"page-2"}
-{"start_time":"2021-09-15 18:01:00","total":3,"windowInstanceId":"vabkmx14xHsJ7G7w16vwug==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","url":"page-3"}
-{"start_time":"2021-09-15 18:02:00","total":1,"windowInstanceId":"NdgwYMT8azNMu55NUIvygg==","offset":53892181100000001,"end_time":"2021-09-15 18:03:00","url":"page-1"}
-
-```
-The window 18:00:00 - 18:01:00 contains 4 records;
-
-the window 18:01:00 - 18:02:00 contains 5 records;
-
-the window 18:02:00 - 18:03:00 contains 1 record.
-
-We want the most-clicked page within each one-minute window.
-With the output above, the maximum still has to be selected per window, which requires a second computation.
-Code:
-```java
-    public void findMax() {
-        DataStreamSource source = StreamBuilder.dataStream("ns-1", "pl-1");
-        source.fromFile("/home/result.txt", false)
-        .map(message -> JSONObject.parseObject((String) message))
-        .window(TumblingWindow.of(Time.seconds(5)))
-        .groupBy("start_time","end_time")
-        .max("total")
-        .waterMark(1)
-        .setLocalStorageOnly(true)
-        .toDataSteam()
-        .toPrint(1)
-        .start();
-   }
-
-```
-Result:
-```xml
-{"start_time":"2021-09-17 11:09:35","total":"2","windowInstanceId":"kRRpe2hPEQtEuTkfnXUaHg==","offset":54040181100000001,"end_time":"2021-09-17 11:09:40"}
-{"start_time":"2021-09-17 11:09:35","total":"3","windowInstanceId":"kRRpe2hPEQtEuTkfnXUaHg==","offset":54040181100000002,"end_time":"2021-09-17 11:09:40"}
-{"start_time":"2021-09-17 11:09:35","total":"1","windowInstanceId":"kRRpe2hPEQtEuTkfnXUaHg==","offset":54040181100000003,"end_time":"2021-09-17 11:09:40"}
-```
-
-The maximum page click counts in the three windows are 2, 3 and 1 respectively.
-
-### 4. Multi-client consumption with rocketmq-streams
-#### 4.1 Data
-The source data is [data.txt](./../rocketmq-streams-examples/src/main/resources/data.txt), sent 100 times over, producing 1000 records in total.
-#### 4.2 Code example
-[Code example](./../rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MutilStreamsClientTest.java)
-
-The code reads data.txt and sends it 100 times, i.e. 1000 records. At the same time it starts two consumers; each consumes independently and then performs a window aggregation.
-Purpose:
-    Two independent consumers form a consumer group that consumes the same topic concurrently, giving horizontal scale-out when a single consumer cannot keep up; summing the printed "total" field values shows whether the two consumers consumed all 1000 records between them.
-
-#### 4.3 Results
-The output is shown below. The sum of the total values across the rows is 1000, which confirms the two consumers did consume concurrently, the computation is correct, and the scale-out goal was met.
-```xml
-
-{"start_time":"2021-09-27 14:10:10","InFlow":1144,"total":208,"windowInstanceId":"gYZ3tv/5ohgHrwF6tIFgoQ==","offset":54915025100000001,"ProjectName":"ProjectName-0","LogStore":"LogStore-0","end_time":"2021-09-27 14:10:20","OutFlow":936}
-{"start_time":"2021-09-27 14:10:10","InFlow":936,"total":156,"windowInstanceId":"gYZ3tv/5ohgHrwF6tIFgoQ==","offset":54915025100000002,"ProjectName":"ProjectName-2","LogStore":"LogStore-2","end_time":"2021-09-27 14:10:20","OutFlow":780}
-{"start_time":"2021-09-27 14:10:10","InFlow":780,"total":156,"windowInstanceId":"gYZ3tv/5ohgHrwF6tIFgoQ==","offset":54915025100000003,"ProjectName":"ProjectName-1","LogStore":"LogStore-1","end_time":"2021-09-27 14:10:20","OutFlow":624}
-{"start_time":"2021-09-27 14:10:20","InFlow":1056,"total":192,"windowInstanceId":"4YnbFAgSzeDt5qpo+Is/5w==","offset":54915035100000001,"ProjectName":"ProjectName-0","LogStore":"LogStore-0","end_time":"2021-09-27 14:10:30","OutFlow":864}
-{"start_time":"2021-09-27 14:10:20","InFlow":720,"total":144,"windowInstanceId":"4YnbFAgSzeDt5qpo+Is/5w==","offset":54915035100000002,"ProjectName":"ProjectName-1","LogStore":"LogStore-1","end_time":"2021-09-27 14:10:30","OutFlow":576}
-{"start_time":"2021-09-27 14:10:20","InFlow":864,"total":144,"windowInstanceId":"4YnbFAgSzeDt5qpo+Is/5w==","offset":54915035100000003,"ProjectName":"ProjectName-2","LogStore":"LogStore-2","end_time":"2021-09-27 14:10:30","OutFlow":720}
-
-```
\ No newline at end of file
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/expression/CompareFunction.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/expression/CompareFunction.java
index e0596a7..b6f08dd 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/expression/CompareFunction.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/expression/CompareFunction.java
@@ -20,6 +20,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.datatype.DataType;
+import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
 import org.apache.rocketmq.streams.common.utils.ReflectUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.filter.operator.expression.Expression;
@@ -27,7 +29,7 @@ import org.apache.rocketmq.streams.filter.operator.var.Var;
 import org.apache.rocketmq.streams.script.utils.FunctionUtils;
 
 public abstract class CompareFunction extends AbstractExpressionFunction {
-
+    public static String VAR_PREFIX="&&&&##$$%^*";//marks that an expression value is itself a variable name (a special case); the marker is prepended to the value, e.g. for variable uuid the value becomes &&&&##$$%^*uuid
     private static final Log LOG = LogFactory.getLog(CompareFunction.class);
 
     @Override
@@ -39,6 +41,7 @@ public abstract class CompareFunction extends AbstractExpressionFunction {
         String varName = expression.getVarName();
         Var var =expression.getVar();
         varValue = var.doMessage(message, context);
+
         /**
          * 两个数字比较的情况
          */
@@ -61,9 +64,16 @@ public abstract class CompareFunction extends AbstractExpressionFunction {
         if (basicValue == null || basicVarValue == null) {
             return false;
         }
+        DataType dataType=expression.getDataType();
+        if(String.class.isInstance(basicValue)&&basicValue.toString().startsWith(VAR_PREFIX)){
+            String valueVarName=basicValue.toString().replace(VAR_PREFIX,"");
+            basicValue=message.getMessageBody().get(valueVarName);
+            dataType= DataTypeUtil.getDataTypeFromClass(varValue.getClass());
+            basicVarValue=dataType.getData(varValue.toString());
+        }
 
-        Class varClass = basicVarValue == null ? expression.getDataType().getDataClass() : basicVarValue.getClass();
-        Class valueClass = basicValue == null ? expression.getDataType().getDataClass() : basicValue.getClass();
+        Class varClass = basicVarValue == null ? dataType.getDataClass() : basicVarValue.getClass();
+        Class valueClass = basicValue == null ? dataType.getDataClass() : basicValue.getClass();
         try {
             match = (Boolean) ReflectUtil.invoke(this, "compare",
                 new Class[] {varClass, valueClass},
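
The VAR_PREFIX marker above lets an expression value refer to another message field rather than a literal. A small sketch of that convention outside the function (field names are illustrative; the message body in the real code is a fastjson JSONObject):

```java
import com.alibaba.fastjson.JSONObject;

public class VarPrefixSketch {
    static final String VAR_PREFIX = "&&&&##$$%^*"; // same marker string as CompareFunction.VAR_PREFIX

    public static void main(String[] args) {
        JSONObject messageBody = new JSONObject();
        messageBody.put("uuid", "abc-123");
        messageBody.put("request_id", "abc-123");

        String expressionValue = VAR_PREFIX + "uuid"; // value flagged as "this is a variable name"
        Object resolved = expressionValue.startsWith(VAR_PREFIX)
            ? messageBody.get(expressionValue.replace(VAR_PREFIX, ""))
            : expressionValue;
        // the comparison then runs against the resolved field value, as in the hunk above
        System.out.println(resolved.equals(messageBody.get("request_id")));
    }
}
```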
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/homologous/HomologousCompute.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/homologous/HomologousCompute.java
index 438d114..35e808f 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/homologous/HomologousCompute.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/homologous/HomologousCompute.java
@@ -74,7 +74,7 @@ public class HomologousCompute {
         this.commonExpressions = commonExpressions;
         Map<String, SameVarExpressionGroup> groupBySourceVarName = groupBySourceVarName(commonExpressions);
         this.groupBySourceVarName = groupBySourceVarName;
-        registHyperscan(groupBySourceVarName);
+//        registHyperscan(groupBySourceVarName);
         createExpressionIndexAndHomologousVar(groupBySourceVarName);
         fingerprintCache = new FingerprintCache(cacheSize);
     }
diff --git a/rocketmq-streams-schedule/src/main/java/org/apache/rocketmq/streams/schedule/job/ConfigurableExecutorJob.java b/rocketmq-streams-schedule/src/main/java/org/apache/rocketmq/streams/schedule/job/ConfigurableExecutorJob.java
index 4ffc2d5..2b2c3ed 100644
--- a/rocketmq-streams-schedule/src/main/java/org/apache/rocketmq/streams/schedule/job/ConfigurableExecutorJob.java
+++ b/rocketmq-streams-schedule/src/main/java/org/apache/rocketmq/streams/schedule/job/ConfigurableExecutorJob.java
@@ -35,21 +35,21 @@ public class ConfigurableExecutorJob implements StatefulJob {
 
     @Override
     public void execute(JobExecutionContext context) throws JobExecutionException {
-        IScheduleExecutor channelExecutor = null;
-
-        try {
-            JobDetail jobDetail = context.getJobDetail();
-            channelExecutor = (IScheduleExecutor)jobDetail.getJobDataMap().get(IScheduleExecutor.class.getName());
-            channelExecutor.doExecute();
-        } catch (Exception e) {
-            //降低日志量
-            //reduce log volume
-            IMonitor startupMonitor = MonitorFactory.getOrCreateMonitor(MapKeyUtil.createKey(MonitorFactory.PIPLINE_START_UP, channelExecutor.getNameSpace()));
-            IMonitor monitor = startupMonitor.createChildren(channelExecutor.getConfigureName());
-            monitor.addContextMessage(JSON.parse(channelExecutor.toString()));
-            String name = MapKeyUtil.createKeyBySign(".", channelExecutor.getNameSpace(), channelExecutor.getConfigureName());
-            monitor.occureError(e, name + " schedule error", e.getMessage());
-        }
+//        IScheduleExecutor channelExecutor = null;
+//
+//        try {
+//            JobDetail jobDetail = context.getJobDetail();
+//            channelExecutor = (IScheduleExecutor)jobDetail.getJobDataMap().get(IScheduleExecutor.class.getName());
+//            channelExecutor.doExecute();
+//        } catch (Exception e) {
+//            //降低日志量
+//            //reduce log volume
+//            IMonitor startupMonitor = MonitorFactory.getOrCreateMonitor(MapKeyUtil.createKey(MonitorFactory.PIPLINE_START_UP, channelExecutor.getNameSpace()));
+//            IMonitor monitor = startupMonitor.createChildren(channelExecutor.getConfigureName());
+//            monitor.addContextMessage(JSON.parse(channelExecutor.toString()));
+//            String name = MapKeyUtil.createKeyBySign(".", channelExecutor.getNameSpace(), channelExecutor.getConfigureName());
+//            monitor.occureError(e, name + " schedule error", e.getMessage());
+//        }
 
     }
 }
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/distinct/DistinctFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/distinct/DistinctFunction.java
index a9cd627..6a35b51 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/distinct/DistinctFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/distinct/DistinctFunction.java
@@ -37,7 +37,6 @@ public class DistinctFunction {
                 }
             }
         }
-        cache = new KeySet(MAX_SIZE);
         String key = MapKeyUtil.createKey(keys);
         boolean success = cache.contains(key);
         if (!success) {
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/json/JsonCreatorFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/json/JsonCreatorFunction.java
index 7a85b94..800fc3c 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/json/JsonCreatorFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/json/JsonCreatorFunction.java
@@ -32,7 +32,7 @@ public class JsonCreatorFunction {
     @FunctionMethod(value = "json_merge", alias = "jsonMerge", comment = "根据字段来组合json")
     public String extraJsonByField(IMessage message, FunctionContext context,
                                    @FunctionParamter(value = "json", comment = "字段名列表") String jsonFieldName) {
-        jsonFieldName = FunctionUtils.getValueString(message, context, jsonFieldName);
+        jsonFieldName = FunctionUtils.getConstant( jsonFieldName);
         JSONObject msg = message.getMessageBody().getJSONObject(jsonFieldName);
         message.getMessageBody().putAll(msg);
         return null;
@@ -177,7 +177,7 @@ public class JsonCreatorFunction {
     @FunctionMethod(value = "jsonExpand", alias = "json_expand", comment = "展开一个json中的json")
     public void expandElement(IMessage message, FunctionContext context,
                               @FunctionParamter(value = "array", comment = "代表要移除key的字段名或常量列表") String jsonSubFieldName) {
-        jsonSubFieldName = FunctionUtils.getValueString(message, context, jsonSubFieldName);
+        jsonSubFieldName = FunctionUtils.getConstant(jsonSubFieldName);
         String jsonValue = message.getMessageBody().getString(jsonSubFieldName);
         if (jsonValue == null) {
             return;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/json/UDTFFieldNameFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/json/UDTFFieldNameFunction.java
new file mode 100644
index 0000000..4f3e8f4
--- /dev/null
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/json/UDTFFieldNameFunction.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.script.function.impl.json;
+
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.script.annotation.Function;
+import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
+import org.apache.rocketmq.streams.script.annotation.FunctionParamter;
+import org.apache.rocketmq.streams.script.context.FunctionContext;
+import org.apache.rocketmq.streams.script.utils.FunctionUtils;
+
+@Function
+public class UDTFFieldNameFunction {
+
+    @FunctionMethod(value = "addAliasForNewField",comment = "获取msg中的json数据")
+    public Object addAliasForNewField(IMessage message, FunctionContext context,
+        @FunctionParamter(value = "string", comment = "代表json的字段名或常量") String fieldName,String alias,int i){
+        fieldName=FunctionUtils.getConstant(fieldName);
+        alias=FunctionUtils.getConstant(alias);
+        Object object=message.getMessageBody().get(fieldName);
+        if(message.getMessageBody().containsKey("f"+i)) {
+            object=message.getMessageBody().get("f"+i);
+            message.getMessageBody().put(alias+fieldName,object);
+        }
+        else if(message.getMessageBody().containsKey(fieldName)&&!message.getMessageBody().containsKey("f"+i)){
+            message.getMessageBody().put(alias+fieldName,object);
+        }
+        return object;
+    }
+
+
+}
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/parser/GrokFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/parser/GrokFunction.java
index ea93a84..babbe39 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/parser/GrokFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/parser/GrokFunction.java
@@ -43,9 +43,9 @@ public class GrokFunction {
         A custom pattern is passed in and matched against the registered patterns. For example, with TIMESTAMP_ISO8601:timestamp1, TIMESTAMP_ISO8601 has a
         corresponding parse format among the registered patterns; once matched, match() stores the parsed result into the map using that format, with timestamp1 as the output key
           */
-        grokStr = FunctionUtils.getValueString(message, context, grokStr);
+        grokStr = FunctionUtils.getConstant( grokStr);
         Grok grok = grokCompiler.compile(grokStr);
-        fieldName = FunctionUtils.getValueString(message, context, fieldName);
+        fieldName = FunctionUtils.getConstant(fieldName);
         String logMsg = message.getMessageBody().getString(fieldName);
         // match and parse the log via match(), producing output in the specified format
         Match grokMatch = grok.match(logMsg);
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/parser/Paser2JsonFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/parser/Paser2JsonFunction.java
index ebd4ad8..767153b 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/parser/Paser2JsonFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/parser/Paser2JsonFunction.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.script.annotation.Function;
 import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
 import org.apache.rocketmq.streams.script.context.FunctionContext;
+import org.apache.rocketmq.streams.script.utils.FunctionUtils;
 
 @Function
 public class Paser2JsonFunction {
@@ -40,23 +41,25 @@ public class Paser2JsonFunction {
      * @param context
      */
     @FunctionMethod(value = "spread_json", alias = "autoJson", comment = "原始数据是嵌套json或jsonArray调用此方法会自动展开成单层")
-    public void spread2Json(IMessage message, FunctionContext context) {
-        JSONObject jsonObject = null;
-        if (message.isJsonMessage()) {
-            jsonObject = message.getMessageBody();
-        }
+    public void spread2Json(IMessage message, FunctionContext context,String fieldName) {
+        fieldName= FunctionUtils.getConstant(fieldName);
+
+        JSONObject jsonObject = message.getMessageBody().getJSONObject(fieldName);
+//        if (message.isJsonMessage()) {
+//            jsonObject = message.getMessageBody();
+//        }
         List<String> jsonArrayNames = new ArrayList<>();
         jsonObject = spreadJson(jsonObject, jsonArrayNames);
         if (jsonArrayNames.size() > 0) {
             List<JSONObject> jsonObjects = spreadJsonArray(jsonObject, jsonArrayNames);
             for (JSONObject tmp : jsonObjects) {
                 IMessage copyMessage = message.copy();
-                copyMessage.setMessageBody(tmp);
+                copyMessage.getMessageBody().putAll(tmp);
                 context.getSplitMessages().add(copyMessage);
             }
             context.openSplitModel();
         } else {
-            message.setMessageBody(jsonObject);
+            message.getMessageBody().putAll(jsonObject);
         }
     }
 
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/parser/PaserBySplitFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/parser/PaserBySplitFunction.java
index dc1ae43..219e6f2 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/parser/PaserBySplitFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/parser/PaserBySplitFunction.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.streams.script.annotation.Function;
 import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
 import org.apache.rocketmq.streams.script.annotation.FunctionParamter;
 import org.apache.rocketmq.streams.script.context.FunctionContext;
+import org.apache.rocketmq.streams.script.function.model.FunctionType;
 import org.apache.rocketmq.streams.script.utils.FunctionUtils;
 
 @Function
@@ -37,14 +38,13 @@ public class PaserBySplitFunction {
 
     @FunctionMethod(value = "paserByComma", comment = "根据英文逗号分割字符串")
     public JSONObject paserByComma(IMessage message, FunctionContext context,
-                                   @FunctionParamter(value = "string", comment = "代表字符串的字段名") String fieldName,
-                                   @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String... keyNames) {
+                                   @FunctionParamter(value = "string", comment = "代表字符串的字段名") String fieldName) {
         String log = FunctionUtils.getValueString(message, context, fieldName);
         Map<String, String> flags = (Map<String, String>)context.get(CONST_MAP_KEY);
         if (flags == null) {
             flags = new HashMap<>();
         }
-        return parseBySplit(message, context, log, fieldName, ",", flags, keyNames);
+        return parseBySplit(message, context, log, fieldName, ",", flags);
     }
 
     /**
@@ -53,21 +53,19 @@ public class PaserBySplitFunction {
      * @param message
      * @param context
      * @param asciiDec 十进制的ascii码
-     * @param keyNames
      * @return
      */
-    @FunctionMethod(value = "paserByAsciiSplit", comment = "根据char分割字符串,其中char通过ascii码转换过来,常用于使用不可见字符做分割")
+    @FunctionMethod(value = "paserBySign", comment = "根据char分割字符串,其中char通过ascii码转换过来,常用于使用不可见字符做分割")
     public JSONObject paserByAsciiSplit(IMessage message, FunctionContext context,
                                         @FunctionParamter(value = "string", comment = "代表字符串的字段名") String fieldName,
-                                        @FunctionParamter(value = "string", comment = "代表分割符") String asciiDec,
-                                        @FunctionParamter(value = "string", comment = "预计的字段名称") String... keyNames) {
+                                        @FunctionParamter(value = "string", comment = "代表分割符") String asciiDec) {
         char splitSign = (char)Integer.parseInt(asciiDec);
         String log = FunctionUtils.getValueString(message, context, fieldName);
         Map<String, String> flags = (Map<String, String>)context.get(CONST_MAP_KEY);
         if (flags == null) {
             flags = new HashMap<>();
         }
-        return parseBySplit(message, context, log, fieldName, String.valueOf(splitSign), flags, keyNames);
+        return parseBySplit(message, context, log, fieldName, String.valueOf(splitSign), flags);
     }
 
     @FunctionMethod(value = "split", alias = "paserBySplit", comment = "通过分割符来进行日志解析")
@@ -76,8 +74,7 @@ public class PaserBySplitFunction {
                                   @FunctionParamter(value = "boolean", comment = "是否需要预先处理带括号的数据") boolean needBacket,
                                   @FunctionParamter(value = "boolean", comment = "是否预先处理时间类型的数据") boolean needDate,
                                   @FunctionParamter(value = "string", comment = "代表字符串的字段名") String fieldName,
-                                  @FunctionParamter(value = "string", comment = "代表分割符") String sign,
-                                  @FunctionParamter(value = "string", comment = "预计的字段名称") String... names) {
+                                  @FunctionParamter(value = "string", comment = "代表分割符") String sign) {
         Map<String, String> flags = new HashMap<>();
         fieldName = FunctionUtils.getValueString(message, context, fieldName);
         String log = FunctionUtils.getValueString(message, context, fieldName);
@@ -91,21 +88,20 @@ public class PaserBySplitFunction {
             log = LogParserUtil.parseDate(log, flags);
         }
         sign = FunctionUtils.getValueString(message, context, sign);
-        return parseBySplit(message, context, log, fieldName, sign, flags, names);
+        return parseBySplit(message, context, log, fieldName, sign, flags);
     }
 
     @FunctionMethod(value = "split", alias = "paserBySplit", comment = "通过分割符来进行日志解析")
     public JSONObject parseBySign(IMessage message, FunctionContext context,
                                   @FunctionParamter(value = "string", comment = "代表字符串的字段名") String fieldName,
-                                  @FunctionParamter(value = "string", comment = "代表分割符") String sign,
-                                  @FunctionParamter(value = "string", comment = "预计的字段名称") String... names) {
-        sign = FunctionUtils.getValueString(message, context, sign);
-        String log = FunctionUtils.getValueString(message, context, fieldName);
+                                  @FunctionParamter(value = "string", comment = "代表分割符") String sign) {
+        sign = FunctionUtils.getConstant(sign);
+        String log = FunctionUtils.getValueString(message,context,fieldName);
         Map<String, String> flags = (Map<String, String>)context.get(CONST_MAP_KEY);
         if (flags == null) {
             flags = new HashMap<>();
         }
-        return parseBySplit(message, context, log, fieldName, sign, flags, names);
+        return parseBySplit(message, context, log, fieldName, sign, flags);
     }
 
     /**
@@ -135,28 +131,16 @@ public class PaserBySplitFunction {
      * @param fieldName field name
      * @param sign      delimiter
      * @param flags     mapping from constants to their original values
-     * @param names     expected field names; these names are not resolved as constants
      * @return
      */
-    private JSONObject parseBySplit(IMessage message, FunctionContext context, String log, String fieldName, String sign, Map<String, String> flags, String... names) {
+    private JSONObject parseBySplit(IMessage message, FunctionContext context, String log, String fieldName, String sign, Map<String, String> flags) {
         if (signs.containsKey(sign)) {
             sign = signs.get(sign);
         }
         String[] values = log.split(sign);
         Map jsonObject = new HashMap();
         for (int i = 0; i < values.length; i++) {
-            String name = null;
-            if (names != null && names.length > i) {
-                name = names[i];
-                if (StringUtil.isEmpty(name)) {
-                    name = fieldName + i;
-                } else {
-                    name = FunctionUtils.getValueString(message, context, name);
-                }
-
-            } else {
-                name = fieldName + i;
-            }
+            String name = FunctionType.UDTF.getName() +i;
             String value = values[i];
             String tmp = flags.get(value);
             if (StringUtil.isNotEmpty(tmp)) {
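
Note on the change above: the split parsers no longer accept caller-supplied key names. Every parsed column is now keyed by a generated name built from FunctionType.UDTF.getName() plus the column index. A minimal sketch of the new naming behaviour follows; the "udtf" prefix is an assumption standing in for whatever FunctionType.UDTF.getName() actually returns.

```java
// Illustrative sketch only: mirrors the field-naming loop now used in parseBySplit.
// "udtf" stands in for FunctionType.UDTF.getName(); the real prefix may differ.
import java.util.LinkedHashMap;
import java.util.Map;

public class SplitNamingSketch {
    public static void main(String[] args) {
        String log = "10.0.0.1|GET|200";
        String[] values = log.split("\\|");
        Map<String, String> parsed = new LinkedHashMap<>();
        for (int i = 0; i < values.length; i++) {
            parsed.put("udtf" + i, values[i]); // previously: keyNames[i], falling back to fieldName + i
        }
        System.out.println(parsed); // {udtf0=10.0.0.1, udtf1=GET, udtf2=200}
    }
}
```
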
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/parser/RegexParserFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/parser/RegexParserFunction.java
index a23a7c0..18df058 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/parser/RegexParserFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/parser/RegexParserFunction.java
@@ -27,6 +27,7 @@ import org.apache.rocketmq.streams.script.annotation.Function;
 import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
 import org.apache.rocketmq.streams.script.annotation.FunctionParamter;
 import org.apache.rocketmq.streams.script.context.FunctionContext;
+import org.apache.rocketmq.streams.script.function.model.FunctionType;
 import org.apache.rocketmq.streams.script.utils.FunctionUtils;
 
 @Function
@@ -36,13 +37,11 @@ public class RegexParserFunction {
     @FunctionMethod(value = "paserByRegex", comment = "通过正则解析实例日志")
     public String paserByRegex(IMessage message, FunctionContext context,
                                @FunctionParamter(value = "string", comment = "代表字符串的字段名") String fieldName,
-                               @FunctionParamter(value = "string", comment = "正则表达式") String regex,
-                               @FunctionParamter(value = "string", comment = "正则表达式中的字段") String... keyNames) {
-        if (message.isJsonMessage()) {
-            return message.getMessageBody().toJSONString();
-        }
+                               @FunctionParamter(value = "string", comment = "the regular expression") String regex) {
+
         String log = FunctionUtils.getValueString(message, context, fieldName);
-        JSONObject jsonObject = parseLog(regex, fieldName, log, keyNames);
+        regex=FunctionUtils.getConstant(regex);
+        JSONObject jsonObject = parseLog(regex, fieldName, log);
         if (jsonObject == null) {
             context.breakExecute();
         }
@@ -54,11 +53,10 @@ public class RegexParserFunction {
      * Parses a log line
      *
      * @param regex    the regular expression
-     * @param keyNames fields captured by the regular expression
      * @param log      the log line
      * @return the mapping from parsed field names to their values
      */
-    public static JSONObject parseLog(String regex, String fieldName, String log, String... keyNames) {
+    public static JSONObject parseLog(String regex, String fieldName, String log) {
         JSONObject result = new JSONObject();
         Pattern pattern = Pattern.compile(regex);
         Matcher matcher = pattern.matcher(log);
@@ -67,15 +65,7 @@ public class RegexParserFunction {
             return null;
         }
         for (int i = 1; i <= matcher.groupCount(); i++) {
-            String name = null;
-            if ((i - 1) < keyNames.length) {
-                name = keyNames[i - 1];
-                if (StringUtil.isEmpty(name)) {
-                    name = fieldName + (i - 1);
-                }
-            } else {
-                name = fieldName + (i - 1);
-            }
+            String name = FunctionType.UDTF.getName()+i;
             result.put(name, matcher.group(i));
         }
         return result;
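
The regex parser follows the same convention: parseLog now keys each capture group as FunctionType.UDTF.getName() plus the group index, and the keyNames parameter is gone. A hedged usage sketch of the public helper (the regex and log line are invented; JSONObject is assumed to be the project's fastjson type):

```java
// Sketch: calling the reworked parseLog helper directly.
// Result keys are generated (FunctionType.UDTF.getName() + groupIndex, starting at 1).
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.streams.script.function.impl.parser.RegexParserFunction;

public class RegexParseSketch {
    public static void main(String[] args) {
        String regex = "(\\d+\\.\\d+\\.\\d+\\.\\d+)\\s+(\\w+)";
        JSONObject fields = RegexParserFunction.parseLog(regex, "data", "10.1.2.3 GET");
        // Two entries, one per capture group; parseLog returns null when the regex does not
        // match, which makes paserByRegex call context.breakExecute() upstream.
        System.out.println(fields);
    }
}
```
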
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/AggregationScript.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/AggregationScript.java
index d7b6553..0ef1441 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/AggregationScript.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/AggregationScript.java
@@ -196,6 +196,10 @@ public class AggregationScript implements IStreamOperator<IMessage, List<IMessag
         return null;
     }
 
+
+    public static void registUDAF(String functionName,Class accumulator){
+        aggregationEngineMap.put(functionName,accumulator);
+    }
     public Object getAccumulator() {
         return accumulator;
     }
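
registUDAF is the new public hook for wiring a user-defined aggregate into the engine: it simply maps a function name to an accumulator class in aggregationEngineMap. A hedged registration sketch follows; the placeholder accumulator is hypothetical, and only the registUDAF(String, Class) call itself comes from the change above.

```java
// Hedged sketch: registering a custom UDAF under the name "myCount".
import org.apache.rocketmq.streams.script.operator.impl.AggregationScript;

public class UdafRegistration {

    // Trivial placeholder accumulator; a real one would follow the UDAF method contract
    // (see the CountUDAF sketch after SimpleUDAFScript below).
    public static class MyCountAccumulator {
        public long count;
    }

    public static void main(String[] args) {
        AggregationScript.registUDAF("myCount", MyCountAccumulator.class);
    }
}
```
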
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/IAccumulator.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/IAccumulator.java
index a533e00..b5c6144 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/IAccumulator.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/IAccumulator.java
@@ -16,10 +16,12 @@
  */
 package org.apache.rocketmq.streams.script.service;
 
+import java.io.Serializable;
+
 /**
  * Standard interface for UDAFs; every UDAF must implement it. Blink UDAFs are also adapted by generating implementations of this interface.
  */
-public interface IAccumulator<T, ACC> {
+public interface IAccumulator<T, ACC> extends Serializable {
 
     String ACCUMULATOR_VALUE = "value";
 
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitByTimeMultiTableSink.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/udf/SimpleUDAFScript.java
similarity index 58%
copy from rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitByTimeMultiTableSink.java
copy to rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/udf/SimpleUDAFScript.java
index 87a2b3e..ebb90d1 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitByTimeMultiTableSink.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/udf/SimpleUDAFScript.java
@@ -14,23 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.streams.db.sink;
+package org.apache.rocketmq.streams.script.service.udf;
 
-import org.apache.rocketmq.streams.common.channel.split.ISplit;
-import org.apache.rocketmq.streams.common.context.IMessage;
+public class SimpleUDAFScript extends UDAFScript {
 
-public class SplitByTimeMultiTableSink extends AbstractMultiTableSink {
-    public SplitByTimeMultiTableSink(String url, String userName, String password) {
-        super(url, userName, password);
+    public SimpleUDAFScript(){
+        this.accumulateMethodName = "accumulate";
+        this.createAccumulatorMethodName = "createAccumulator";
+        this.getValueMethodName = "getValue";
+        this.retractMethodName = "retract";
+        this.mergeMethodName = "merge";
+        this.methodName = "eval";
+        this.initMethodName = "open";
+        this.initParameters = new Object[0];
     }
 
-    @Override
-    protected String createTableName(String splitId) {
-        return null;
-    }
-
-    @Override
-    protected ISplit getSplitFromMessage(IMessage message) {
-        return null;
+    @Override protected Object createMergeParamters(Iterable its) {
+        return its;
     }
 }
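
SimpleUDAFScript spells out the reflective contract a plain-Java UDAF is expected to expose: createAccumulator, accumulate, retract, merge, getValue, plus open/eval for setup and invocation. The sketch below shows a counting UDAF written against those names; the parameter and return shapes are assumptions based on common UDAF conventions, not a published interface. Because IAccumulator now extends Serializable, the accumulator state is kept serializable so it can travel with window snapshots. A class like this could then be registered through AggregationScript.registUDAF as sketched earlier.

```java
// Hedged sketch of a UDAF following the method names SimpleUDAFScript wires up.
// Only the method names come from the change above; the signatures are assumed.
import java.io.Serializable;

public class CountUDAF {

    public static class CountAccumulator implements Serializable {
        public long count;
    }

    public CountAccumulator createAccumulator() {                 // createAccumulatorMethodName
        return new CountAccumulator();
    }

    public void accumulate(CountAccumulator acc, Object value) {  // accumulateMethodName
        if (value != null) {
            acc.count++;
        }
    }

    public void retract(CountAccumulator acc, Object value) {     // retractMethodName
        if (value != null) {
            acc.count--;
        }
    }

    public void merge(CountAccumulator acc, Iterable<CountAccumulator> others) { // mergeMethodName
        for (CountAccumulator other : others) {
            acc.count += other.count;
        }
    }

    public Long getValue(CountAccumulator acc) {                   // getValueMethodName
        return acc.count;
    }
}
```
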
diff --git a/rocketmq-streams-script/src/test/java/org/apache/rocketmq/streams/script/function/FunctionTest.java b/rocketmq-streams-script/src/test/java/org/apache/rocketmq/streams/script/function/FunctionTest.java
index 2c921fc..75a0d31 100644
--- a/rocketmq-streams-script/src/test/java/org/apache/rocketmq/streams/script/function/FunctionTest.java
+++ b/rocketmq-streams-script/src/test/java/org/apache/rocketmq/streams/script/function/FunctionTest.java
@@ -22,6 +22,7 @@ import java.util.Date;
 import java.util.List;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.context.Message;
+import org.apache.rocketmq.streams.common.utils.JsonableUtil;
 import org.apache.rocketmq.streams.script.ScriptComponent;
 import org.apache.rocketmq.streams.script.context.FunctionContext;
 import org.junit.Test;
@@ -57,6 +58,22 @@ public class FunctionTest {
         }
 
     }
+    @Test
+    public void testJSON(){
+        JSONObject jsonObject=new JSONObject();
+        JSONObject person=new JSONObject();
+        person.put("name","chris");
+        person.put("age",18);
+        jsonObject.put("persion",person);
+        JSONArray jsonArray=new JSONArray();
+        for(int i=0;i<3;i++){
+            JSONObject jsonObject1=new JSONObject();
+            jsonObject1.put("address","address"+i);
+            jsonArray.add(jsonObject1);
+        }
+        jsonObject.put("addresses",jsonArray);
+        System.out.println(JsonableUtil.formatJson(jsonObject));
+    }
 
     /**
      * Statements commonly used when importing table data
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java
index 3820ff8..6e8cfe0 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
+import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
 import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.component.ComponentCreator;
@@ -75,32 +76,38 @@ public abstract class WindowCache extends
      */
     protected transient ShuffleChannel shuffleChannel;
 
-    protected class ShuffleMsgCache extends AbstractMultiSplitMessageCache<Pair<ISplit, JSONObject>> {
+    protected class MessageFlushCallBack implements IMessageFlushCallBack<Pair<ISplit, JSONObject>> {
 
-        public ShuffleMsgCache() {
-            super(messages -> {
-                if (messages == null || messages.size() == 0) {
-                    return true;
-                }
-                ISplit split = messages.get(0).getLeft();
-                JSONObject jsonObject = messages.get(0).getRight();
-                JSONArray allMsgs = shuffleChannel.getMsgs(jsonObject);
-                for (int i = 1; i < messages.size(); i++) {
-                    Pair<ISplit, JSONObject> pair = messages.get(i);
-                    JSONObject msg = pair.getRight();
-                    JSONArray jsonArray = shuffleChannel.getMsgs(msg);
-                    if (jsonArray != null) {
-                        allMsgs.addAll(jsonArray);
-                    }
+        @Override
+        public boolean flushMessage(List<Pair<ISplit, JSONObject>> messages) {
+            if (messages == null || messages.size() == 0) {
+                return true;
+            }
+            ISplit split = messages.get(0).getLeft();
+            JSONObject jsonObject = messages.get(0).getRight();
+            JSONArray allMsgs = shuffleChannel.getMsgs(jsonObject);
+            for (int i = 1; i < messages.size(); i++) {
+                Pair<ISplit, JSONObject> pair = messages.get(i);
+                JSONObject msg = pair.getRight();
+                JSONArray jsonArray = shuffleChannel.getMsgs(msg);
+                if (jsonArray != null) {
+                    allMsgs.addAll(jsonArray);
                 }
-                JSONObject zipJsonObject = new JSONObject();
-                zipJsonObject.put(COMPRESSION_MSG_DATA, CompressUtil.gZip(jsonObject.toJSONString()));
-                zipJsonObject.put(IS_COMPRESSION_MSG, true);
-                shuffleChannel.getProducer().batchAdd(new Message(zipJsonObject), split);
-                shuffleChannel.getProducer().flush(split.getQueueId());
+            }
+            JSONObject zipJsonObject = new JSONObject();
+            zipJsonObject.put(COMPRESSION_MSG_DATA, CompressUtil.gZip(jsonObject.toJSONString()));
+            zipJsonObject.put(IS_COMPRESSION_MSG, true);
+            shuffleChannel.getProducer().batchAdd(new Message(zipJsonObject), split);
+            shuffleChannel.getProducer().flush(split.getQueueId());
 
-                return true;
-            });
+            return true;
+        }
+    }
+
+    protected class ShuffleMsgCache extends AbstractMultiSplitMessageCache<Pair<ISplit, JSONObject>> {
+
+        public ShuffleMsgCache() {
+            super(new WindowCache.MessageFlushCallBack());
         }
 
         @Override
@@ -162,7 +169,9 @@ public abstract class WindowCache extends
     @Override
     public void finishBatchMsg(BatchFinishMessage batchFinishMessage) {
         if (shuffleChannel != null && shuffleChannel.getProducer() != null) {
+            this.flush();
             shuffleChannel.getProducer().flush();
+            shuffleMsgCache.flush();
             for (ISplit split : shuffleChannel.getQueueList()) {
                 IMessage message = batchFinishMessage.getMsg().deepCopy();
                 message.getMessageBody().put(ORIGIN_QUEUE_ID, message.getHeader().getQueueId());
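
The refactor above replaces the anonymous callback previously passed to the ShuffleMsgCache constructor with a named MessageFlushCallBack class, and finishBatchMsg now flushes both the local window cache and the shuffle message cache before forwarding the batch-finish marker. For anyone writing a similar cache, the callback contract is small; below is a hedged, generic example of the same pattern (the String payload type is arbitrary, only the interface and flushMessage signature are taken from the change above).

```java
// Hedged sketch: a standalone IMessageFlushCallBack, the same shape WindowCache now uses.
import java.util.List;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;

public class LoggingFlushCallBack implements IMessageFlushCallBack<String> {

    @Override
    public boolean flushMessage(List<String> messages) {
        if (messages == null || messages.isEmpty()) {
            return true; // nothing buffered for this split
        }
        // A real implementation would hand the batch to a producer; here we just print it.
        messages.forEach(System.out::println);
        return true;
    }
}
```
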
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueManager.java
index 78bdcd7..06e4ba6 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueManager.java
@@ -30,15 +30,15 @@ import org.apache.rocketmq.streams.window.sqlcache.SQLCache;
 public class WindowMaxValueManager implements IWindowMaxValueManager {
     protected AbstractWindow window;
     protected Map<String, WindowMaxValueProcessor> windowMaxValueProcessorMap = new HashMap<>();
-    protected transient ExecutorService executorService;
+//    protected transient ExecutorService executorService;
     protected transient SQLCache sqlCache;
 
     public WindowMaxValueManager(AbstractWindow window, SQLCache sqlCache) {
         this.window = window;
         this.sqlCache = sqlCache;
-        this.executorService = new ThreadPoolExecutor(10, 10,
-            0L, TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<Runnable>());
+//        this.executorService = new ThreadPoolExecutor(10, 10,
+//            0L, TimeUnit.MILLISECONDS,
+//            new LinkedBlockingQueue<Runnable>());
     }
 
     protected WindowMaxValueProcessor getOrCreate(String queueId) {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
index 1ab8b6f..bd03d37 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
@@ -47,16 +47,23 @@ public abstract class AbstractShuffleWindow extends AbstractWindow {
 
     @Override
     public AbstractContext<IMessage> doMessage(IMessage message, AbstractContext context) {
-        if (hasCreated.compareAndSet(false, true)) {
-            this.windowFireSource = new WindowTrigger(this);
-            this.windowFireSource.init();
-            this.windowFireSource.start(getFireReceiver());
-            this.shuffleChannel = new ShuffleChannel(this);
-            this.shuffleChannel.init();
-            windowCache.setBatchSize(5000);
-            windowCache.setShuffleChannel(shuffleChannel);
+        if (!hasCreated.get() || windowCache == null) {
+            synchronized (this) {
+                if (!hasCreated.get() || windowCache == null) {
+                    this.windowFireSource = new WindowTrigger(this);
+                    this.windowFireSource.init();
+                    this.windowFireSource.start(getFireReceiver());
+                    this.shuffleChannel = new ShuffleChannel(this);
+                    this.shuffleChannel.init();
+                    windowCache.setBatchSize(5000);
+                    windowCache.setShuffleChannel(shuffleChannel);
+                    shuffleChannel.startChannel();
+                    hasCreated.set(true);
+                }
+            }
+
         }
-        shuffleChannel.startChannel();
+
         return super.doMessage(message, context);
     }
 
@@ -65,8 +72,7 @@ public abstract class AbstractShuffleWindow extends AbstractWindow {
         Set<String> splitIds = new HashSet<>();
         splitIds.add(windowInstance.getSplitId());
         shuffleChannel.flush(splitIds);
-        int fireCount = fireWindowInstance(windowInstance, windowInstance.getSplitId(), queueId2Offset);
-        return fireCount;
+        return fireWindowInstance(windowInstance, windowInstance.getSplitId(), queueId2Offset);
     }
 
     /**
@@ -82,8 +88,7 @@ public abstract class AbstractShuffleWindow extends AbstractWindow {
      *
      * @param instance
      */
-    protected abstract int fireWindowInstance(WindowInstance instance, String queueId,
-        Map<String, String> queueId2Offset);
+    protected abstract int fireWindowInstance(WindowInstance instance, String queueId, Map<String, String> queueId2Offset);
 
     public abstract void clearCache(String queueId);
 }
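
The doMessage change swaps the earlier compareAndSet guard for double-checked locking, so the window trigger, shuffle channel and cache wiring run exactly once, and startChannel is no longer invoked on every message. For reference, the general shape of the idiom (a sketch, not the class above):

```java
// Generic double-checked lazy initialization, mirroring the pattern applied above.
import java.util.concurrent.atomic.AtomicBoolean;

public class LazyInit {

    private final AtomicBoolean hasCreated = new AtomicBoolean(false);

    public void ensureStarted() {
        if (!hasCreated.get()) {                 // lock-free fast path
            synchronized (this) {
                if (!hasCreated.get()) {         // re-check under the lock
                    // ... expensive one-time setup (channels, triggers, caches) ...
                    hasCreated.set(true);        // publish only after setup completes
                }
            }
        }
    }
}
```
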
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
index 72028db..9cc2019 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
@@ -320,8 +320,7 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
      */
 
     public long incrementAndGetSplitNumber(WindowInstance instance, String shuffleId) {
-        long maxValue = windowMaxValueManager.incrementAndGetSplitNumber(instance, shuffleId);
-        return maxValue;
+        return windowMaxValueManager.incrementAndGetSplitNumber(instance, shuffleId);
     }
 
     public abstract Class getWindowBaseValueClass();
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java
index ff90cf7..ba5bf4c 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java
@@ -20,7 +20,6 @@ import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -54,22 +53,20 @@ public abstract class AbstractSystemChannel implements IConfigurableIdentificati
     protected static final String CHANNEL_PROPERTY_KEY_PREFIX = "CHANNEL_PROPERTY_KEY_PREFIX";
     protected static final String CHANNEL_TYPE = "CHANNEL_TYPE";
 
-    protected ISource consumer;
+    protected ISource<?> consumer;
     protected AbstractSupportShuffleSink producer;
     protected Map<String, String> channelConfig = new HashMap<>();
-    ;
-    protected boolean hasCreateShuffleChannel = false;
+    protected volatile boolean hasCreateShuffleChannel = false;
 
     public void startChannel() {
         if (consumer == null) {
             return;
         }
-        final AbstractSystemChannel channel = this;
         consumer.start(this);
     }
 
     /**
-     * If the user has not configured a shuffle channel, create one dynamically from the pipline data source
+     * If the user has not configured a shuffle channel, create one dynamically from the pipeline data source
      *
      * @param pipeline
      */
@@ -77,23 +74,23 @@ public abstract class AbstractSystemChannel implements IConfigurableIdentificati
         if (!hasCreateShuffleChannel) {
             synchronized (this) {
                 if (!hasCreateShuffleChannel) {
-                    ISource piplineSource = pipeline.getSource();
+                    ISource<?> pipelineSource = pipeline.getSource();
                     ServiceLoaderComponent serviceLoaderComponent = ComponentCreator.getComponent(IChannelBuilder.class.getName(), ServiceLoaderComponent.class);
 
-                    IChannelBuilder builder = (IChannelBuilder) serviceLoaderComponent.loadService(piplineSource.getClass().getSimpleName());
+                    IChannelBuilder builder = (IChannelBuilder) serviceLoaderComponent.loadService(pipelineSource.getClass().getSimpleName());
                     if (builder == null) {
-                        throw new RuntimeException("can not create shuffle channel, not find channel builder " + piplineSource.toJson());
+                        throw new RuntimeException("can not create shuffle channel, not find channel builder " + pipelineSource.toJson());
                     }
                     if (!(builder instanceof IShuffleChannelBuilder)) {
-                        throw new RuntimeException("can not create shuffle channel, builder not impl IShuffleChannelBuilder " + piplineSource.toJson());
+                        throw new RuntimeException("can not create shuffle channel, builder not impl IShuffleChannelBuilder " + pipelineSource.toJson());
                     }
                     IShuffleChannelBuilder shuffleChannelBuilder = (IShuffleChannelBuilder) builder;
-                    ISink sink = shuffleChannelBuilder.createBySource(piplineSource);
+                    ISink<?> sink = shuffleChannelBuilder.createBySource(pipelineSource);
                     sink.init();
                     if (!(sink instanceof MemoryChannel) && !(sink instanceof AbstractSupportShuffleSink)) {
-                        throw new RuntimeException("can not create shuffle channel, sink not extends AbstractSupportShuffleSink " + piplineSource.toJson());
+                        throw new RuntimeException("can not create shuffle channel, sink not extends AbstractSupportShuffleSink " + pipelineSource.toJson());
                     }
-                    ISource source = null;
+                    ISource<?> source = null;
                     if (sink instanceof MemoryChannel) {
                         MemoryCache memoryCache = new MemoryCache();
                         memoryCache.setNameSpace(createShuffleChannelNameSpace(pipeline));
@@ -135,7 +132,7 @@ public abstract class AbstractSystemChannel implements IConfigurableIdentificati
                     shuffleSink.setHasInit(false);
                     shuffleSink.init();//the shuffle channel is actually created here
                     if (source == null) {
-                        source = shuffleChannelBuilder.copy(piplineSource);
+                        source = shuffleChannelBuilder.copy(pipelineSource);
                     }
 
                     //adjust window-related properties such as groupName and tags
@@ -155,7 +152,7 @@ public abstract class AbstractSystemChannel implements IConfigurableIdentificati
                     if (shuffleTopic != null && topicFiledName != null) {
                         ReflectUtil.setBeanFieldValue(source, topicFiledName, shuffleTopic);
                     }
-                    if (AbstractSource.class.isInstance(source)) {
+                    if (source instanceof AbstractSource) {
                         AbstractSource abstractSource = (AbstractSource) source;
                         abstractSource.setHasInit(false);
                     }
@@ -206,14 +203,14 @@ public abstract class AbstractSystemChannel implements IConfigurableIdentificati
      *
      * @return
      */
-    protected ISource createSource(String namespace, String name) {
+    protected ISource<?> createSource(String namespace, String name) {
         IChannelBuilder builder = createBuilder();
         if (builder == null) {
             return null;
         }
         Properties properties = createChannelProperties(namespace);
-        ISource source = builder.createSource(namespace, name, properties, null);
-        if (MemorySource.class.isInstance(source)) {
+        ISource<?> source = builder.createSource(namespace, name, properties, null);
+        if (source instanceof MemorySource) {
             MemorySource memorySource = (MemorySource) source;
             MemoryCache memoryCache = new MemoryCache();
             memorySource.setMemoryCache(memoryCache);
@@ -236,13 +233,13 @@ public abstract class AbstractSystemChannel implements IConfigurableIdentificati
         }
         Properties properties = createChannelProperties(namespace);
 
-        ISink sink = builder.createSink(namespace, name, properties, null);
-        if (!AbstractSupportShuffleSink.class.isInstance(sink)) {
+        ISink<?> sink = builder.createSink(namespace, name, properties, null);
+        if (!(sink instanceof AbstractSupportShuffleSink)) {
             throw new RuntimeException("can not support shuffle " + sink.toJson());
         }
-        if (MemorySink.class.isInstance(sink)) {
+        if (sink instanceof MemorySink) {
             MemorySink memorySink = (MemorySink) sink;
-            if (!MemorySource.class.isInstance(this.consumer)) {
+            if (!(this.consumer instanceof MemorySource)) {
                 throw new RuntimeException("shuffle cosumer need memory, real is " + this.consumer);
             }
             MemorySource memorySource = (MemorySource) this.consumer;
@@ -251,7 +248,6 @@ public abstract class AbstractSystemChannel implements IConfigurableIdentificati
         }
 
         sink.init();
-
         return (AbstractSupportShuffleSink) sink;
     }
 
@@ -266,8 +262,7 @@ public abstract class AbstractSystemChannel implements IConfigurableIdentificati
             return null;
         }
         ServiceLoaderComponent serviceLoaderComponent = ComponentCreator.getComponent(IChannelBuilder.class.getName(), ServiceLoaderComponent.class);
-        IChannelBuilder builder = (IChannelBuilder) serviceLoaderComponent.loadService(type);
-        return builder;
+        return (IChannelBuilder) serviceLoaderComponent.loadService(type);
     }
 
     /**
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index f8b26f5..e5dcb6d 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -132,6 +132,7 @@ public class ShuffleChannel extends AbstractSystemChannel {
     public void init() {
         this.consumer = createSource(window.getNameSpace(), window.getConfigureName());
         this.producer = createSink(window.getNameSpace(), window.getConfigureName());
+
         if (this.consumer == null || this.producer == null) {
             autoCreateShuffleChannel(window.getFireReceiver().getPipeline());
         }
@@ -142,12 +143,12 @@ public class ShuffleChannel extends AbstractSystemChannel {
             ((AbstractSource) this.consumer).setJsonData(true);
         }
         if (producer != null && (queueList == null || queueList.size() == 0)) {
+            this.producer.init();
             queueList = producer.getSplitList();
             Map<String, ISplit<?, ?>> tmp = new ConcurrentHashMap<>();
             for (ISplit<?, ?> queue : queueList) {
                 tmp.put(queue.getQueueId(), queue);
             }
-
             this.queueMap = tmp;
         }
         isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest");
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/AbstractWindowStorage.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/AbstractWindowStorage.java
index b29a3b8..635f59e 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/AbstractWindowStorage.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/AbstractWindowStorage.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.streams.common.threadpool.ThreadPoolFactory;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.db.driver.batchloader.BatchRowLoader;
 import org.apache.rocketmq.streams.db.driver.batchloader.IRowOperator;
@@ -32,9 +33,12 @@ import org.apache.rocketmq.streams.window.state.WindowBaseValue;
 
 public abstract class AbstractWindowStorage<T extends WindowBaseValue> implements IWindowStorage<T> {
     protected boolean isLocalStorageOnly = false;
-    protected transient ExecutorService dataLoaderExecutor = new ThreadPoolExecutor(10, 10,
-        0L, TimeUnit.MILLISECONDS,
-        new LinkedBlockingQueue<Runnable>());
+//    protected transient ExecutorService dataLoaderExecutor = new ThreadPoolExecutor(10, 10,
+//        0L, TimeUnit.MILLISECONDS,
+//        new LinkedBlockingQueue<Runnable>(), new ThreadPoolFactory.DipperThreadFactory("AbstractWindowStorage-"));
+
+    protected transient ExecutorService dataLoaderExecutor = ThreadPoolFactory.createThreadPool(10, 10, 0L, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<Runnable>(), "AbstractWindowStorage");
     ;
 
 
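Worth noting from the change above: raw ThreadPoolExecutor construction is being replaced with ThreadPoolFactory.createThreadPool, which takes the same sizing arguments plus a name so the pool's threads are identifiable in thread dumps. A hedged sketch of the same call for another component, assuming the overload matches the one used above:

```java
// Hedged sketch: creating a named pool through ThreadPoolFactory.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.streams.common.threadpool.ThreadPoolFactory;

public class NamedPoolExample {
    public static void main(String[] args) {
        ExecutorService pool = ThreadPoolFactory.createThreadPool(
            4, 4, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(), "MyComponent");
        pool.submit(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```
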
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/ShufflePartitionManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/ShufflePartitionManager.java
index 93a0b2a..aa71a81 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/ShufflePartitionManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/ShufflePartitionManager.java
@@ -27,12 +27,12 @@ public class ShufflePartitionManager {
     private static ShufflePartitionManager instance = new ShufflePartitionManager();
     protected Map<String, Boolean> splitId2AllWindowInstanceFinishInit = new HashMap<>();//whether the split is ready: every window instance under this split has finished initialization
     protected Map<String, Boolean> windowInstanceId2FinishInit = new HashMap<>();//whether the window instance has finished initialization
-    private ExecutorService executorService;
+//    private ExecutorService executorService;
 
     private ShufflePartitionManager() {
-        executorService = new ThreadPoolExecutor(10, 10,
-            0L, TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<Runnable>());
+//        executorService = new ThreadPoolExecutor(10, 10,
+//            0L, TimeUnit.MILLISECONDS,
+//            new LinkedBlockingQueue<Runnable>());
 
     }
 
diff --git a/docs/stream_sink/README.md b/stream_sink.md
similarity index 99%
rename from docs/stream_sink/README.md
rename to stream_sink.md
index 820b0d0..d771886 100644
--- a/docs/stream_sink/README.md
+++ b/stream_sink.md
@@ -80,7 +80,7 @@
     DataStream dataStream=dataStream.toRocketmq(topic,tags,groupName,namesrvAddress);
 
 ```
-##kafka
+## kafka
 ```java
     String bootstrapServers = ......;//Kafka bootstrap servers
     String topic = ......; //Kafka topic
diff --git a/docs/stream_source/README.md b/stream_source.md
similarity index 99%
rename from docs/stream_source/README.md
rename to stream_source.md
index 51b794d..8ae7b7e 100644
--- a/docs/stream_source/README.md
+++ b/stream_source.md
@@ -59,7 +59,7 @@
 
 ```
 
-##kafka
+## kafka
 ```java
     String bootstrapServers = ......;//Kafka bootstrap servers
     String topic = ......; //Kafka topic
diff --git a/docs/stream_transform/README.md b/stream_transform.md
similarity index 100%
rename from docs/stream_transform/README.md
rename to stream_transform.md