You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2022/06/21 03:55:01 UTC

[rocketmq-streams] 02/16: merge 0.1

This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch snapshot-1.0.4
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git

commit f5a1380ffeb6fa18920f59edf54b39d996caf77b
Merge: d4243be9 1cd2dd02
Author: 维章 <un...@gmail.com>
AuthorDate: Mon May 23 11:53:01 2022 +0800

    merge 0.1

 NOTICE                                             |   2 +-
 README.md                                          |  16 +-
 SUMMARY.md                                         |   7 +
 docs/README.md                                     | 142 ------
 docs/SUMMARY.md                                    |   8 -
 ...225\264\344\275\223\346\236\266\346\236\204.md" |  33 --
 .../2.\346\236\204\345\273\272DataStream.md"       |  73 ----
 .../3.\345\220\257\345\212\250DataStream.md"       |  53 ---
 ...265\201\350\275\254\350\277\207\347\250\213.md" |  63 ---
 ...256\227\345\255\220\350\247\243\346\236\220.md" |  55 ---
 ...256\236\347\216\260\345\256\271\351\224\231.md" |   0
 "docs/images/Pipeline\347\261\273\345\233\276.png" | Bin 44207 -> 0 bytes
 docs/images/img.png                                | Bin 0 -> 38684 bytes
 docs/images/img_1.png                              | Bin 0 -> 43711 bytes
 docs/images/img_2.png                              | Bin 0 -> 103151 bytes
 docs/images/window.png                             | Bin 241692 -> 0 bytes
 ...75\223\346\236\266\346\236\204\345\233\276.png" | Bin 60493 -> 0 bytes
 ...00\273\344\275\223\350\277\207\347\250\213.png" | Bin 44252 -> 0 bytes
 .../\346\211\251\345\256\271\345\211\215.png"      | Bin 56733 -> 0 bytes
 ...12\266\346\200\201\347\256\227\345\255\220.png" | Bin 35766 -> 0 bytes
 "docs/images/\347\212\266\346\200\201.png"         | Bin 47527 -> 0 bytes
 "docs/images/\347\274\251\345\256\271.png"         | Bin 51087 -> 0 bytes
 docs/quick_start/README.md                         |  46 --
 pom.xml                                            | 124 ++----
 quick_start.md                                     |  92 ++--
 .../pom.xml                                        |  43 +-
 rocketmq-streams-cep/src/test/resources/log4j.xml  |  36 ++
 rocketmq-streams-channel-db/pom.xml                |   6 +-
 .../streams/db/sink/AbstractMultiTableSink.java    |  12 +-
 .../streams/db/sink/DynamicMultipleDBSink.java     |  11 +-
 .../streams/db/sink/SelfMultiTableSink.java        |   2 +-
 .../streams/db/sink/SplitBySerialNumber.java       |   2 +-
 .../streams/db/sink/SplitByTimeMultiTableSink.java |   2 +-
 rocketmq-streams-channel-es/pom.xml                |  29 +-
 .../{ESSinkBuilder.java => ESChannelBuilder.java}  |  54 +--
 .../rocketmq/streams/es/sink/ESSinkBuilder.java    |   1 +
 .../streams/es/sink/ESSinkOnlyChannel.java         |  43 +-
 .../apache/rocketmq/streams/es/sink/EsClient.java  | 135 ++++++
 rocketmq-streams-channel-http/pom.xml              |   6 +-
 rocketmq-streams-channel-kafka/pom.xml             |  32 ++
 .../streams/kafka/KafkaChannelBuilder.java         |  52 ++-
 .../apache/rocketmq/streams/kafka/KafkaSplit.java  |  55 +++
 .../rocketmq/streams/kafka/sink/KafkaSink.java     | 200 +++++++++
 .../rocketmq/streams/kafka/source/KafkaSource.java | 238 ++++++++++
 .../rocketmq/streams/kafka/KafkaChannelTest.java   | 104 +++++
 .../src/test/resources/log4j.xml                   |  20 +
 rocketmq-streams-channel-mqtt/pom.xml              |  24 +-
 .../rocketmq/streams/mqtt/source/PahoSource.java   |  41 +-
 rocketmq-streams-channel-rocketmq/pom.xml          |  30 +-
 .../apache/rocketmq/streams/debug/DebugWriter.java |  92 ++--
 .../apache/rocketmq/streams/sink/RocketMQSink.java |  42 +-
 .../rocketmq/streams/source/RocketMQSource.java    |  14 +-
 .../rocketmq/streams/RocketMQChannelTest.java      |   2 +-
 rocketmq-streams-channel-syslog/pom.xml            |  21 +-
 .../rocketmq/streams/syslog/SyslogChannel.java     |  35 +-
 .../streams/syslog/SyslogChannelBuilder.java       |   4 +
 .../streams/syslog/SyslogChannelManager.java       |   6 +-
 .../rocketmq/streams/syslog/SyslogServer.java      |  35 +-
 .../rocketmq/streams/syslog/SyslogClient.java      |  22 +-
 rocketmq-streams-checkpoint/pom.xml                |  64 ---
 .../streams/checkpoint/db/DBCheckPointStorage.java |  68 ---
 rocketmq-streams-clients/pom.xml                   |  23 +-
 .../streams/client/source/DataStreamSource.java    |  20 +
 .../streams/client/transform/DataStream.java       |  43 +-
 .../streams/client/transform/JoinStream.java       |   5 +-
 .../streams/client/transform/SplitStream.java      |  24 +-
 .../streams/client/transform/WindowStream.java     |  50 ++-
 .../rocketmq/streams/client/ApplicationTest.java   |  57 +++
 .../rocketmq/streams/client/MqttSourceExample.java |  80 ++++
 .../{sink/UserDefinedSink.java => ScriptTest.java} |  27 +-
 .../apache/rocketmq/streams/client/WindowTest.java |  14 +-
 .../rocketmq/streams/client/example/JoinTest.java  |   7 +-
 .../rocketmq/streams/client/example/SplitTest.java |  31 +-
 .../streams/client/sink/UserDefinedSink.java       |   2 +-
 .../client/sink/UserDefinedSupportShuffleSink.java |   4 +-
 rocketmq-streams-commons/pom.xml                   |  48 +-
 .../MappedByteBufferTableWithPrimaryIndex.java     | 482 +++++++++++++++++++++
 .../streams/common/cache/compress/KVAddress.java   |  48 +-
 .../streams/common/channel/AbstractChannel.java    |  14 +-
 .../AbstractSupportShuffleChannelBuilder.java      |   2 +-
 .../common/channel/builder/IChannelBuilder.java    |  25 +-
 .../channel/builder/IShuffleChannelBuilder.java    |   4 +-
 .../common/channel/impl/CollectionSink.java        |  39 +-
 .../common/channel/impl/CollectionSinkBuilder.java |  34 +-
 .../common/channel/impl/PrintChannelBuilder.java   |  30 +-
 .../channel/impl/file/FileChannelBuilder.java      |  45 +-
 .../streams/common/channel/impl/file/FileSink.java |   4 +-
 .../common/channel/impl/file/FileSource.java       |   2 +-
 .../channel/impl/memory/MemoryChannelBuilder.java  |  64 +++
 .../common/channel/impl/memory/MemorySink.java     |   4 +-
 .../channel/impl/view/ViewChannelBuilder.java      |  43 +-
 .../streams/common/channel/impl/view/ViewSink.java |  31 +-
 .../common/channel/impl/view/ViewSource.java       |  66 +++
 .../streams/common/channel/sink/AbstractSink.java  |  17 +-
 .../channel/sink/AbstractSupportShuffleSink.java   |   8 +-
 .../sink/AbstractSupportShuffleUDFSink.java        |   6 +-
 .../common/channel/sink/AbstractUDFSink.java       |  23 +-
 .../streams/common/channel/sink/ISink.java         |   9 +-
 .../impl/AbstractMultiSplitMessageCache.java       |  22 +-
 .../common/channel/source/AbstractSource.java      |  94 ++--
 .../channel/source/AbstractUnreliableSource.java   |   2 +-
 .../streams/common/channel/split/ISplit.java       |   5 +-
 .../streams/common/component/ComponentCreator.java |   4 +
 .../common/configurable/AbstractConfigurable.java  |  17 +-
 .../common/configurable/BasedConfigurable.java     |  13 +-
 .../common/configurable/IConfigurableService.java  |   5 +-
 .../streams/common/configure/ConfigureFileKey.java |   7 +-
 .../streams/common/context/AbstractContext.java    |  33 +-
 .../streams/common/context/MessageHeader.java      |  89 ++--
 .../streams/common/datatype/IntDataType.java       |   3 +-
 .../streams/common/datatype/ShortDataType.java     |   2 +-
 .../streams/common/interfaces/ISerialize.java      |   8 +-
 .../rocketmq/streams/common/model/NameCreator.java |  14 +-
 .../streams/common/model/NameCreatorContext.java   |  31 +-
 .../common/monitor/ConsoleMonitorManager.java      | 412 ++++++++++++++++++
 .../streams/common/monitor/DataSyncConstants.java  |  54 +++
 .../streams/common/monitor/HttpClient.java         | 116 +++++
 .../rocketmq/streams/common/monitor/HttpUtil.java  | 248 +++++++++++
 .../monitor/MonitorDataSyncServiceFactory.java     |  61 +++
 .../common/monitor/group/MonitorCommander.java     |   4 +-
 .../streams/common/monitor/impl/DipperMonitor.java |   2 +-
 .../streams/common/monitor/model/JobStage.java     | 350 +++++++++++++++
 .../streams/common/monitor/model/TraceIdsDO.java   | 126 ++++++
 .../common/monitor/model/TraceMonitorDO.java       | 250 +++++++++++
 .../monitor/service/MonitorDataSyncService.java    |  23 +-
 .../service/impl/DBMonitorDataSyncImpl.java        |  63 +++
 .../service/impl/HttpMonitorDataSyncImpl.java      | 151 +++++++
 .../service/impl/RocketMQMonitorDataSyncImpl.java  | 185 ++++++++
 .../optimization/IHomologousOptimization.java      |   2 +-
 .../common/optimization/MessageGlobleTrace.java    |  16 +-
 .../streams/common/optimization/Re2Engine.java     |  47 +-
 .../common/optimization/TaskOptimization.java      |  17 +-
 .../optimization/fingerprint/FingerprintCache.java |   4 +-
 .../optimization/fingerprint/PreFingerprint.java   |  54 ++-
 .../streams/common/schedule/ScheduleTask.java      |   4 +-
 .../common/threadpool/ThreadPoolFactory.java       |  34 +-
 .../AbstractMutilPipelineChainPipline.java         |  81 ++--
 .../streams/common/topology/ChainPipeline.java     | 255 ++++++-----
 .../streams/common/topology/ChainStage.java        |  32 +-
 .../common/topology/builder/PipelineBuilder.java   | 139 ++++--
 .../common/topology/metric/NotFireReason.java      | 176 ++++++++
 .../streams/common/topology/metric/StageGroup.java | 248 +++++++++++
 .../common/topology/metric/StageMetric.java        | 138 ++++++
 .../common/topology/model/AbstractStage.java       | 177 +++++---
 .../streams/common/topology/model/IWindow.java     |   3 +-
 .../streams/common/topology/model/Pipeline.java    |  56 ++-
 .../topology/model/PipelineSourceJoiner.java       |  48 --
 .../topology/shuffle/IShuffleKeyGenerator.java     |  16 +-
 .../common/topology/shuffle/ShuffleMQCreator.java  | 401 ++++++++++-------
 .../topology/stages/AbstractWindowStage.java       |   5 +-
 .../stages/EmptyChainStage.java}                   |  27 +-
 .../common/topology/stages/FilterChainStage.java   | 157 ++-----
 .../common/topology/stages/JoinChainStage.java     |   3 +-
 .../common/topology/stages/JoinEndChainStage.java  |  11 +-
 .../topology/stages/JoinStartChainStage.java       |  67 +++
 .../common/topology/stages/OutputChainStage.java   |  81 ++--
 .../topology/stages/ShuffleConsumerChainStage.java | 193 +++++++++
 .../topology/stages/ShuffleProducerChainStage.java | 345 +++++++++++++++
 .../topology/stages/SubPiplineChainStage.java      | 138 ------
 .../common/topology/stages/UnionChainStage.java    |   3 +-
 .../common/topology/stages/UnionEndChainStage.java |  10 +-
 ...onChainStage.java => UnionStartChainStage.java} |  43 +-
 .../ViewChainStage.java}                           | 479 ++++++++++----------
 .../common/topology/stages/udf/UDFChainStage.java  |  27 +-
 .../streams/common/topology/task/StreamsTask.java  | 444 +++----------------
 .../streams/common/topology/task/TaskAssigner.java |  20 +-
 .../streams/common/utils/ConfigurableUtil.java     |   6 +-
 .../streams/common/utils/ContantsUtil.java         |  33 +-
 .../streams/common/utils/DataTypeUtil.java         |  16 +-
 .../rocketmq/streams/common/utils/FileUtil.java    |  89 +++-
 .../streams/common/utils/InstantiationUtil.java    |  48 +-
 .../streams/common/utils/JsonableUtil.java         |   5 +
 .../rocketmq/streams/common/utils/KryoUtil.java    | 214 +++++++++
 .../streams/common/utils/NameCreatorUtil.java      |  60 ---
 .../streams/common/utils/PipelineHTMLUtil.java     | 299 +++++++++++++
 .../streams/common/utils/PropertiesUtils.java      |   2 +-
 .../rocketmq/streams/common/utils/ReflectUtil.java |  77 ++--
 .../streams/common/utils/SerializeUtil.java        |  23 +-
 .../streams/common/utils/ServiceLoadUtil.java      |  63 +++
 .../rocketmq/streams/common/utils/TraceUtil.java   |  23 +-
 .../rocketmq/streams/common/channel/SinkTest.java  |   4 +-
 rocketmq-streams-configurable/pom.xml              |  11 +-
 .../configurable/ConfigurableComponent.java        |  17 +-
 .../service/AbstractConfigurableService.java       |  98 ++---
 .../service/impl}/FileConfigureService.java        |   2 +-
 .../impl}/FileSupportParentConfigureService.java   |   2 +-
 .../service/impl/HttpConfigureService.java         | 377 ++++++++++++++++
 .../impl/HttpSupportParentConfigureService.java    |  20 +-
 .../service/impl}/MemoryConfigureService.java      |   2 +-
 .../impl}/MemorySupportParentConfigureService.java |   2 +-
 rocketmq-streams-connectors/pom.xml                |  40 --
 .../connectors/balance/AbstractBalance.java        | 207 ---------
 .../streams/connectors/balance/ISourceBalance.java |  60 ---
 .../streams/connectors/balance/SplitChanged.java   |  55 ---
 .../connectors/balance/impl/LeaseBalanceImpl.java  | 144 ------
 .../streams/connectors/model/PullMessage.java      |  50 ---
 .../streams/connectors/model/ReaderStatus.java     | 120 -----
 .../streams/connectors/reader/DBScanReader.java    | 269 ------------
 .../streams/connectors/reader/ISplitReader.java    |  96 ----
 .../connectors/reader/SplitCloseFuture.java        |  83 ----
 .../connectors/source/AbstractPullSource.java      | 272 ------------
 .../source/CycleDynamicMultipleDBScanSource.java   | 213 ---------
 .../source/DynamicMultipleDBScanSource.java        | 190 --------
 .../streams/connectors/source/IPullSource.java     |  60 ---
 .../source/filter/BoundedPatternFilter.java        |  53 ---
 .../source/filter/CyclePatternFilter.java          | 173 --------
 .../connectors/source/filter/CyclePeriod.java      | 222 ----------
 .../connectors/source/filter/CycleSchedule.java    | 236 ----------
 .../source/filter/DataFormatPatternFilter.java     | 106 -----
 .../connectors/source/filter/PatternFilter.java    |  41 --
 rocketmq-streams-db-operator/pom.xml               |   6 +-
 .../streams/db/configuable/DBConfigureService.java |  11 +-
 rocketmq-streams-dbinit/pom.xml                    |   6 +-
 .../dbinit/mysql/delegate/DBDelegateFactory.java   |   5 +
 .../dbinit/mysql/delegate/MysqlDelegate.java       |   1 +
 .../src/main/resources/tables_mysql_innodb.sql     |  16 -
 rocketmq-streams-dim/pom.xml                       |   6 +-
 .../intelligence/AbstractIntelligenceCache.java    |   5 +-
 rocketmq-streams-examples/pom.xml                  |   6 +-
 .../checkpoint/RemoteCheckpointExample.java        |   4 +-
 .../mutilconsumer/MultiStreamsExample.java         |   4 +-
 .../streams/examples/source/FileSourceExample.java |   2 +-
 .../examples/source/RocketmqSourceExample4.java    |  63 ---
 .../src/main/resources/joinData-1.txt              |   4 -
 .../src/main/resources/joinData-2.txt              |   4 -
 rocketmq-streams-filter/pom.xml                    |   5 +-
 .../streams/filter/builder/ExpressionBuilder.java  |  49 +--
 .../streams/filter/context/RuleContext.java        |  31 +-
 .../filter/engine/impl/DefaultRuleEngine.java      |  29 +-
 .../function/expression/CompareFunction.java       |  16 +-
 .../rocketmq/streams/filter/operator/Rule.java     |  58 +++
 .../expression/ExpressionRelationParser.java       |   9 +-
 .../expression/ExpressionRelationPaser.java        | 107 -----
 .../operator/expression/GroupExpression.java       |   3 +-
 .../operator/expression/RelationExpression.java    |  35 +-
 .../PiplineLogFingerprintAnalysis.java             |   6 +-
 .../dependency/BlinkRuleV2Expression.java          |   5 +-
 .../optimization/dependency/DependencyTree.java    |  22 +-
 .../dependency/SimplePipelineTree.java             |   3 +-
 .../dependency/StateLessDependencyTree.java        |  84 ++++
 .../optimization/homologous/HomologousCompute.java |   5 +-
 .../homologous/HomologousOptimization.java         |  13 +-
 rocketmq-streams-lease/pom.xml                     |   6 +-
 rocketmq-streams-runner/assembly/distribution.xml  |  69 ---
 rocketmq-streams-runner/assembly/standalone.xml    |  72 ---
 rocketmq-streams-runner/bin/start.sh               |  58 ---
 rocketmq-streams-runner/bin/stop.sh                |  33 --
 rocketmq-streams-runner/pom.xml                    |  80 ----
 .../src/main/resources/log4j.xml                   |  51 ---
 rocketmq-streams-schedule/pom.xml                  |   6 +-
 .../schedule/job/ConfigurableExecutorJob.java      |  30 +-
 rocketmq-streams-script/pom.xml                    |   6 +-
 .../function/aggregation/LastValueAccumulator.java |  67 +++
 .../function/impl/distinct/DistinctFunction.java   |   1 -
 .../function/impl/json/JsonCreatorFunction.java    |   4 +-
 .../function/impl/json/UDTFFieldNameFunction.java  |  50 +++
 .../script/function/impl/parser/GrokFunction.java  |   4 +-
 .../function/impl/parser/Paser2JsonFunction.java   |  17 +-
 .../function/impl/parser/PaserBySplitFunction.java |  44 +-
 .../function/impl/parser/RegexParserFunction.java  |  24 +-
 .../script/function/impl/router/RouteFunction.java |   4 +-
 .../script/operator/impl/AggregationScript.java    |  19 +-
 .../streams/script/service/IAccumulator.java       |   4 +-
 .../script/service/udf/SimpleUDAFScript.java       |  29 +-
 .../streams/script/service/udf/UDFScript.java      |  26 +-
 .../streams/script/function/FunctionTest.java      |  19 +-
 rocketmq-streams-serviceloader/pom.xml             |   6 +-
 rocketmq-streams-state/pom.xml                     |   6 +-
 .../streams/state/kv/rocksdb/RocksDBOperator.java  |  37 +-
 rocketmq-streams-transport-minio/pom.xml           |   6 +-
 rocketmq-streams-window/pom.xml                    |   7 +-
 .../window/minibatch/MiniBatchMsgCache.java        |  76 ++++
 .../window/minibatch/ShuffleMessageCache.java      | 187 ++++++++
 .../rocketmq/streams/window/model/WindowCache.java | 182 +++-----
 .../streams/window/model/WindowInstance.java       |  19 +-
 .../window/offset/WindowMaxValueManager.java       |   8 +-
 .../window/operator/AbstractShuffleWindow.java     |  36 +-
 .../streams/window/operator/AbstractWindow.java    |  74 +++-
 .../window/operator/impl/SessionOperator.java      |   3 +-
 .../window/operator/impl/WindowOperator.java       |  25 +-
 .../streams/window/operator/join/JoinWindow.java   |  39 +-
 .../window/shuffle/AbstractSystemChannel.java      |  97 +++--
 .../streams/window/shuffle/ShuffleCache.java       |  13 +-
 .../streams/window/shuffle/ShuffleChannel.java     |  74 ++--
 .../streams/window/state/impl/WindowValue.java     |  54 ++-
 .../window/storage/AbstractWindowStorage.java      |  10 +-
 .../window/storage/ShufflePartitionManager.java    |   8 +-
 .../window/storage/rocksdb/RocksdbStorage.java     |   7 +-
 .../streams/window/trigger/WindowTrigger.java      |   2 +-
 .../rocketmq/streams/window/util/ShuffleUtil.java  |  62 +++
 .../org/apache/rocketmq/streams/RocksdbTest.java   |   2 +-
 docs/stream_sink/README.md => stream_sink.md       |  19 +-
 docs/stream_source/README.md => stream_source.md   |  30 +-
 .../README.md => stream_transform.md               |   0
 294 files changed, 9733 insertions(+), 7012 deletions(-)

diff --cc README.md
index f2f475e7,e4205699..51d9a6cf
--- a/README.md
+++ b/README.md
@@@ -1,123 -1,6 +1,123 @@@
- # RocketMQ Streams 
- [![Build Status](https://app.travis-ci.com/apache/rocketmq-streams.svg?branch=main)](https://app.travis-ci.com/apache/rocketmq-streams)
- [![CodeCov](https://codecov.io/gh/apache/rocketmq-stream/branch/main/graph/badge.svg)](https://app.codecov.io/gh/apache/rocketmq-streams) 
- [![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://github.com/apache/rocketmq-streams/releases)
- [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
- [![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Average time to resolve an issue")
- [![Percentage of issues still open](http://isitmaintained.com/badge/open/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open")
- [![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social)](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
+ # Summary
+ 
 +
 +## [中文文档](./README-chinese.md)
 +
 +## [Quick Start](./quick_start.md)
 +
 +## Features
 +
 +* Lightweight deployment: RocketMQ Streams can be deployed separately or in cluster mode.
 +* Various types of data input and output: source supports [RocketMQ](https://github.com/apache/rocketmq) while sink supports databases and RocketMQ, etc.
 +
 +## DataStream Example
 +
 +```java
 +import org.apache.rocketmq.streams.client.transform.DataStream;
 +
 +DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
 +    source
 +    .fromFile("~/admin/data/text.txt",false)
 +    .map(message->message)
 +    .toPrint(1)
 +    .start();
 +```
 +
 +## Maven Repository
 +
 +```xml
 +
 +<dependency>
 +    <groupId>org.apache.rocketmq</groupId>
 +    <artifactId>rocketmq-streams-clients</artifactId>
 +    <version>1.0.0-SNAPSHOT</version>
 +</dependency>
 +```
 +
 +# Core API
 +
 +RocketMQ Streams implements a series of advanced APIs, allowing users to write stream computing programs conveniently and achieve their own business requirements.
 +
 +## StreamBuilder
 +
 +StreamBuilder is used to build the source of stream tasks. It contains two methods: ```dataStream()``` and ```tableStream()```, which return two sources, DataStreamSource and TableStreamSource, respectively.
 +
 ++ [dataStream(nameSpaceName,pipelineName)]() returns an instance of DataStreamSource, used for segmented programming to achieve stream computing tasks.
 ++ [tableStream(nameSpaceName,pipelineName)]() returns an instance of TableStreamSource, used for script programming to achieve stream computing tasks.
 +
 +## DataStream API
 +
 +### Source
 +
 +DataStreamSource is a source class of segmented programming, used to interface with various data sources and obtain data from major message queues.
 +
 ++ ```fromFile```: reads data from the file. This method takes the following parameters:
 +    + ```filePath```: specifies which file path to read. Required.
 +    + ```isJsonData```: specifies whether data is in JSON format. Optional. Default value: ```true```.
 +    + ```tags```: the tags for filtering messages used by the RocketMQ consumer. Optional.
 +
 +
 ++ ```fromRocketmq```: obtains data from RocketMQ, including four parameters:
 +    + ```topic```:  the topic name of RocketMQ. Required.
 +    + ```groupName```: the name of the consumer group. Required.
 +    + ```isJson```: specifies whether data is in JSON format. Optional.
 +    + ```tags```: the tags for filtering messages used by the RocketMQ consumer. Optional.
 +
 ++ ```from```: custom data source. You can specify your own data source by implementing ISource interface.
 +
 +### transform
 +
 +transform allows the input source data to be modified during the stream calculation process for the next step; DataStream API includes ```DataStream```, ```JoinStream```, ```SplitStream```, ```WindowStream```, and many other transform classes.
 +
 +#### DataStream
 +
 +DataStream implements a series of common stream calculation operators as follows:
 +
 ++ ```map```: returns a new DataStream by passing each record of the source to the **func** function.
 ++ ```flatmap```: similar to map. One input item corresponds to 0 or more output items.
 ++ ```filter```: returns a new DataStream based on the record of the source DataStream only when the **func** function returns **true**.
 ++ ```forEach```: executes the **func** function once for each record and returns a new DataStream.
 ++ ```selectFields```: returns the corresponding field value for each record, and returns a new DataStream.
 ++ ```operate```: executes a custom function for each record and returns a new DataStream.
 ++ ```script```: executes a script for each recorded field, returns new fields, and generates a new DataStream.
 ++ ```toPrint```: prints the result on the console and generates a new DataStreamAction instance.
 ++ ```toFile```: saves the result as a file and generates a new DataStreamAction instance.
 ++ ```toDB```: saves the result to the database.
 ++ ```toRocketmq```: outputs the result to RocketMQ.
 ++ ```to```: outputs the result to the specified storage through the custom ISink interface.
 ++ ```window```: performs relevant statistical analysis in the window, generally used in conjunction with ```groupBy```. ```window()``` is used to define the size of the window, and ```groupBy( )``` is used to define the main key of statistical analysis. You can specify multiple main keys:
 +    + ```count```: counts in the window.
 +    + ```min```: gets the minimum of the statistical value in the window.
 +    + ```max```: gets the maximum of the statistical value in the window.
 +    + ```avg```: gets the average of the statistical values in the window.
 +    + ```sum```: gets the sum of the statistical values in the window.
 +    + ```reduce```: performs custom summary calculations in the window.
 ++ ```join```: associates the two streams or one stream and one physical table according to the conditions and merges them into a large stream for related calculations.
 +    + ```dimJoin```: associates a stream with a physical table, which can be a file or a database table; all matching records are retained.
 +    + ```dimLeftJoin```: after a stream is associated with a physical table, all records of the stream are retained, and fields that have no match in the physical table are left blank.
 +    + ```join```
 +    + ```leftJoin```
 ++ ```union```: merges the two streams.
 ++ ```split```: splits a data stream into different data streams according to tags for downstream analysis and calculation.
 ++ ```with```: specifies related strategies during the calculation, including Checkpoint and state storage strategies, etc.
 +
 +# Strategy
 +
 +The Strategy mechanism is mainly used to control the underlying logic during the operation of the computing engine, such as the storage methods of Checkpoint and state etc. Subsequent controls for windows, dual-stream joins, and so on will be added. All control strategies are transmitted through the ```with``` operator. Multiple policy types can be transmitted at the same time.
 +
 +```java
 +//Specify the storage strategy for Checkpoint.
 +source
 +    .fromRocketmq("TSG_META_INFO","")
 +    .map(message->message+"--")
 +    .toPrint(1)
 +    .with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L))
 +    .start();
 +```
 +
++=======
+ * [Quick Start](quick\_start.md)
+ * [创建实时任务数据源](stream\_source.md)
+ * [创建实时任务数据输出](stream\_sink.md)
+ * [数据处理逻辑](stream\_transform.md)
++>>>>>>> 1cd2dd0291dbcab033e6773021ddca13ce819f82
diff --cc pom.xml
index 5eac49b1,b2792adc..9da26dcd
--- a/pom.xml
+++ b/pom.xml
@@@ -100,12 -52,13 +52,11 @@@
          <module>rocketmq-streams-channel-http</module>
          <module>rocketmq-streams-state</module>
          <module>rocketmq-streams-examples</module>
--        <module>rocketmq-streams-checkpoint</module>
--        <module>rocketmq-streams-connectors</module>
          <module>rocketmq-streams-channel-syslog</module>
          <module>rocketmq-streams-channel-es</module>
-         <module>rocketmq-streams-runner</module>
+         <module>rocketmq-streams-channel-kafka</module>
          <module>rocketmq-streams-channel-mqtt</module>
+         <module>rocketmq-streams-cep</module>
      </modules>
  
      <properties>
@@@ -119,13 -72,12 +70,13 @@@
          <java.version>1.8</java.version>
          <java.encoding>UTF-8</java.encoding>
          <project.build.sourceEncoding>${java.encoding}</project.build.sourceEncoding>
-         <log4j.version>1.2.17</log4j.version>
          <commons-logging.version>1.1</commons-logging.version>
-         <spring.version>3.2.13.RELEASE</spring.version>
+         <spring.version>5.1.14.RELEASE</spring.version>
          <auto-service.version>1.0-rc5</auto-service.version>
          <mysql-connector.version>5.1.40</mysql-connector.version>
 -        <fastjson.version>1.2.25</fastjson.version>
 -        <quartz.version>2.2.1</quartz.version>
 +        <fastjson.version>1.2.9</fastjson.version>
 +        <quartz.version>2.3.2</quartz.version>
++
          <httpclient.version>4.5.13</httpclient.version>
          <commons-io.version>2.7</commons-io.version>
          <junit.version>4.13.1</junit.version>
diff --cc rocketmq-streams-channel-es/pom.xml
index 15723b3c,9e872a51..fe50aeef
--- a/rocketmq-streams-channel-es/pom.xml
+++ b/rocketmq-streams-channel-es/pom.xml
@@@ -20,8 -7,8 +7,8 @@@
      <parent>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-streams</artifactId>
-         <version>1.0.2-preview-SNAPSHOT</version>
-     </parent>
+         <version>1.0.2-SNAPSHOT</version>
 -    </parent>
++     </parent>
      <artifactId>rocketmq-streams-channel-es</artifactId>
      <name>ROCKETMQ STREAMS :: channel-es</name>
      <packaging>jar</packaging>
diff --cc rocketmq-streams-channel-mqtt/pom.xml
index 1ebfd008,abf10b72..0141989c
--- a/rocketmq-streams-channel-mqtt/pom.xml
+++ b/rocketmq-streams-channel-mqtt/pom.xml
@@@ -19,8 -5,8 +5,8 @@@
      <parent>
          <artifactId>rocketmq-streams</artifactId>
          <groupId>org.apache.rocketmq</groupId>
-         <version>1.0.2-preview-SNAPSHOT</version>
-     </parent>
+         <version>1.0.2-SNAPSHOT</version>
 -    </parent>
++     </parent>
      <modelVersion>4.0.0</modelVersion>
  
      <artifactId>rocketmq-streams-channel-mqtt</artifactId>
diff --cc rocketmq-streams-checkpoint/pom.xml
index 6da59e3a,312de599..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-checkpoint/pom.xml
+++ /dev/null
@@@ -1,64 -1,66 +1,0 @@@
--<?xml version="1.0" encoding="UTF-8"?>
--<!--
--  Licensed to the Apache Software Foundation (ASF) under one or more
--  contributor license agreements.  See the NOTICE file distributed with
--  this work for additional information regarding copyright ownership.
--  The ASF licenses this file to You under the Apache License, Version 2.0
--  (the "License"); you may not use this file except in compliance with
--  the License.  You may obtain a copy of the License at
--
--      http://www.apache.org/licenses/LICENSE-2.0
--
--  Unless required by applicable law or agreed to in writing, software
--  distributed under the License is distributed on an "AS IS" BASIS,
--  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--  See the License for the specific language governing permissions and
--  limitations under the License.
--  -->
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 -<project xmlns="http://maven.apache.org/POM/4.0.0"
 -         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 -         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
--    <parent>
--        <artifactId>rocketmq-streams</artifactId>
--        <groupId>org.apache.rocketmq</groupId>
-         <version>1.0.2-preview-SNAPSHOT</version>
 -        <version>1.0.2-SNAPSHOT</version>
--    </parent>
--    <modelVersion>4.0.0</modelVersion>
--
--    <artifactId>rocketmq-streams-checkpoint</artifactId>
--    <name>ROCKETMQ STREAMS :: checkpoint</name>
--    <packaging>jar</packaging>
--
--    <properties>
--        <maven.compiler.source>8</maven.compiler.source>
--        <maven.compiler.target>8</maven.compiler.target>
--    </properties>
--    <dependencies>
--        <dependency>
--            <groupId>org.apache.rocketmq</groupId>
--            <artifactId>rocketmq-streams-commons</artifactId>
--            <exclusions>
--                <exclusion>
--                    <groupId>com.google.auto.service</groupId>
--                    <artifactId>auto-service</artifactId>
--                </exclusion>
--            </exclusions>
--        </dependency>
--
--        <dependency>
--            <groupId>org.apache.rocketmq</groupId>
--            <artifactId>rocketmq-streams-db-operator</artifactId>
--            <exclusions>
--                <exclusion>
--                    <groupId>com.google.auto.service</groupId>
--                    <artifactId>auto-service</artifactId>
--                </exclusion>
--            </exclusions>
--        </dependency>
--
--        <dependency>
--            <groupId>com.google.auto.service</groupId>
--            <artifactId>auto-service</artifactId>
--            <optional>true</optional>
--        </dependency>
--    </dependencies>
--
--</project>
diff --cc rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java
index 69150acf,69150acf..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java
+++ /dev/null
@@@ -1,68 -1,68 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.checkpoint.db;
--
--import java.util.List;
--import org.apache.commons.logging.Log;
--import org.apache.commons.logging.LogFactory;
--import org.apache.rocketmq.streams.common.channel.source.ISource;
--import org.apache.rocketmq.streams.common.checkpoint.AbstractCheckPointStorage;
--import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
--import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
--import org.apache.rocketmq.streams.common.checkpoint.SourceSnapShot;
--import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
--
--/**
-- * Checkpoint storage that writes checkpoint state through {@code ORMUtil} and reads the snapshot
-- * of a source queue back from the {@code source_snap_shot} table.
-- */
--public class DBCheckPointStorage extends AbstractCheckPointStorage {
--
--    static final Log logger = LogFactory.getLog(DBCheckPointStorage.class);
--    static final String STORAGE_NAME = "DB";
--
--    /**
--     * Creates the storage; nothing is initialised here.
--     */
--    public DBCheckPointStorage() {
--    }
--
--    /**
--     * @return the fixed storage name, {@code "DB"}
--     */
--    @Override
--    public String getStorageName() {
--        return STORAGE_NAME;
--    }
--
--    /**
--     * Logs the number of entries and passes the whole list to {@code ORMUtil.batchReplaceInto}.
--     *
--     * @param checkPointState entries to persist
--     */
--    @Override
--    public <T> void save(List<T> checkPointState) {
--        int stateCount = checkPointState.size();
--        logger.info(String.format("save checkpoint size %d", stateCount));
--        ORMUtil.batchReplaceInto(checkPointState);
--    }
--
--    /**
--     * Nothing to do for this storage.
--     */
--    @Override
--    public void finish() {
--    }
--
--    /**
--     * Looks up the stored snapshot for one queue of a source and converts it into a check point.
--     *
--     * @param iSource source whose checkpoint is being recovered
--     * @param queueId queue the checkpoint belongs to
--     * @return the result of {@code new CheckPoint().fromSnapShot(...)} for the row that was found;
--     *         the snapshot handed over is {@code null} when the query returned nothing
--     */
--    @Override
--    public CheckPoint recover(ISource iSource, String queueId) {
--        // TODO: marker carried over from the original code, which did not say what is still open.
--        String checkPointKey = CheckPointManager.createCheckPointKey(CheckPointManager.createSourceName(iSource, null), queueId);
--        // NOTE(review): the key is concatenated straight into the statement; confirm it can never
--        // contain a quote, or switch to whatever parameter binding ORMUtil offers.
--        String querySql = "select * from source_snap_shot where `key` = '" + checkPointKey + "';";
--        SourceSnapShot recovered = ORMUtil.queryForObject(querySql, null, SourceSnapShot.class);
--
--        String snapShotText;
--        if (recovered == null) {
--            snapShotText = "null snapShot";
--        } else {
--            snapShotText = recovered.toString();
--        }
--        logger.info(String.format("checkpoint recover key is %s, sql is %s, recover sourceSnapShot : %s", checkPointKey, querySql, snapShotText));
--        return new CheckPoint().fromSnapShot(recovered);
--    }
--}
diff --cc rocketmq-streams-connectors/pom.xml
index 8c8277e3,d544e3d5..00000000
deleted file mode 100755,100755
--- a/rocketmq-streams-connectors/pom.xml
+++ /dev/null
@@@ -1,40 -1,47 +1,0 @@@
--<?xml version="1.0" encoding="utf-8"?>
--<!--
--  Licensed to the Apache Software Foundation (ASF) under one or more
--  contributor license agreements.  See the NOTICE file distributed with
--  this work for additional information regarding copyright ownership.
--  The ASF licenses this file to You under the Apache License, Version 2.0
--  (the "License"); you may not use this file except in compliance with
--  the License.  You may obtain a copy of the License at
--
--      http://www.apache.org/licenses/LICENSE-2.0
--
--  Unless required by applicable law or agreed to in writing, software
--  distributed under the License is distributed on an "AS IS" BASIS,
--  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--  See the License for the specific language governing permissions and
--  limitations under the License.
--  -->
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 -<project xmlns="http://maven.apache.org/POM/4.0.0"
 -         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 -         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
--    <modelVersion>4.0.0</modelVersion>
--    <parent>
--        <groupId>org.apache.rocketmq</groupId>
--        <artifactId>rocketmq-streams</artifactId>
-         <version>1.0.2-preview-SNAPSHOT</version>
 -        <version>1.0.2-SNAPSHOT</version>
--    </parent>
--    <artifactId>rocketmq-streams-connectors</artifactId>
--    <packaging>jar</packaging>
--    <name>ROCKETMQ STREAMS :: connectors</name>
--
--    <dependencies>
--        <dependency>
--            <groupId>org.apache.rocketmq</groupId>
--            <artifactId>rocketmq-streams-lease</artifactId>
 -        </dependency>
 -
 -        <dependency>
 -            <groupId>org.apache.rocketmq</groupId>
 -            <artifactId>rocketmq-streams-schedule</artifactId>
--        </dependency>
--
--        <dependency>
--            <groupId>org.apache.rocketmq</groupId>
--            <artifactId>rocketmq-streams-commons</artifactId>
--        </dependency>
--    </dependencies>
--</project>
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java
index 5c2b266f,5c2b266f..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java
+++ /dev/null
@@@ -1,207 -1,207 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.balance;
--
--import java.util.ArrayList;
--import java.util.HashSet;
--import java.util.List;
--import java.util.Set;
--import org.apache.rocketmq.streams.common.channel.split.ISplit;
--import org.apache.rocketmq.streams.connectors.source.SourceInstance;
--
--/**
-- * Base class for source balancers. Each call to {@link #doBalance} refreshes the instance
-- * heartbeat, looks at the live instances and the splits already being worked on, and decides
-- * whether this instance should take on more splits or give some of its own back.
-- */
--public abstract class AbstractBalance implements ISourceBalance {
--
--    /** Number of times {@link #doBalance} has run on this instance. */
--    protected int balanceCount = 0;
--
--    /**
--     * Runs one balance round.
--     *
--     * @param allSplits   every split of the source
--     * @param ownerSplits splits currently owned by this instance
--     * @return the splits to add, otherwise the splits to remove, or {@code null} when nothing
--     *         has to change
--     */
--    @Override
--    public SplitChanged doBalance(List<ISplit> allSplits, List<ISplit> ownerSplits) {
--        balanceCount++;
--        heartBeat();
--        List<SourceInstance> sourceInstances = fetchSourceInstances();
--        List<ISplit> workingSplits = fetchWorkingSplits(allSplits);
--        SplitChanged splitChanged = getAdditionSplits(allSplits, sourceInstances, workingSplits, ownerSplits);
--        if (splitChanged != null) {
--            return splitChanged;
--        }
--        // Additions take priority; removals are only considered when nothing is to be added.
--        return getRemoveSplits(allSplits, sourceInstances, workingSplits, ownerSplits);
--    }
--
--    /**
--     * Renews this instance's source lock; the outcome of the lock attempt is not inspected.
--     */
--    protected void heartBeat() {
--        holdLockSourceInstance();
--    }
--
--    /**
--     * Get all splits that have already been dispatched.
--     *
--     * @param allSplitS every split of the source
--     * @return the splits that are currently being worked on
--     */
--    protected abstract List<ISplit> fetchWorkingSplits(List<ISplit> allSplitS);
--
--    /**
--     * Get all instances of the source.
--     *
--     * @return the known source instances
--     */
--    protected abstract List<SourceInstance> fetchSourceInstances();
--
--    /**
--     * Lock the source. The lock is global: only one source instance can hold it at a time.
--     *
--     * @return whether the lock is held
--     */
--    protected abstract boolean holdLockSourceInstance();
--
--    /**
--     * Release the source lock.
--     */
--    protected abstract void unlockSourceInstance();
--
--    /**
--     * Decide whether this instance should take additional splits, following the scheduling
--     * strategy. The maximum number of splits added in one round depends on how many balance
--     * rounds have run so far.
--     *
--     * @param allSplits       every split of the source
--     * @param sourceInstances the known source instances
--     * @param workingSplits   splits that are already being worked on
--     * @param ownerSplits     splits currently owned by this instance
--     * @return the change carrying the splits whose lock was obtained, or {@code null} when no
--     *         addition is due
--     */
--    protected SplitChanged getAdditionSplits(List<ISplit> allSplits, List<SourceInstance> sourceInstances,
--        List<ISplit> workingSplits, List<ISplit> ownerSplits) {
--        SplitChanged splitChanged = getChangedSplitCount(allSplits, sourceInstances, workingSplits.size(), ownerSplits.size());
--        if (splitChanged == null || !splitChanged.isNewSplit || splitChanged.splitCount <= 0) {
--            return null;
--        }
--        List<ISplit> newSplits = new ArrayList<>();
--        for (ISplit candidate : getNoWorkingSplits(allSplits, workingSplits)) {
--            // Only splits whose lock could be taken are handed to this instance.
--            if (holdLockSplit(candidate)) {
--                newSplits.add(candidate);
--                if (newSplits.size() >= splitChanged.splitCount) {
--                    break;
--                }
--            }
--        }
--        splitChanged.setChangedSplits(newSplits);
--        return splitChanged;
--    }
--
--    /**
--     * Returns the splits of {@code allSplits} whose queue id does not appear in
--     * {@code workingSplits}.
--     *
--     * @param allSplits     every split of the source
--     * @param workingSplits splits that are already being worked on; {@code null} entries are
--     *                      ignored
--     * @return the splits nobody is working on yet, in the order of {@code allSplits}
--     */
--    protected List<ISplit> getNoWorkingSplits(List<ISplit> allSplits, List<ISplit> workingSplits) {
--        Set<String> workingSplitIds = new HashSet<>();
--        for (ISplit split : workingSplits) {
--            // An implementation may report a lease it cannot map to a split as null; skipping it
--            // avoids a NullPointerException here.
--            if (split != null) {
--                workingSplitIds.add(split.getQueueId());
--            }
--        }
--        List<ISplit> splits = new ArrayList<>();
--        for (ISplit split : allSplits) {
--            if (!workingSplitIds.contains(split.getQueueId())) {
--                splits.add(split);
--            }
--        }
--        return splits;
--    }
--
--    /**
--     * Get the splits that need to be removed from this instance.
--     *
--     * @param allSplits       every split of the source
--     * @param sourceInstances the known source instances
--     * @param workingSplits   splits that are already being worked on
--     * @param ownerSplits     splits currently owned by this instance
--     * @return the change carrying the leading owned splits to give up, or {@code null} when no
--     *         removal is due
--     */
--    protected SplitChanged getRemoveSplits(List<ISplit> allSplits, List<SourceInstance> sourceInstances,
--        List<ISplit> workingSplits, List<ISplit> ownerSplits) {
--        SplitChanged splitChanged = getChangedSplitCount(allSplits, sourceInstances, workingSplits.size(), ownerSplits.size());
--        if (splitChanged == null || splitChanged.isNewSplit || splitChanged.splitCount <= 0) {
--            return null;
--        }
--        // Never ask for more splits than this instance owns, even if an overriding
--        // getChangedSplitCount reports a larger number.
--        int removeCount = Math.min(splitChanged.splitCount, ownerSplits.size());
--        List<ISplit> removeSplits = new ArrayList<>(ownerSplits.subList(0, removeCount));
--        splitChanged.setChangedSplits(removeSplits);
--        return splitChanged;
--    }
--
--    /**
--     * Compute how many splits have to change for this instance, either added or removed.
--     * Allocation strategy: new splits are only handed out while unassigned splits exist. To
--     * reduce split switching, few splits are assigned in the first rounds and more later on.
--     *
--     * @param allSplits           every split of the source
--     * @param sourceInstances     the known source instances; an empty list is treated as one
--     *                            instance
--     * @param splitCountInWorking number of splits already being worked on
--     * @param ownerSplitCount     number of splits owned by this instance
--     * @return the change this instance needs (addition or removal), or {@code null} when none
--     */
--    protected SplitChanged getChangedSplitCount(List<ISplit> allSplits, List<SourceInstance> sourceInstances,
--        int splitCountInWorking, int ownerSplitCount) {
--        int instanceCount = sourceInstances.size();
--        if (instanceCount == 0) {
--            instanceCount = 1;
--        }
--        int allSplitCount = allSplits.size();
--        int minSplitCount = allSplitCount / instanceCount;
--        int maxSplitCount = minSplitCount + (allSplitCount % instanceCount == 0 ? 0 : 1);
--        // Already at the maximum number of splits.
--        if (ownerSplitCount == maxSplitCount) {
--            return null;
--        }
--        if (ownerSplitCount > maxSplitCount) {
--            int changeSplitCount = ownerSplitCount - maxSplitCount;
--            return new SplitChanged(changeSplitCount, false);
--        }
--        // Every split is being processed and this instance meets the minimum share, so nothing
--        // needs to be reassigned.
--        if (splitCountInWorking == allSplitCount && ownerSplitCount >= minSplitCount) {
--            return null;
--        }
--        // Unassigned splits remain and this instance still has room, so assign some.
--        if (splitCountInWorking < allSplitCount && ownerSplitCount < maxSplitCount) {
--            int changeSplitCount = Math.min(maxSplitCount - ownerSplitCount, getMaxSplitCountInOneBalance());
--            return new SplitChanged(changeSplitCount, true);
--        }
--        return null;
--    }
--
--    /**
--     * @return how many balance rounds have run on this instance
--     */
--    @Override
--    public int getBalanceCount() {
--        return balanceCount;
--    }
--
--    /**
--     * Maximum number of splits to assign in one balance round. Few splits are assigned in the
--     * first rounds because other instances may still be starting, which avoids frequent split
--     * switching; once the instances are up, as many splits as possible are assigned.
--     *
--     * @return {@code 2^(balanceCount - 1)} narrowed to an {@code int}
--     */
--    private int getMaxSplitCountInOneBalance() {
--        int rounds = getBalanceCount();
--        return (int) Math.pow(2, rounds - 1);
--    }
--
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/ISourceBalance.java
index b012b323,b012b323..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/ISourceBalance.java
+++ /dev/null
@@@ -1,60 -1,60 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.balance;
--
--import java.util.List;
--import org.apache.rocketmq.streams.common.channel.split.ISplit;
--
--public interface ISourceBalance {
--
--    /**
--     * Perform load balancing.
--     *
--     * @param allSplits   every split of the source
--     * @param ownerSplits splits currently owned by the calling instance
--     * @return the change (splits to add or to remove) the calling instance should apply
--     */
--    SplitChanged doBalance(List<ISplit> allSplits, List<ISplit> ownerSplits);
--
--    /**
--     * How many balance rounds have been performed since startup.
--     *
--     * @return the number of balance rounds so far
--     */
--    int getBalanceCount();
--
--
--
--    /**
--     * Presumably tries to take the lock that guards the removal of splits; verify against the
--     * implementations.
--     *
--     * @return whether the lock was obtained
--     */
--    boolean getRemoveSplitLock();
--
--    /**
--     * Presumably releases the lock taken by {@link #getRemoveSplitLock()}; verify against the
--     * implementations.
--     */
--    void unLockRemoveSplitLock();
--
--    /**
--     * Lock the split and hold it until the instance is shut down or the split is removed.
--     *
--     * @param split the split to lock
--     * @return whether the lock is held
--     */
--    boolean holdLockSplit(ISplit split);
--
--    /**
--     * Release the lock of a split.
--     *
--     * @param split the split to unlock
--     */
--    void unlockSplit(ISplit split);
--
--
--    /**
--     * Sets the identification of the source this balancer works for.
--     *
--     * @param sourceIdentification identification of the source
--     */
--    void setSourceIdentification(String sourceIdentification);
--
--
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java
index c01c1519,c01c1519..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java
+++ /dev/null
@@@ -1,55 -1,55 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.balance;
--
--import java.util.List;
--import org.apache.rocketmq.streams.common.channel.split.ISplit;
--
--/**
-- * Result of a balance round: how many splits change, whether they are added or removed, and
-- * which splits are affected.
-- */
--public class SplitChanged {
--
--    /** Number of splits that change. */
--    protected int splitCount;
--
--    /** {@code true} when splits are added, {@code false} when they are removed. */
--    protected boolean isNewSplit;
--
--    /** The affected splits; stays {@code null} until {@link #setChangedSplits} is called. */
--    protected List<ISplit> changedSplits;
--
--    /**
--     * @param splitCount number of splits that change
--     * @param isNewSplit {@code true} for an addition, {@code false} for a removal
--     */
--    public SplitChanged(int splitCount, boolean isNewSplit) {
--        this.isNewSplit = isNewSplit;
--        this.splitCount = splitCount;
--    }
--
--    /** @return number of splits that change */
--    public int getSplitCount() {
--        return this.splitCount;
--    }
--
--    /** @param splitCount number of splits that change */
--    public void setSplitCount(int splitCount) {
--        this.splitCount = splitCount;
--    }
--
--    /** @return {@code true} for an addition, {@code false} for a removal */
--    public boolean isNewSplit() {
--        return this.isNewSplit;
--    }
--
--    /** @param newSplit {@code true} for an addition, {@code false} for a removal */
--    public void setNewSplit(boolean newSplit) {
--        this.isNewSplit = newSplit;
--    }
--
--    /** @return the affected splits */
--    public List<ISplit> getChangedSplits() {
--        return this.changedSplits;
--    }
--
--    /** @param changedSplits the affected splits */
--    public void setChangedSplits(List<ISplit> changedSplits) {
--        this.changedSplits = changedSplits;
--    }
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java
index dc504e5d,dc504e5d..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java
+++ /dev/null
@@@ -1,144 -1,144 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.balance.impl;
--
--import com.google.auto.service.AutoService;
--import java.util.ArrayList;
--import java.util.Arrays;
--import java.util.HashMap;
--import java.util.List;
--import java.util.Map;
--import org.apache.commons.logging.Log;
--import org.apache.commons.logging.LogFactory;
--import org.apache.rocketmq.streams.common.channel.split.ISplit;
--import org.apache.rocketmq.streams.common.model.ServiceName;
--import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
--import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
--import org.apache.rocketmq.streams.connectors.balance.AbstractBalance;
--import org.apache.rocketmq.streams.connectors.balance.ISourceBalance;
--import org.apache.rocketmq.streams.connectors.source.SourceInstance;
--import org.apache.rocketmq.streams.lease.LeaseComponent;
--import org.apache.rocketmq.streams.lease.model.LeaseInfo;
--import org.apache.rocketmq.streams.lease.service.ILeaseService;
--
--/**
-- * Balancer backed by the lease service. Source instances and splits are tracked as leases whose
-- * names start with {@code SOURCE_} or {@code SPLIT_} followed by the source identification.
-- */
--@AutoService(ISourceBalance.class)
--@ServiceName(LeaseBalanceImpl.DB_BALANCE_NAME)
--public class LeaseBalanceImpl extends AbstractBalance {
--
--    private static final Log logger = LogFactory.getLog(LeaseBalanceImpl.class);
--
--    public static final String DB_BALANCE_NAME = "db_balance";
--    private static final String REMOVE_SPLIT_LOCK_NAME = "lock_remove_split";
--    private static final String SOURCE_LOCK_PREFIX = "SOURCE_";
--    private static final String SPLIT_LOCK_PREFIX = "SPLIT_";
--    protected transient LeaseComponent leaseComponent = LeaseComponent.getInstance();
--    protected transient String sourceIdentification;
--
--    /** Value passed to the lease service as the lock time of every lock taken here. */
--    protected int lockTimeSecond = 5;
--
--    /**
--     * @param sourceIdentification identification of the source this balancer works for
--     */
--    public LeaseBalanceImpl(String sourceIdentification) {
--        this.sourceIdentification = sourceIdentification;
--    }
--
--    /**
--     * Creates a balancer without a source identification; set it through
--     * {@link #setSourceIdentification(String)}.
--     */
--    public LeaseBalanceImpl() {
--    }
--
--    /**
--     * Maps the split leases of this source back to the splits in {@code allSplits}.
--     *
--     * @param allSplits every split of the source
--     * @return the splits that currently have a lease; empty when the lease query returns
--     *         {@code null}. Leases that match no split in {@code allSplits} are left out.
--     */
--    @Override
--    protected List<ISplit> fetchWorkingSplits(List<ISplit> allSplits) {
--        List<LeaseInfo> leaseInfos = leaseComponent.getService().queryLockedInstanceByNamePrefix(SPLIT_LOCK_PREFIX + this.sourceIdentification, null);
--        logger.info(String.format("lease SPLIT_LOCK_PREFIX is %s, sourceIdentification is %s. ", SPLIT_LOCK_PREFIX, sourceIdentification));
--        if (leaseInfos == null) {
--            return new ArrayList<>();
--        }
--
--        Map<String, ISplit> allSplitMap = new HashMap<>();
--        for (ISplit split : allSplits) {
--            allSplitMap.put(split.getQueueId(), split);
--        }
--        List<ISplit> splits = new ArrayList<>();
--        for (LeaseInfo leaseInfo : leaseInfos) {
--            String leaseName = leaseInfo.getLeaseName();
--            // presumably the last segment of the lease name is the split's queue id; confirm in MapKeyUtil
--            String splitId = MapKeyUtil.getLast(leaseName);
--            ISplit split = allSplitMap.get(splitId);
--            if (split == null) {
--                // A lease without a matching split used to be added as null, which made
--                // AbstractBalance.getNoWorkingSplits fail with a NullPointerException.
--                logger.warn(String.format("ignore lease %s, no split with id %s in the current split list", leaseName, splitId));
--                continue;
--            }
--            splits.add(split);
--        }
--        logger.info(String.format("working split is %s", Arrays.toString(splits.toArray())));
--        return splits;
--    }
--
--    /**
--     * Builds one {@link SourceInstance} per source lease of this source.
--     *
--     * @return the instances holding a source lease; empty when the lease query returns
--     *         {@code null}
--     */
--    @Override
--    protected List<SourceInstance> fetchSourceInstances() {
--        List<LeaseInfo> leaseInfos = leaseComponent.getService().queryLockedInstanceByNamePrefix(SOURCE_LOCK_PREFIX + sourceIdentification, null);
--        if (leaseInfos == null) {
--            return new ArrayList<>();
--        }
--        List<SourceInstance> sourceInstances = new ArrayList<>();
--        for (LeaseInfo leaseInfo : leaseInfos) {
--            sourceInstances.add(new SourceInstance(leaseInfo.getLeaseName()));
--        }
--        return sourceInstances;
--    }
--
--    /**
--     * Takes or renews the source lease of this instance.
--     *
--     * @return whether the lock is held
--     */
--    @Override
--    protected boolean holdLockSourceInstance() {
--        return holdLock(SOURCE_LOCK_PREFIX + sourceIdentification, RuntimeUtil.getDipperInstanceId());
--    }
--
--    /**
--     * Releases the source lease of this instance.
--     */
--    @Override
--    protected void unlockSourceInstance() {
--        leaseComponent.getService().unlock(SOURCE_LOCK_PREFIX + sourceIdentification, RuntimeUtil.getDipperInstanceId());
--    }
--
--    /**
--     * Takes or renews the lease of a split, using the split's queue id as lock name.
--     *
--     * @param split the split to lock
--     * @return whether the lock is held
--     */
--    @Override
--    public boolean holdLockSplit(ISplit split) {
--        return holdLock(SPLIT_LOCK_PREFIX + this.sourceIdentification, split.getQueueId());
--    }
--
--    /**
--     * Releases the lease of a split.
--     *
--     * @param split the split to unlock
--     */
--    @Override
--    public void unlockSplit(ISplit split) {
--        leaseComponent.getService().unlock(SPLIT_LOCK_PREFIX + this.sourceIdentification, split.getQueueId());
--    }
--
--    /**
--     * Takes the {@code lock_remove_split} lock of this source.
--     *
--     * @return whether the lock is held
--     */
--    @Override
--    public boolean getRemoveSplitLock() {
--        return holdLock(this.sourceIdentification, REMOVE_SPLIT_LOCK_NAME);
--    }
--
--    /**
--     * Releases the {@code lock_remove_split} lock of this source.
--     */
--    @Override
--    public void unLockRemoveSplitLock() {
--        leaseComponent.getService().unlock(this.sourceIdentification, REMOVE_SPLIT_LOCK_NAME);
--    }
--
--    /**
--     * @return identification of the source this balancer works for
--     */
--    public String getSourceIdentification() {
--        return sourceIdentification;
--    }
--
--    /**
--     * @param sourceIdentification identification of the source this balancer works for
--     */
--    @Override
--    public void setSourceIdentification(String sourceIdentification) {
--        this.sourceIdentification = sourceIdentification;
--    }
--
--    /**
--     * Asks the lease service to hold a lock, passing {@link #lockTimeSecond} as lock time.
--     *
--     * @param name     first argument handed to the lease service
--     * @param lockName second argument handed to the lease service
--     * @return the value reported by the lease service
--     */
--    protected boolean holdLock(String name, String lockName) {
--        ILeaseService leaseService = leaseComponent.getService();
--        return leaseService.holdLock(name, lockName, lockTimeSecond);
--    }
--
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/PullMessage.java
index 9bf34803,9bf34803..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/PullMessage.java
+++ /dev/null
@@@ -1,50 -1,50 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.model;
--
--import org.apache.rocketmq.streams.common.context.MessageOffset;
--
--/**
-- * Pairs a pulled message with the {@link MessageOffset} that belongs to it.
-- *
-- * @param <T> type of the message payload
-- */
--public class PullMessage<T> {
--
--    /** The pulled payload. */
--    protected T message;
--
--    /** Offset information of the payload. */
--    protected MessageOffset messageOffset;
--
--    /** @return the pulled payload */
--    public T getMessage() {
--        return this.message;
--    }
--
--    /** @param message the pulled payload */
--    public void setMessage(T message) {
--        this.message = message;
--    }
--
--    /** @return offset information of the payload */
--    public MessageOffset getMessageOffset() {
--        return this.messageOffset;
--    }
--
--    /** @param messageOffset offset information of the payload */
--    public void setMessageOffset(MessageOffset messageOffset) {
--        this.messageOffset = messageOffset;
--    }
--
--    /**
--     * Get the offset string. According to the original author's note, the main offset and the
--     * sub offset are joined with a dot.
--     *
--     * @return the value of {@code messageOffset.getOffsetStr()}
--     */
--    public String getOffsetStr() {
--        MessageOffset offset = this.messageOffset;
--        return offset.getOffsetStr();
--    }
--
--    /** @return the value of {@code messageOffset.getMainOffset()} */
--    public String getMainOffset() {
--        MessageOffset offset = this.messageOffset;
--        return offset.getMainOffset();
--    }
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java
index a4889b5f,a4889b5f..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java
+++ /dev/null
@@@ -1,120 -1,120 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.model;
--
--import java.util.Date;
--import java.util.List;
--import org.apache.rocketmq.streams.common.model.Entity;
--import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
--
--/**
-- * Row of the {@code reader_status} table, together with static helpers that query and update
-- * that table through {@code ORMUtil}.
-- */
--public class ReaderStatus extends Entity {
--
--    /**
--     * Query a single reader status by source name and reader name.
--     */
--    static final String queryReaderStatusByUK = "select * from reader_status where source_name = '%s' and reader_name = '%s' and is_finished = 1";
--
--    /** Query every finished reader status of a source. */
--    static final String queryReaderStatusList = "select * from reader_status where source_name = '%s' and is_finished = 1";
--
--    /** Set {@code is_finished} of one reader to -1 and refresh its modification time. */
--    static final String clearReaderStatus = "update reader_status set gmt_modified = now(), is_finished = -1 where source_name = '%s' and reader_name = '%s'";
--
--    String sourceName;
--
--    String readerName;
--
--    int isFinished;
--
--    int totalReader;
--
--    public String getReaderName() {
--        return this.readerName;
--    }
--
--    public void setReaderName(String readerName) {
--        this.readerName = readerName;
--    }
--
--    public int getIsFinished() {
--        return this.isFinished;
--    }
--
--    public void setIsFinished(int isFinished) {
--        this.isFinished = isFinished;
--    }
--
--    public int getTotalReader() {
--        return this.totalReader;
--    }
--
--    public void setTotalReader(int totalReader) {
--        this.totalReader = totalReader;
--    }
--
--    public String getSourceName() {
--        return this.sourceName;
--    }
--
--    public void setSourceName(String sourceName) {
--        this.sourceName = sourceName;
--    }
--
--    /**
--     * @return all fields of this row, including the inherited id and timestamps
--     */
--    @Override
--    public String toString() {
--        StringBuilder text = new StringBuilder("ReaderStatus{");
--        text.append("id=").append(id);
--        text.append(", gmtCreate=").append(gmtCreate);
--        text.append(", gmtModified=").append(gmtModified);
--        text.append(", sourceName='").append(sourceName).append('\'');
--        text.append(", readerName='").append(readerName).append('\'');
--        text.append(", isFinished=").append(isFinished);
--        text.append(", totalReader=").append(totalReader);
--        text.append('}');
--        return text.toString();
--    }
--
--    // NOTE(review): the three helpers below format their arguments straight into the SQL text;
--    // confirm the names can never contain a quote, or use whatever binding ORMUtil offers.
--
--    /**
--     * Loads the finished status row of one reader.
--     *
--     * @param sourceName name of the source
--     * @param readerName name of the reader
--     * @return whatever {@code ORMUtil.queryForObject} yields for the query
--     */
--    public static ReaderStatus queryReaderStatusByUK(String sourceName, String readerName) {
--        return ORMUtil.queryForObject(String.format(queryReaderStatusByUK, sourceName, readerName), null, ReaderStatus.class);
--    }
--
--    /**
--     * Loads the finished status rows of every reader of a source.
--     *
--     * @param sourceName name of the source
--     * @return whatever {@code ORMUtil.queryForList} yields for the query
--     */
--    public static List<ReaderStatus> queryReaderStatusListBySourceName(String sourceName) {
--        return ORMUtil.queryForList(String.format(queryReaderStatusList, sourceName), null, ReaderStatus.class);
--    }
--
--    /**
--     * Marks the status row of one reader with {@code is_finished = -1}.
--     *
--     * @param sourceName name of the source
--     * @param readerName name of the reader
--     */
--    public static void clearReaderStatus(String sourceName, String readerName) {
--        ORMUtil.executeSQL(String.format(clearReaderStatus, sourceName, readerName), null);
--    }
--
--    /**
--     * Builds a new, unsaved status row whose creation and modification times are set to now.
--     *
--     * @param sourceName  name of the source
--     * @param readerName  name of the reader
--     * @param isFinished  value for the {@code isFinished} field
--     * @param totalReader value for the {@code totalReader} field
--     * @return the populated instance
--     */
--    public static ReaderStatus create(String sourceName, String readerName, int isFinished, int totalReader) {
--        ReaderStatus created = new ReaderStatus();
--        created.setSourceName(sourceName);
--        created.setReaderName(readerName);
--        created.setIsFinished(isFinished);
--        created.setTotalReader(totalReader);
--        created.setGmtCreate(new Date());
--        created.setGmtModified(new Date());
--        return created;
--    }
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java
index 268e891e,268e891e..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java
+++ /dev/null
@@@ -1,269 -1,269 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.reader;
--
--import com.alibaba.fastjson.JSON;
--import com.alibaba.fastjson.JSONObject;
--import java.io.Serializable;
--import java.util.ArrayList;
--import java.util.List;
--import java.util.Map;
--import org.apache.commons.logging.Log;
--import org.apache.commons.logging.LogFactory;
--import org.apache.rocketmq.streams.common.channel.source.ISource;
--import org.apache.rocketmq.streams.common.channel.split.ISplit;
--import org.apache.rocketmq.streams.common.component.AbstractComponent;
--import org.apache.rocketmq.streams.common.context.MessageOffset;
--import org.apache.rocketmq.streams.common.utils.ThreadUtil;
--import org.apache.rocketmq.streams.connectors.IBoundedSource;
--import org.apache.rocketmq.streams.connectors.IBoundedSourceReader;
--import org.apache.rocketmq.streams.connectors.model.PullMessage;
--import org.apache.rocketmq.streams.connectors.model.ReaderStatus;
--import org.apache.rocketmq.streams.connectors.source.CycleDynamicMultipleDBScanSource;
--import org.apache.rocketmq.streams.db.driver.DriverBuilder;
--import org.apache.rocketmq.streams.db.driver.JDBCDriver;
--import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
--
--/**
-- * @description
-- */
--public class DBScanReader implements ISplitReader, IBoundedSourceReader, Serializable {
--
--    private static final long serialVersionUID = 8172403250050893288L;
--    private static final Log logger = LogFactory.getLog(DBScanReader.class);
--    static final String sqlTemplate = "select * from %s where id >= %d and id < %d";
--
--    //是否完成了source的call back调用
--    transient volatile boolean isFinishedCall = false;
--    ISource iSource;
--    String url;
--    String userName;
--    String password;
--    String tableName;
--    int batchSize;
--    long offset;
--    long offsetStart;
--    long offsetEnd;
--    long maxOffset;
--    long minOffset;
--    ISplit iSplit;
--    transient List<PullMessage> pullMessages;
--    volatile boolean interrupt = false;
--    volatile boolean isClosed = false;
--
--    public String getUrl() {
--        return url;
--    }
--
--    public void setUrl(String url) {
--        this.url = url;
--    }
--
--    public String getUserName() {
--        return userName;
--    }
--
--    public void setUserName(String userName) {
--        this.userName = userName;
--    }
--
--    public String getPassword() {
--        return password;
--    }
--
--    public void setPassword(String password) {
--        this.password = password;
--    }
--
--    public String getTableName() {
--        return tableName;
--    }
--
--    public void setTableName(String tableName) {
--        this.tableName = tableName;
--    }
--
--    public int getBatchSize() {
--        return batchSize;
--    }
--
--    public void setBatchSize(int batchSize) {
--        this.batchSize = batchSize;
--    }
--
--    public ISplit getISplit() {
--        return iSplit;
--    }
--
--    public void setISplit(ISplit iSplit) {
--        this.iSplit = iSplit;
--    }
--
--    public DBScanReader() {
--
--    }
--
--    transient ThreadLocal<JDBCDriver> threadLocal = new ThreadLocal<JDBCDriver>() {
--
--        @Override
--        public JDBCDriver initialValue() {
--            logger.info(String.format("%s initial jdbcDriver. ", Thread.currentThread().getName()));
--            return DriverBuilder.createDriver(AbstractComponent.DEFAULT_JDBC_DRIVER, url, userName, password);
--        }
--
--    };
--
--    @Override
--    public void open(ISplit split) {
--        this.iSplit = split;
--        JDBCDriver jdbcDriver = threadLocal.get();
--        Map<String, Object> range = jdbcDriver.queryOneRow("select min(id) as min_id, max(id) as max_id from " + tableName);
--        minOffset = Long.parseLong(String.valueOf(range.get("min_id")));
--        maxOffset = Long.parseLong(String.valueOf(range.get("max_id")));
--        offsetStart = minOffset;
--        offset = minOffset;
--        logger.info(String.format("table %s min id [ %d ],  max id [ %d ]", tableName, minOffset, maxOffset));
--        pullMessages = new ArrayList<>();
--    }
--
--    @Override
--    public boolean next() {
--        if (interrupt) {
--            return false;
--        }
--        if (isFinished()) {
--            finish();
--            ThreadUtil.sleep(10 * 1000);
--            return false;
--        }
--        JDBCDriver jdbcDriver = threadLocal.get();
--        offsetEnd = offsetStart + batchSize;
--        String batchQuery = String.format(sqlTemplate, tableName, offsetStart, offsetEnd);
--        logger.debug(String.format("execute sql : %s", batchQuery));
--        List<Map<String, Object>> resultData = jdbcDriver.queryForList(batchQuery);
--        offsetStart = offsetEnd;
--        pullMessages.clear();
--        for (Map<String, Object> r : resultData) {
--            PullMessage msg = new PullMessage();
--            JSONObject data = JSONObject.parseObject(JSON.toJSONString(r));
--            msg.setMessage(data);
--            offset = offset > Long.parseLong(data.getString("id")) ? offset : Long.parseLong(data.getString("id"));
--            msg.setMessageOffset(new MessageOffset(String.valueOf(offset), true));
--            pullMessages.add(msg);
--        }
--        return offsetStart - batchSize <= maxOffset;
--    }
--
--    @Override
--    public List<PullMessage> getMessage() {
--//        logger.info(String.format("output messages %d", pullMessages.size()));
--        return pullMessages;
--    }
--
--    @Override
--    public SplitCloseFuture close() {
--//        interrupt = true;
--        isClosed = true;
--        threadLocal.remove();
--        pullMessages = null;
--        return new SplitCloseFuture(this, iSplit);
--    }
--
--    @Override
--    public void seek(String cursor) {
--        if (cursor == null || cursor.trim().equals("")) {
--            cursor = "0";
--        }
--        offset = Long.parseLong(cursor);
--        if (offset < minOffset) {
--            offset = minOffset;
--        }
--        offsetStart = offset;
--        logger.info(String.format("split %s seek %d.", iSplit.getQueueId(), offset));
--    }
--
--    @Override
--    public String getProgress() {
--        return String.valueOf(offset);
--    }
--
--    @Override
--    public long getDelay() {
--        return maxOffset - offset;
--    }
--
--    @Override
--    public long getFetchedDelay() {
--        return 0;
--    }
--
--    @Override
--    public boolean isClose() {
--        return isClosed;
--    }
--
--    @Override
--    public ISplit getSplit() {
--        return iSplit;
--    }
--
--    @Override
--    public boolean isInterrupt() {
--        return interrupt;
--    }
--
--    @Override
--    public boolean interrupt() {
--        interrupt = true;
--        return true;
--    }
--
--    @Override
--    public boolean isFinished() {
--        return offsetStart > maxOffset;
--    }
--
--    @Override
--    public void finish() {
--        if (isFinishedCall) {
--            return;
--        }
--        pullMessages = null;
--        updateReaderStatus();
--        IBoundedSource tmp = (IBoundedSource) iSource;
--        tmp.boundedFinishedCallBack(this.iSplit);
--        isFinishedCall = true;
--    }
--
--    public ISource getISource() {
--        return iSource;
--    }
--
--    public void setISource(ISource iSource) {
--        this.iSource = iSource;
--    }
--
--    private final void updateReaderStatus() {
--        String sourceName = CycleDynamicMultipleDBScanSource.createKey(this.getISource());
--        int finish = Integer.valueOf(1);
--        int total = ((CycleDynamicMultipleDBScanSource) iSource).getTotalReader();
--        ReaderStatus readerStatus = ReaderStatus.create(sourceName, iSplit.getQueueId(), finish, total);
--        logger.info(String.format("create reader status %s.", readerStatus));
--        ORMUtil.batchReplaceInto(readerStatus);
--    }
--
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java
index 6b377cff,6b377cff..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java
+++ /dev/null
@@@ -1,96 -1,96 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.reader;
--
--import java.io.IOException;
--import java.io.Serializable;
--import java.util.List;
--import org.apache.rocketmq.streams.common.channel.split.ISplit;
--import org.apache.rocketmq.streams.connectors.model.PullMessage;
--
--public interface ISplitReader extends Serializable {
--
--    /**
--     * Open.
--     *
--     * @param split the split
--     * @throws IOException the io exception
--     */
--    void open(ISplit split);
--
--    /**
--     * Next boolean.
--     *
--     * @return the boolean
--     * @throws IOException          the io exception
--     * @throws InterruptedException the interrupted exception
--     */
--    boolean next();
--
--    /**
--     * Gets message.
--     *
--     * @return the message
--     */
--    List<PullMessage> getMessage();
--
--    /**
--     * Close.
--     *
--     * @throws IOException the io exception
--     */
--    SplitCloseFuture close();
--
--    /**
--     * Seek.
--     *
--     * @param cursor the cursor
--     * @throws IOException the io exception
--     */
--    void seek(String cursor);
--
--    /**
--     * Gets progress.
--     *
--     * @return the progress
--     * @throws IOException the io exception
--     */
--    String getProgress();
--
--    /**
--     * Get message delay (millseconds)
--     *
--     * @return delay
--     */
--    long getDelay();
--
--    /**
--     * Get message delay (millseconds) from being fetched
--     *
--     * @return delay
--     */
--    long getFetchedDelay();
--
--    boolean isClose();
--
--    ISplit getSplit();
--
--    boolean isInterrupt();
--
--    boolean interrupt();
--
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java
index b28748b8,b28748b8..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java
+++ /dev/null
@@@ -1,83 -1,83 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.reader;
--
--import java.util.concurrent.ExecutionException;
--import java.util.concurrent.Future;
--import java.util.concurrent.TimeUnit;
--import java.util.concurrent.TimeoutException;
--import org.apache.rocketmq.streams.common.channel.split.ISplit;
--
--public class SplitCloseFuture implements Future<Boolean> {
--
--    protected ISplitReader reader;
--    protected ISplit split;
--
--    public SplitCloseFuture(ISplitReader reader, ISplit split) {
--        this.reader = reader;
--        this.split = split;
--    }
--
--    @Override
--    public boolean cancel(boolean mayInterruptIfRunning) {
--        return false;
--    }
--
--    @Override
--    public boolean isCancelled() {
--        return false;
--    }
--
--    @Override
--    public boolean isDone() {
--        return reader.isClose();
--    }
--
--    @Override
--    public Boolean get() throws InterruptedException, ExecutionException {
--        synchronized (reader) {
--            reader.wait();
--        }
--        return reader.isClose();
--    }
--
--    @Override
--    public Boolean get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
--        synchronized (reader) {
--            long time = timeout;
--            if (unit == TimeUnit.SECONDS) {
--                time = time * 1000;
--            } else if (unit == TimeUnit.MINUTES) {
--                time = time * 1000 * 60;
--            } else if (unit == TimeUnit.HOURS) {
--                time = time * 1000 * 60 * 60;
--            } else {
--                throw new RuntimeException("can not support this timeout, expect less hour " + timeout + " the unit is " + unit);
--            }
--            reader.wait(time);
--        }
--        return reader.isClose();
--    }
--
--    public ISplitReader getReader() {
--        return reader;
--    }
--
--    public ISplit getSplit() {
--        return split;
--    }
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java
index fb09a6bd,9aedc95a..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java
+++ /dev/null
@@@ -1,272 -1,312 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.source;
--
--import com.alibaba.fastjson.JSON;
--import com.alibaba.fastjson.JSONObject;
--import java.util.ArrayList;
 -import java.util.Collection;
--import java.util.HashMap;
--import java.util.HashSet;
 -import java.util.Iterator;
--import java.util.List;
--import java.util.Map;
--import java.util.Set;
--import java.util.concurrent.ExecutionException;
 -import java.util.concurrent.ExecutorService;
 -import java.util.concurrent.Executors;
 -import java.util.concurrent.LinkedBlockingQueue;
--import java.util.concurrent.ScheduledExecutorService;
--import java.util.concurrent.ScheduledThreadPoolExecutor;
 -import java.util.concurrent.ThreadPoolExecutor;
--import java.util.concurrent.TimeUnit;
--import org.apache.commons.lang3.concurrent.BasicThreadFactory;
--import org.apache.commons.logging.Log;
--import org.apache.commons.logging.LogFactory;
--import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
--import org.apache.rocketmq.streams.common.channel.split.ISplit;
--import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
--import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
--import org.apache.rocketmq.streams.common.context.Message;
 -import org.apache.rocketmq.streams.common.threadpool.ThreadPoolFactory;
--import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
--import org.apache.rocketmq.streams.connectors.balance.ISourceBalance;
--import org.apache.rocketmq.streams.connectors.balance.SplitChanged;
--import org.apache.rocketmq.streams.connectors.balance.impl.LeaseBalanceImpl;
--import org.apache.rocketmq.streams.connectors.model.PullMessage;
--import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
--import org.apache.rocketmq.streams.connectors.reader.SplitCloseFuture;
--import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent;
--
--public abstract class AbstractPullSource extends AbstractSource implements IPullSource<AbstractSource> {
--
--    private static final Log logger = LogFactory.getLog(AbstractPullSource.class);
--
--    protected transient ISourceBalance balance;// balance interface
--    protected transient ScheduledExecutorService balanceExecutor;//schdeule balance
--    protected transient Map<String, ISplitReader> splitReaders = new HashMap<>();//owner split readers
--    protected transient Map<String, ISplit> ownerSplits = new HashMap<>();//working splits by the source instance
--
--    //可以有多种实现,通过名字选择不同的实现
--    protected String balanceName = LeaseBalanceImpl.DB_BALANCE_NAME;
--    //balance schedule time
--    protected int balanceTimeSecond = 10;
--    protected long pullIntervalMs;
-     transient CheckPointManager checkPointManager = new CheckPointManager();
- 
 -    protected transient CheckPointManager checkPointManager = new CheckPointManager();
 -    protected transient boolean shutDown=false;
--    @Override
--    protected boolean startSource() {
--        ServiceLoaderComponent serviceLoaderComponent = ServiceLoaderComponent.getInstance(ISourceBalance.class);
--        balance = (ISourceBalance) serviceLoaderComponent.getService().loadService(balanceName);
--        balance.setSourceIdentification(MapKeyUtil.createKey(getNameSpace(), getConfigureName()));
--        balanceExecutor = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("balance-task-%d").daemon(true).build());
--        List<ISplit> allSplits = fetchAllSplits();
--        SplitChanged splitChanged = balance.doBalance(allSplits, new ArrayList(ownerSplits.values()));
--        doSplitChanged(splitChanged);
--        balanceExecutor.scheduleWithFixedDelay(new Runnable() {
--            @Override
--            public void run() {
--                logger.info("balance running..... current splits is " + ownerSplits);
--                List<ISplit> allSplits = fetchAllSplits();
--                SplitChanged splitChanged = balance.doBalance(allSplits, new ArrayList(ownerSplits.values()));
--                doSplitChanged(splitChanged);
--            }
--        }, balanceTimeSecond, balanceTimeSecond, TimeUnit.SECONDS);
 -
 -        startWorks();
--        return true;
 -    }
 -
 -    private void startWorks() {
 -        ExecutorService workThreads= ThreadPoolFactory.createThreadPool(maxThread);
 -        long start=System.currentTimeMillis();
 -        while (!shutDown) {
 -            Iterator<Map.Entry<String, ISplitReader>> it = splitReaders.entrySet().iterator();
 -            while (it.hasNext()) {
 -                Map.Entry<String, ISplitReader> entry=it.next();
 -                String splitId=entry.getKey();
 -                ISplit split=ownerSplits.get(splitId);
 -                ISplitReader reader=entry.getValue();
 -                ReaderRunner runner=new ReaderRunner(split,reader);
 -                workThreads.execute(runner);
 -            }
 -            try {
 -                long sleepTime=this.pullIntervalMs-(System.currentTimeMillis()-start);
 -                if(sleepTime>0){
 -                    Thread.sleep(sleepTime);
 -                }
 -            } catch (InterruptedException e) {
 -                e.printStackTrace();
 -            }
 -        }
--    }
--
--    @Override
--    public Map<String, ISplit> getAllSplitMap() {
--        List<ISplit> splits = fetchAllSplits();
--        if (splits == null) {
--            return new HashMap<>();
--        }
--        Map<String, ISplit> splitMap = new HashMap<>();
--        for (ISplit split : splits) {
--            splitMap.put(split.getQueueId(), split);
--        }
--        return splitMap;
--    }
--
--    protected void doSplitChanged(SplitChanged splitChanged) {
--        if (splitChanged == null) {
--            return;
--        }
--        if (splitChanged.getSplitCount() == 0) {
--            return;
--        }
--        if (splitChanged.isNewSplit()) {
--            doSplitAddition(splitChanged.getChangedSplits());
--        } else {
--            doSplitRelease(splitChanged.getChangedSplits());
--        }
--    }
--
--    protected void doSplitAddition(List<ISplit> changedSplits) {
--        if (changedSplits == null) {
--            return;
--        }
--        Set<String> splitIds = new HashSet<>();
--        for (ISplit split : changedSplits) {
--            splitIds.add(split.getQueueId());
--        }
--        addNewSplit(splitIds);
--        for (ISplit split : changedSplits) {
--            ISplitReader reader = createSplitReader(split);
--            reader.open(split);
--            reader.seek(loadSplitOffset(split));
--            splitReaders.put(split.getQueueId(), reader);
--            this.ownerSplits.put(split.getQueueId(), split);
-             logger.info("start next");
-             Thread thread = new Thread(new Runnable() {
-                 long mLastCheckTime = System.currentTimeMillis();
- 
-                 @Override
-                 public void run() {
-                     logger.info("start running");
-                     while (reader.isInterrupt() == false) {
-                         if (reader.next()) {
-                             List<PullMessage> messages = reader.getMessage();
-                             if (messages != null) {
-                                 for (PullMessage pullMessage : messages) {
-                                     String queueId = split.getQueueId();
-                                     String offset = pullMessage.getOffsetStr();
-                                     JSONObject msg = createJson(pullMessage.getMessage());
-                                     Message message = createMessage(msg, queueId, offset, false);
-                                     message.getHeader().setOffsetIsLong(pullMessage.getMessageOffset().isLongOfMainOffset());
-                                     executeMessage(message);
-                                 }
-                             }
-                         }
-                         long curTime = System.currentTimeMillis();
-                         if (curTime - mLastCheckTime > getCheckpointTime()) {
-                             sendCheckpoint(reader.getSplit().getQueueId());
-                             mLastCheckTime = curTime;
-                         }
-                         try {
-                             Thread.sleep(pullIntervalMs);
-                         } catch (InterruptedException e) {
-                             e.printStackTrace();
-                         }
- 
-                     }
-                     try {
-                         Thread.sleep(10);
-                     } catch (InterruptedException e) {
-                         e.printStackTrace();
-                     }
-                     Set<String> removeSplits = new HashSet<>();
-                     removeSplits.add(reader.getSplit().getQueueId());
-                     removeSplit(removeSplits);
-                     balance.unlockSplit(split);
-                     reader.close();
-                     synchronized (reader) {
-                         reader.notifyAll();
-                     }
- 
-                 }
-             });
-             thread.setName("reader-task-" + reader.getSplit().getQueueId());
-             thread.start();
 -//            logger.info("start next");
 -//            Thread thread = new Thread(new Runnable() {
 -//
 -//            thread.setName("reader-task-" + reader.getSplit().getQueueId());
 -//            thread.start();
--        }
--
--    }
--
--    @Override
--    public String loadSplitOffset(ISplit split) {
--        String offset = null;
--        CheckPoint<String> checkPoint = checkPointManager.recover(this, split);
--        if (checkPoint != null) {
--            offset = JSON.parseObject(checkPoint.getData()).getString("offset");
--        }
--        return offset;
--    }
--
--    protected abstract ISplitReader createSplitReader(ISplit split);
--
--    protected void doSplitRelease(List<ISplit> changedSplits) {
--        boolean success = balance.getRemoveSplitLock();
--        if (!success) {
--            return;
--        }
--        try {
--            List<SplitCloseFuture> closeFutures = new ArrayList<>();
--            for (ISplit split : changedSplits) {
--                ISplitReader reader = this.splitReaders.get(split.getQueueId());
--                if (reader == null) {
--                    continue;
--                }
--                SplitCloseFuture future = reader.close();
--                closeFutures.add(future);
--            }
--            for (SplitCloseFuture future : closeFutures) {
--                try {
-                     future.get();
 -                    if(!future.isDone()){
 -                        future.get();
 -                    }
--                    this.splitReaders.remove(future.getSplit().getQueueId());
--                    this.ownerSplits.remove(future.getSplit().getQueueId());
--                } catch (InterruptedException e) {
--                    e.printStackTrace();
--                } catch (ExecutionException e) {
--                    e.printStackTrace();
--                }
--            }
--
--        } finally {
--            balance.unLockRemoveSplitLock();
 -        }
 -
 -    }
 -
 -
 -    protected class ReaderRunner implements Runnable{
 -        long mLastCheckTime = System.currentTimeMillis();
 -        protected ISplit split;
 -        protected ISplitReader reader;
 -
 -        public ReaderRunner(ISplit split,ISplitReader reader){
 -            this.split=split;
 -            this.reader=reader;
 -        }
 -
 -        @Override
 -        public void run() {
 -            logger.info("start running");
 -            if (reader.isInterrupt() == false) {
 -                if (reader.next()) {
 -                    List<PullMessage> messages = reader.getMessage();
 -                    if (messages != null) {
 -                        for (PullMessage pullMessage : messages) {
 -                            String queueId = split.getQueueId();
 -                            String offset = pullMessage.getOffsetStr();
 -                            JSONObject msg = createJson(pullMessage.getMessage());
 -                            Message message = createMessage(msg, queueId, offset, false);
 -                            message.getHeader().setOffsetIsLong(pullMessage.getMessageOffset().isLongOfMainOffset());
 -                            executeMessage(message);
 -                        }
 -                    }
 -                    reader.notifyAll();
 -                }
 -                long curTime = System.currentTimeMillis();
 -                if (curTime - mLastCheckTime > getCheckpointTime()) {
 -                    sendCheckpoint(reader.getSplit().getQueueId());
 -                    mLastCheckTime = curTime;
 -                }
 -
 -
 -            }else {
 -                Set<String> removeSplits = new HashSet<>();
 -                removeSplits.add(reader.getSplit().getQueueId());
 -                removeSplit(removeSplits);
 -                balance.unlockSplit(split);
 -                reader.close();
 -                synchronized (reader) {
 -                    reader.notifyAll();
 -                }
 -            }
 -
--        }
--
--    }
--
--    @Override
--    public boolean supportNewSplitFind() {
--        return true;
--    }
--
--    @Override
--    public boolean supportRemoveSplitFind() {
--        return true;
--    }
--
--    @Override
--    public boolean supportOffsetRest() {
--        return true;
--    }
--
--    @Override
--    public Long getPullIntervalMs() {
--        return pullIntervalMs;
--    }
--
--    public String getBalanceName() {
--        return balanceName;
--    }
--
--    public void setBalanceName(String balanceName) {
--        this.balanceName = balanceName;
--    }
--
--    public int getBalanceTimeSecond() {
--        return balanceTimeSecond;
--    }
--
--    public void setBalanceTimeSecond(int balanceTimeSecond) {
--        this.balanceTimeSecond = balanceTimeSecond;
--    }
--
--    public void setPullIntervalMs(long pullIntervalMs) {
--        this.pullIntervalMs = pullIntervalMs;
--    }
--
--    @Override
--    public List<ISplit> ownerSplits() {
--        return new ArrayList(ownerSplits.values());
--    }
--
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java
index 561b48f2,561b48f2..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java
+++ /dev/null
@@@ -1,213 -1,213 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.source;
--
--import com.alibaba.fastjson.JSONObject;
--import java.io.Serializable;
--import java.util.Arrays;
--import java.util.Iterator;
--import java.util.List;
--import java.util.Map;
--import java.util.concurrent.ConcurrentHashMap;
--import java.util.concurrent.atomic.AtomicInteger;
--import org.apache.commons.logging.Log;
--import org.apache.commons.logging.LogFactory;
--import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
--import org.apache.rocketmq.streams.common.channel.source.ISource;
--import org.apache.rocketmq.streams.common.channel.source.systemmsg.ChangeTableNameMessage;
--import org.apache.rocketmq.streams.common.channel.split.ISplit;
--import org.apache.rocketmq.streams.common.context.Message;
--import org.apache.rocketmq.streams.common.metadata.MetaDataUtils;
--import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
--import org.apache.rocketmq.streams.common.utils.ThreadUtil;
--import org.apache.rocketmq.streams.connectors.IBoundedSource;
--import org.apache.rocketmq.streams.connectors.model.ReaderStatus;
--import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
--import org.apache.rocketmq.streams.connectors.source.filter.CycleSchedule;
--import org.apache.rocketmq.streams.connectors.source.filter.CycleScheduleFilter;
--import org.apache.rocketmq.streams.db.CycleSplit;
--
--/**
-- * @description
-- */
--public class CycleDynamicMultipleDBScanSource extends DynamicMultipleDBScanSource implements IBoundedSource, Serializable {
--
--    private static final long serialVersionUID = 6840988298037061128L;
--    private static final Log logger = LogFactory.getLog(CycleDynamicMultipleDBScanSource.class);
--
--    Map<String, Boolean> initReaderMap = new ConcurrentHashMap<>();
--    CycleSchedule.Cycle cycle;
--    transient AtomicInteger size = new AtomicInteger(0);
--
--    public CycleDynamicMultipleDBScanSource() {
--        super();
--    }
--
--    public CycleDynamicMultipleDBScanSource(CycleSchedule.Cycle cycle) {
--        super();
--        this.cycle = cycle;
--    }
--
--    public AtomicInteger getSize() {
--        return size;
--    }
--
--    public void setSize(AtomicInteger size) {
--        this.size = size;
--    }
--
--    /**
--     * @return
--     */
--    //todo
--    @Override
--    public synchronized List<ISplit> fetchAllSplits() {
--
--        if (this.filter == null) {
--            filter = new CycleScheduleFilter(cycle.getAllPattern());
--        }
--
--        //如果还是当前周期, 已经完成全部分区的加载, 则不在加载
--        if (size.get() == cycle.getCycleCount()) {
--            return splits;
--        }
--        String sourceName = createKey(this);
--        List<String> tableNames = MetaDataUtils.listTableNameByPattern(url, userName, password, logicTableName + "%");
--
--        logger.info(String.format("load all logic table : %s", Arrays.toString(tableNames.toArray())));
--        Iterator<String> it = tableNames.iterator();
--        while (it.hasNext()) {
--            String s = it.next();
--            String suffix = s.replace(logicTableName + "_", "");
--            if (filter.filter(sourceName, logicTableName, suffix)) {
--                logger.info(String.format("filter add %s", s));
--                CycleSplit split = new CycleSplit();
--                split.setLogicTableName(logicTableName);
--                split.setSuffix(suffix);
--                split.setCyclePeriod(cycle.getCycleDateStr());
--                String splitId = split.getQueueId();
--                if (initReaderMap.get(splitId) == null) {
--                    initReaderMap.put(splitId, false);
--                    splits.add(split);
--                    size.incrementAndGet();
--                }
--            } else {
--                logger.info(String.format("filter remove %s", s));
--                it.remove();
--            }
--        }
--
--        this.tableNames = tableNames;
--        return splits;
--    }
--
--    public Map<String, Boolean> getInitReaderMap() {
--        return initReaderMap;
--    }
--
--    public void setInitReaderMap(Map<String, Boolean> initReaderMap) {
--        this.initReaderMap = initReaderMap;
--    }
--
--    @Override
--    public void finish() {
--        super.finish();
--        for (Map.Entry<String, Boolean> entry : initReaderMap.entrySet()) {
--            String key = entry.getKey();
--            Boolean value = entry.getValue();
--            if (value == false) {
--                logger.error(String.format("split[%s] reader is not finish, exit with error. ", key));
--            }
--        }
--        this.initReaderMap.clear();
--        this.initReaderMap = null;
--        splits.clear();
--        splits = null;
--    }
--
--    @Override
--    public boolean isFinished() {
--        List<ReaderStatus> readerStatuses = ReaderStatus.queryReaderStatusListBySourceName(createKey(this));
--        if (readerStatuses == null) {
--            return false;
--        }
--        return readerStatuses.size() == size.get();
--    }
--
--    @Override
--    protected ISplitReader createSplitReader(ISplit iSplit) {
--        return super.createSplitReader(iSplit);
--    }
--
--    private void sendChangeTableNameMessage() {
--        logger.info(String.format("start send change table name message."));
--        ChangeTableNameMessage changeTableNameMessage = new ChangeTableNameMessage();
--        changeTableNameMessage.setScheduleCycle(cycle.getCycleDateStr());
--        Message message = createMessage(new JSONObject(), null, null, false);
--        message.setSystemMessage(changeTableNameMessage);
--        message.getHeader().setSystemMessage(true);
--        executeMessage(message);
--        logger.info(String.format("finish send change table name message."));
--    }
--
--    @Override
--    public synchronized void boundedFinishedCallBack(ISplit iSplit) {
--        this.initReaderMap.put(iSplit.getQueueId(), true);
--        logger.info(String.format("current map is %s, key is %s. ", initReaderMap, iSplit.getQueueId()));
--        if (statusCheckerStart.compareAndSet(false, true)) {
--            Thread thread = new Thread(new Runnable() {
--                @Override
--                public void run() {
--                    while (!isFinished()) {
--                        ThreadUtil.sleep(3 * 1000);
--                    }
--                    logger.info(String.format("source will be closed."));
--                    sendChangeTableNameMessage(); //下发修改name的消息
--                    ThreadUtil.sleep(1 * 1000);
--                    finish();
--                }
--
--            });
--            thread.setName(createKey(this) + "_callback");
--            thread.start();
--        }
--    }
--
--    public CycleSchedule.Cycle getCycle() {
--        return cycle;
--    }
--
--    public void setCycle(CycleSchedule.Cycle cycle) {
--        this.cycle = cycle;
--    }
--
--    @Override
--    public String createCheckPointName() {
--        return super.createCheckPointName();
--    }
--
--    public synchronized int getTotalReader() {
--        return size.get();
--    }
--
--    public static String createKey(ISource iSource) {
--        AbstractSource source = (AbstractSource) iSource;
--        CycleSchedule.Cycle cycle = ((CycleDynamicMultipleDBScanSource) iSource).getCycle();
--        return MapKeyUtil.createKey(source.getNameSpace(), source.getGroupName(), source.getConfigureName(), source.getTopic(), cycle.getCycleDateStr());
--    }
--
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java
index ea2a118b,ea2a118b..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java
+++ /dev/null
@@@ -1,190 -1,190 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.source;
--
--import java.io.Serializable;
--import java.util.ArrayList;
--import java.util.Arrays;
--import java.util.List;
--import java.util.concurrent.atomic.AtomicBoolean;
--import org.apache.commons.logging.Log;
--import org.apache.commons.logging.LogFactory;
--import org.apache.rocketmq.streams.common.channel.split.ISplit;
--import org.apache.rocketmq.streams.common.metadata.MetaDataUtils;
--import org.apache.rocketmq.streams.connectors.reader.DBScanReader;
--import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
--import org.apache.rocketmq.streams.connectors.source.filter.DataFormatPatternFilter;
--import org.apache.rocketmq.streams.connectors.source.filter.PatternFilter;
--import org.apache.rocketmq.streams.db.DynamicMultipleDBSplit;
--
--/**
-- * @description DynamicMultipleDBScanSource
-- */
--public class DynamicMultipleDBScanSource extends AbstractPullSource implements Serializable {
--
--    private static final long serialVersionUID = 3987103552547019739L;
--    private static final Log logger = LogFactory.getLog(DynamicMultipleDBScanSource.class);
--    public static final int DEFAULT_BATCH_SIZE = 50;
--    public static final int MAX_BATCH_SIZE = 100;
--
--    String url;
--    String userName;
--    String password;
--    String logicTableName;
--    String suffix;
--    int batchSize;
--    List<String> tableNames;
--    List<ISplit> splits;
--    transient volatile AtomicBoolean statusCheckerStart = new AtomicBoolean(false);
--
--    //todo
--    transient PatternFilter filter;
--
--    public DynamicMultipleDBScanSource() {
--        splits = new ArrayList<>();
--    }
--
--    @Override
--    protected boolean initConfigurable() {
--        setTopic(logicTableName);
--        return super.initConfigurable();
--    }
--
--    @Override
--    protected boolean isNotDataSplit(String queueId) {
--        return tableNames.contains(queueId);
--    }
--
--    @Override
--    protected ISplitReader createSplitReader(ISplit split) {
--
--        DBScanReader reader = new DBScanReader();
--        reader.setISplit(split);
--        reader.setUrl(url);
--        reader.setUserName(userName);
--        reader.setPassword(password);
--        reader.setTableName(String.valueOf(split.getQueue()));
--        int local = batchSize <= 0 ? DEFAULT_BATCH_SIZE : batchSize;
--        local = local > MAX_BATCH_SIZE ? MAX_BATCH_SIZE : local;
--        reader.setBatchSize(local);
--        reader.setISource(this);
--        logger.info(String.format("create reader for split %s", split.getQueueId()));
--        return reader;
--    }
--
--    @Override
--    public List<ISplit> fetchAllSplits() {
--
--        if (filter == null) {
--            filter = new DataFormatPatternFilter();
--        }
--
--//        String sourceName = createKey(this);
--
--        tableNames = MetaDataUtils.listTableNameByPattern(url, userName, password, logicTableName + "%");
--
--        logger.info(String.format("load all logic table : %s", Arrays.toString(tableNames.toArray())));
--
--        for (String s : tableNames) {
--            String suffix = s.replace(logicTableName + "_", "");
--            if (filter.filter(null, logicTableName, suffix)) {
--                logger.info(String.format("filter add %s", s));
--                DynamicMultipleDBSplit split = new DynamicMultipleDBSplit();
--                split.setLogicTableName(logicTableName);
--                split.setSuffix(suffix);
--                splits.add(split);
--            } else {
--                logger.info(String.format("filter remove %s", s));
--            }
--
--        }
--        return splits;
--    }
--
--    public String getUrl() {
--        return url;
--    }
--
--    public void setUrl(String url) {
--        this.url = url;
--    }
--
--    public String getUserName() {
--        return userName;
--    }
--
--    public void setUserName(String userName) {
--        this.userName = userName;
--    }
--
--    public String getPassword() {
--        return password;
--    }
--
--    public void setPassword(String password) {
--        this.password = password;
--    }
--
--    public String getLogicTableName() {
--        return logicTableName;
--    }
--
--    public void setLogicTableName(String logicTableName) {
--        this.logicTableName = logicTableName;
--    }
--
--    public String getSuffix() {
--        return suffix;
--    }
--
--    public void setSuffix(String suffix) {
--        this.suffix = suffix;
--    }
--
--    public int getBatchSize() {
--        return batchSize;
--    }
--
--    public void setBatchSize(int batchSize) {
--        this.batchSize = batchSize;
--    }
--
--    public List<String> getTableNames() {
--        return tableNames;
--    }
--
--    public void setTableNames(List<String> tableNames) {
--        this.tableNames = tableNames;
--    }
--
--    public List<ISplit> getSplits() {
--        return splits;
--    }
--
--    public void setSplits(List<ISplit> splits) {
--        this.splits = splits;
--    }
--
--    public PatternFilter getFilter() {
--        return filter;
--    }
--
--    public void setFilter(PatternFilter filter) {
--        this.filter = filter;
--    }
--
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java
index 6733911d,6733911d..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java
+++ /dev/null
@@@ -1,60 -1,60 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.source;
--
--import java.util.Collection;
--import java.util.List;
--import java.util.Map;
--import org.apache.rocketmq.streams.common.channel.source.ISource;
--import org.apache.rocketmq.streams.common.channel.split.ISplit;
--
--/**
-- * poll message,need balance
-- */
--public interface IPullSource<T extends ISource> extends ISource<T> {
--
--    /**
--     * 拥有的分片格式
--     *
--     * @return
--     */
--    Collection<ISplit> ownerSplits();
--
--    /**
--     * get all split for the source
--     *
--     * @return
--     */
--    List<ISplit> fetchAllSplits();
--
--    /**
--     * get all split for the source
--     *
--     * @return
--     */
--    Map<String, ISplit> getAllSplitMap();
--
--    Long getPullIntervalMs();
--
--    /**
--     * get cusor from store
--     *
--     * @return
--     */
--    String loadSplitOffset(ISplit split);
--
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java
index c06de98d,c06de98d..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java
+++ /dev/null
@@@ -1,53 -1,53 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.source.filter;
--
--import java.io.Serializable;
--import org.apache.commons.logging.Log;
--import org.apache.commons.logging.LogFactory;
--import org.apache.rocketmq.streams.connectors.model.ReaderStatus;
--
--/**
-- * @description 过滤掉已经完成的reader
-- */
--@Deprecated
--public class BoundedPatternFilter extends AbstractPatternFilter implements Serializable {
--
--    static final Log logger = LogFactory.getLog(BoundedPatternFilter.class);
--
--    @Override
--    public boolean filter(String sourceName, String logicTableName, String tableName) {
--
--        ReaderStatus readerStatus = ReaderStatus.queryReaderStatusByUK(sourceName, logicTableName + "_" + tableName);
--        if (readerStatus != null) {
--            logger.info(String.format("filter sourceName %s, logicTableName %s, suffix %s. ", sourceName, logicTableName, tableName));
--            logger.info(String.format("query result %s", readerStatus.toString()));
--            return true;
--        }
--        if (next == null) {
--            return false;
--        }
--        return next.filter(sourceName, logicTableName, tableName);
--    }
--
--    @Override
--    public PatternFilter setNext(PatternFilter filter) {
--        super.setNext(filter);
--        return this;
--    }
--
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePatternFilter.java
index 3a0193f3,3a0193f3..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePatternFilter.java
+++ /dev/null
@@@ -1,173 -1,173 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.source.filter;
--
--import java.io.Serializable;
--import java.text.ParseException;
--import java.text.SimpleDateFormat;
--import java.util.ArrayList;
--import java.util.Date;
--import java.util.List;
--
--/**
-- * @description 用来做分区选取
-- */
--public class CyclePatternFilter extends AbstractPatternFilter implements Serializable {
--
--    private static final long serialVersionUID = -5151597286296228754L;
--
--    public static final int INIT_CYCLE_VERSION = 0;
--
--    CyclePeriod cyclePeriod;
--
--    Date curCycleDateTime; //当前调度周期时间
--
--    long cycleId;
--
--    String firstStartTime; //当前最小时间
--
--    List<String> allPatterns;
--
--    String expression;
--
--    boolean isInit;
--
--    //历史数据读取时使用,表示比起当前相差多少个调度周期
--    final long cycleDiff;
--
--    //todo expr解析
--    public CyclePatternFilter(String expr, Date date) throws ParseException {
--        expression = expr;
--        cycleId = INIT_CYCLE_VERSION;
--        cyclePeriod = CyclePeriod.getInstance(expression);
--        curCycleDateTime = calCycleDateTime(date);
--        allPatterns = new ArrayList<>();
--        isInit = true;
--        if(cyclePeriod.isHistory){
--            Date tmp = cyclePeriod.getHisDate();
--            cycleDiff = curCycleDateTime.getTime()/1000 * 1000 - tmp.getTime()/1000*1000;
--        }else{
--            cycleDiff = 0;
--        }
--    }
--
--
--    /**
--     *
--     * @return 返回date格式的调度周期时间
--     */
--    private Date calCycleDateTime(Date date){
--        return cyclePeriod.format(date);
--    }
--
--    private long calCycle(Date date){
--        Date tmp = calCycleDateTime(date);
--        if(tmp.getTime()/1000 == curCycleDateTime.getTime()/1000){
--            return cycleId;
--        }
--        return nextCycle(tmp);
--    }
--
--    private long nextCycle(Date date){
--        curCycleDateTime = date;
--        cycleId++;
--        calAllPattern();
--        return cycleId;
--    }
--
--    private void calAllPattern(){
--        allPatterns.clear();
--        for(int i = 1; i <= cyclePeriod.getCycle(); i++){
--            long d = (curCycleDateTime.getTime()/1000)*1000 - i * cyclePeriod.getInterval() - cycleDiff;
--            String s = cyclePeriod.getDateFormat().format(new Date(d));
--            allPatterns.add(s);
--        }
--        firstStartTime = allPatterns.get(allPatterns.size() - 1);
--    }
--
--    public boolean isNextCycle(Date date){
--        if(isInit){
--            isInit = false;
--            calAllPattern();
--            return true;
--        }
--        long tmp = cycleId;
--        return calCycle(date) > tmp;
--    }
--
--    public List<String> getAllPatterns() {
--        return allPatterns;
--    }
--
--    public long getCycleId() {
--        return cycleId;
--    }
--
--    public Date getCurCycleDateTime(){
--        return curCycleDateTime;
--    }
--
--    public String getCurCycleDateTimeStr(){
--        return cyclePeriod.getDateFormat().format(curCycleDateTime);
--    }
--
--    public long getCycleDiff() {
--        return cycleDiff;
--    }
--
--    public long getCyclePeriodDiff(){
--        return cycleDiff/cyclePeriod.getInterval();
--    }
--
--    public int getCycle(){
--        return cyclePeriod.getCycle();
--    }
--
--    public String getFirstStartTime() {
--        return firstStartTime;
--    }
--
--    @Override
--    public boolean filter(String sourceName, String logicTableName, String tableName) {
--        return allPatterns.contains(tableName);
--    }
--
--
--
--    public static void main(String[] args) throws ParseException {
--
--        CyclePatternFilter cycle = new CyclePatternFilter("yyyyMMddHHmm - 15m", new Date());
--        System.out.println(cycle);
--
--        System.out.println(cycle.filter(null, null, "202109131650"));
--        System.out.println(cycle.filter(null, null, "20210902000000"));
--        System.out.println(cycle.filter(null, null, "20210908000000"));
--        System.out.println(cycle.filter(null, null, "20210910000000"));
--        System.out.println(cycle.filter(null, null, "20210909230000"));
--
--        System.out.println(new SimpleDateFormat("yyyyMMddHH").parse("2021090923"));
--        System.out.println(new SimpleDateFormat("yyyyMMddhhmmss").parse("20210909230000"));
--        System.out.println(new SimpleDateFormat("yyyyMMddHHmmss").parse("20210909100000"));
--        System.out.println(new SimpleDateFormat("yyyyMMddhhmmss").parse("20210909100000"));
--
--
--
--
--    }
--
--
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java
index 4e6cdd6a,4e6cdd6a..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java
+++ /dev/null
@@@ -1,222 -1,222 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.source.filter;
--
--import java.text.ParseException;
--import java.text.SimpleDateFormat;
--import java.util.Date;
--import org.apache.commons.logging.Log;
--import org.apache.commons.logging.LogFactory;
--
--/**
-- * @Description
-- */
--public enum CyclePeriod {
--
--    CYCLE_PERIOD_DATE() {
--        @Override
--        void argsParser(String expr) throws ParseException {
--            super.argsParser(expr);
--            interval = 24 * 3600 * 1000;
--            int length = expr.length();
--            if (length == 8 && checkFormat(expr, PatternFilter.yyyyMMdd)) {
--                format = PatternFilter.yyyyMMdd;
--            } else if (length == 14 && checkFormat(expr, PatternFilter.yyyyMMddHHmmss)) {
--                format = PatternFilter.yyyyMMddHHmmss;
--            } else {
--                throw new RuntimeException(String.format("unsupported format : %s, only support yyyymmdd 、 yyyymmddhhmmss.", expr));
--            }
--        }
--
--        @Override
--        Date format(Date strDate) {
--            Date date = new Date(strDate.getTime());
--            date.setHours(0);
--            date.setMinutes(0);
--            date.setSeconds(0);
--            return date;
--        }
--
--    },
--    CYCLE_PERIOD_HOUR() {
--        @Override
--        void argsParser(String expr) throws ParseException {
--            super.argsParser(expr);
--            interval = 3600 * 1000;
--
--            int length = expr.length();
--            if (length == 10 && checkFormat(expr, PatternFilter.yyyyMMddHH)) {
--                format = PatternFilter.yyyyMMddHH;
--            } else if (length == 14 && checkFormat(expr, PatternFilter.yyyyMMddHHmmss)) {
--                format = PatternFilter.yyyyMMddHHmmss;
--            } else {
--                throw new RuntimeException(String.format("unsupported format : %s, only support yyyymmdd 、 yyyymmddhhmmss.", expr));
--            }
--        }
--
--        @Override
--        Date format(Date strDate) {
--            Date date = new Date(strDate.getTime());
--            date.setMinutes(0);
--            date.setSeconds(0);
--            return date;
--        }
--
--    },
--    CYCLE_PERIOD_MINUTE() {
--        @Override
--        void argsParser(String expr) throws ParseException {
--            super.argsParser(expr);
--            interval = 60 * 1000;
--            int length = expr.length();
--            if (length == 12 && checkFormat(expr, PatternFilter.yyyyMMddHHmm)) {
--                format = PatternFilter.yyyyMMddHHmm;
--            } else if (length == 14 && checkFormat(expr, PatternFilter.yyyyMMddHHmmss)) {
--                format = PatternFilter.yyyyMMddHHmmss;
--            } else {
--                throw new RuntimeException(String.format("unsupported format : %s, only support yyyymmdd 、 yyyymmddhhmmss.", expr));
--            }
--        }
--
--        @Override
--        Date format(Date strDate) {
--            Date date = new Date(strDate.getTime());
--            date.setSeconds(0);
--            return date;
--        }
--
--    };
--
--    boolean isHistory = false;
--
--    long interval;
--
--    int cycle;
--
--    String format;
--
--    String hisDateString;
--
--    static final Log logger = LogFactory.getLog(CyclePeriod.class);
--
--    void argsParser(String expr) throws ParseException {
--        if (expr.matches("^\\d+$")) {
--            isHistory = true;
--            hisDateString = expr;
--        }
--    }
--
--    Date format(Date strDate) {
--        throw new RuntimeException(String.format("unsupported type.", strDate));
--    }
--
--    /**
--     * expr可能是yyyymmdd 或者 20210917
--     *
--     * @param expr
--     * @param format
--     * @return
--     */
--    final boolean checkFormat(String expr, String format) {
--
--        if (!isHistory) {
--            return expr.equalsIgnoreCase(format);
--        }
--
--        try {
--            new SimpleDateFormat(format).parse(expr);
--            return true;
--        } catch (ParseException e) {
--            logger.error(String.format("error format, expr is %s, format is %s.", expr, format));
--            e.printStackTrace();
--            return false;
--        }
--    }
--
--    public Date getHisDate() throws ParseException {
--        return getDateFormat().parse(hisDateString);
--    }
--
--    public SimpleDateFormat getDateFormat() {
--        return new SimpleDateFormat(format);
--    }
--
--    public long getInterval() {
--        return interval;
--    }
--
--    public boolean isHistory() {
--        return isHistory;
--    }
--
--    public void setHistory(boolean history) {
--        isHistory = history;
--    }
--
--    public void setInterval(long interval) {
--        this.interval = interval;
--    }
--
--    public int getCycle() {
--        return cycle;
--    }
--
--    public void setCycle(int cycle) {
--        this.cycle = cycle;
--    }
--
--    public void setFormat(String format) {
--        this.format = format;
--    }
--
--    public String getFormat() {
--        return format;
--    }
--
--    public String getHisDateString() {
--        return hisDateString;
--    }
--
--    public void setHisDateString(String hisDateString) {
--        this.hisDateString = hisDateString;
--    }
--
--    public static CyclePeriod getInstance(String expression) throws ParseException {
--
--        String[] str = expression.split("\\-");
--        assert str.length == 2 : String.format("expression error : %s. ", expression);
--        String expr = str[0].trim();
--        String tmp = str[1].trim().toLowerCase();
--        String cycleStr = tmp.substring(0, tmp.length() - 1);
--        int cycle = Integer.parseInt(cycleStr);
--        CyclePeriod cyclePeriod = null;
--        if (tmp.endsWith("d")) {
--            cyclePeriod = CYCLE_PERIOD_DATE;
--        } else if (tmp.endsWith("h")) {
--            cyclePeriod = CYCLE_PERIOD_HOUR;
--        } else if (tmp.endsWith("m")) {
--            cyclePeriod = CYCLE_PERIOD_MINUTE;
--        } else {
--            new RuntimeException(String.format("unsupported format : %s", expression));
--        }
--        cyclePeriod.argsParser(expr);
--        cyclePeriod.cycle = cycle;
--
--        return cyclePeriod;
--    }
--
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java
index ba9a2797,ba9a2797..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java
+++ /dev/null
@@@ -1,236 -1,236 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.source.filter;
--
--import java.io.Serializable;
--import java.text.ParseException;
--import java.util.ArrayList;
--import java.util.Arrays;
--import java.util.Date;
--import java.util.List;
--import java.util.concurrent.atomic.AtomicLong;
--import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
--
--/**
-- * @description 用来做分区选取
-- */
--public class CycleSchedule implements Serializable {
--
--    private static final long serialVersionUID = -5151597286296228754L;
--    public static final int INIT_CYCLE_VERSION = 0;
--    private static CycleSchedule INSTANCE;
--    CyclePeriod cyclePeriod;
--    AtomicLong cycleId = new AtomicLong(0);
--    String expression;
--    boolean isInit;
--    //历史数据读取时使用,表示比起当前相差多少个调度周期
--    final long cycleDiff;
--
--    public CycleSchedule(String expr, Date date) throws ParseException {
--        Date local = subMs(date);
--        expression = expr;
--        cycleId.set(INIT_CYCLE_VERSION);
--        cyclePeriod = CyclePeriod.getInstance(expression);
--        isInit = true;
--        if (cyclePeriod.isHistory) {
--            Date curCycleDateTime = calCycleDateTime(local);
--            Date tmp = subMs(cyclePeriod.getHisDate());
--            cycleDiff = curCycleDateTime.getTime() - tmp.getTime();
--        } else {
--            cycleDiff = 0;
--        }
--    }
--
--    /**
--     * 去掉毫秒时间戳
--     *
--     * @param date
--     * @return
--     */
--    private Date subMs(Date date) {
--        long time = date.getTime() / 1000 * 1000;
--        return new Date(time);
--    }
--
--    /**
--     * @return 返回date格式的调度周期时间
--     */
--    private Date calCycleDateTime(Date date) {
--        return cyclePeriod.format(date);
--    }
--
--    public Cycle nextCycle(Date date) {
--        Date local = subMs(date);
--        local = cyclePeriod.format(local);
--        if (isInit) {
--            isInit = false;
--        } else {
--            cycleId.incrementAndGet();
--        }
--        List<String> ret = calAllPattern(local);
--        Cycle cycle = new Cycle();
--        cycle.setCycleId(cycleId.get());
--        cycle.setAllPattern(ret);
--        cycle.setCycleDateStr(calCycleDateStr(local));
--        cycle.setCycleCount(cyclePeriod.getCycle());
--        cycle.setCurDateStr(cyclePeriod.getDateFormat().format(local));
--        cycle.setCycleDiff(cycleDiff);
--        return cycle;
--    }
--
--    private String calCycleDateStr(Date date) {
--        long d = date.getTime() - cycleDiff;
--        Date d1 = new Date(d);
--        return cyclePeriod.getDateFormat().format(d1);
--    }
--
--    private List<String> calAllPattern(Date date) {
--        List<String> allPatterns = new ArrayList<>();
--        for (int i = 1; i <= cyclePeriod.getCycle(); i++) {
--            long d = date.getTime() - i * cyclePeriod.getInterval() - cycleDiff;
--            String s = cyclePeriod.getDateFormat().format(new Date(d));
--            allPatterns.add(s);
--        }
--        return allPatterns;
--    }
--
--    public CyclePeriod getCyclePeriod() {
--        return cyclePeriod;
--    }
--
--    public void setCyclePeriod(CyclePeriod cyclePeriod) {
--        this.cyclePeriod = cyclePeriod;
--    }
--
--    public AtomicLong getCycleId() {
--        return cycleId;
--    }
--
--    public void setCycleId(AtomicLong cycleId) {
--        this.cycleId = cycleId;
--    }
--
--    public String getExpression() {
--        return expression;
--    }
--
--    public void setExpression(String expression) {
--        this.expression = expression;
--    }
--
--    public boolean isInit() {
--        return isInit;
--    }
--
--    public void setInit(boolean init) {
--        isInit = init;
--    }
--
--    public long getCycleDiff() {
--        return cycleDiff;
--    }
--
--    public static CycleSchedule getInstance(String expr, Date date) {
--        if (INSTANCE == null) {
--            synchronized (CycleSchedule.class) {
--                if (INSTANCE == null) {
--                    try {
--                        INSTANCE = new CycleSchedule(expr, date);
--                    } catch (ParseException e) {
--                        e.printStackTrace();
--                    }
--                }
--            }
--        }
--        return INSTANCE;
--    }
--
--    public static class Cycle extends BasedConfigurable implements Serializable {
--
--        private static final long serialVersionUID = 4842560538716388622L;
--        Long cycleId;
--        List<String> allPattern;
--        String cycleDateStr;
--        Integer cycleCount;
--        String curDateStr;
--        long cycleDiff;
--
--        public Integer getCycleCount() {
--            return cycleCount;
--        }
--
--        public void setCycleCount(Integer cycleCount) {
--            this.cycleCount = cycleCount;
--        }
--
--        public Cycle() {
--        }
--
--        public Long getCycleId() {
--            return cycleId;
--        }
--
--        public void setCycleId(Long cycleId) {
--            this.cycleId = cycleId;
--        }
--
--        public List<String> getAllPattern() {
--            return allPattern;
--        }
--
--        public void setAllPattern(List<String> allPattern) {
--            this.allPattern = allPattern;
--        }
--
--        public String getCycleDateStr() {
--            return cycleDateStr;
--        }
--
--        public void setCycleDateStr(String cycleDateStr) {
--            this.cycleDateStr = cycleDateStr;
--        }
--
--        public String getCurDateStr() {
--            return curDateStr;
--        }
--
--        public void setCurDateStr(String curDateStr) {
--            this.curDateStr = curDateStr;
--        }
--
--        public long getCycleDiff() {
--            return cycleDiff;
--        }
--
--        public void setCycleDiff(long cycleDiff) {
--            this.cycleDiff = cycleDiff;
--        }
--
--        @Override
--        public String toString() {
--            return "Cycle{" +
--                "cycleId=" + cycleId +
--                ", cycleDateStr='" + cycleDateStr + '\'' +
--                ", cycleCount=" + cycleCount +
--                ", curDateStr='" + curDateStr + '\'' +
--                ", cycleDiff=" + cycleDiff +
--                ", allPattern=" + Arrays.toString(allPattern.toArray()) +
--                '}';
--        }
--    }
--
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java
index 0cdc0762,0cdc0762..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java
+++ /dev/null
@@@ -1,106 -1,106 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.source.filter;
--
--import java.io.Serializable;
--import java.text.ParseException;
--import java.text.SimpleDateFormat;
--import org.apache.commons.logging.Log;
--import org.apache.commons.logging.LogFactory;
--
--/**
-- * @description
-- */
--public class DataFormatPatternFilter extends AbstractPatternFilter implements Serializable {
--
--    private static final long serialVersionUID = 3604787588465242642L;
--
--    static final Log logger = LogFactory.getLog(DataFormatPatternFilter.class);
--
--    static final String yyyyMMddHHmmss = "yyyyMMddHHmmss";
--    static final String yyyyMMdd = "yyyyMMdd";
--    static final String yyyyMMddHH = "yyyyMMddHH";
--
--    SimpleDateFormat format1 = new SimpleDateFormat(yyyyMMdd);
--    SimpleDateFormat format2 = new SimpleDateFormat(yyyyMMddHH);
--    SimpleDateFormat format3 = new SimpleDateFormat(yyyyMMddHHmmss);
--
--    @Override
--    public boolean filter(String sourceName, String logicTableName, String tableNameSuffix) {
--
--        int len = tableNameSuffix.length();
--        boolean isFilter = false;
--
--        switch (len) {
--            case 8:
--                try {
--                    format1.parse(tableNameSuffix);
--                    isFilter = true;
--                } catch (ParseException e) {
--                    e.printStackTrace();
--                    isFilter = false;
--                }
--                break;
--            case 10:
--                try {
--                    format2.parse(tableNameSuffix);
--                    isFilter = true;
--                } catch (ParseException e) {
--                    e.printStackTrace();
--                    isFilter = false;
--                }
--                break;
--            case 14:
--                try {
--                    format3.parse(tableNameSuffix);
--                    isFilter = true;
--                } catch (ParseException e) {
--                    e.printStackTrace();
--                    isFilter = false;
--                }
--                break;
--        }
--
--        if (isFilter) {
--            logger.info(String.format("filter sourceName %s, logicTableName %s, suffix %s", sourceName, logicTableName, tableNameSuffix));
--            return true;
--        }
--        if (next != null) {
--            return next.filter(sourceName, logicTableName, tableNameSuffix);
--        }
--        return false;
--    }
--
--    @Override
--    public PatternFilter setNext(PatternFilter filter) {
--        super.setNext(filter);
--        return this;
--    }
--
--    public PatternFilter getNext() {
--        return next;
--    }
--
--    public static void main(String[] args) {
--        DataFormatPatternFilter filter = new DataFormatPatternFilter();
--//        System.out.println(filter.filter("20200101"));
--//        System.out.println(filter.filter("2020010101"));
--//        System.out.println(filter.filter("20200101010101"));
--
--    }
--
--}
diff --cc rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/PatternFilter.java
index 42365007,42365007..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/PatternFilter.java
+++ /dev/null
@@@ -1,41 -1,41 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.rocketmq.streams.connectors.source.filter;
--
--/**
-- * @description
-- */
--public interface PatternFilter {
--
--    String yyyyMMddHHmmss = "yyyyMMddHHmmss";
--    String yyyyMMdd = "yyyyMMdd";
--    String yyyyMMddHH = "yyyyMMddHH";
--    String yyyyMMddHHmm = "yyyyMMddHHmm";
--
--
--    /**
--     * 根据sourceName和tableName判断是否符合
--     * @param sourceName
--     * @param tableName
--     * @return
--     */
--    boolean filter(String sourceName, String logicTableName, String tableName);
--
--    PatternFilter setNext(PatternFilter filter);
--
--
--}
diff --cc rocketmq-streams-runner/assembly/distribution.xml
index f44e34fe,f44e34fe..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-runner/assembly/distribution.xml
+++ /dev/null
@@@ -1,69 -1,69 +1,0 @@@
--<!--
--Licensed to the Apache Software Foundation (ASF) under one or more
--contributor license agreements.  See the NOTICE file distributed with
--this work for additional information regarding copyright ownership.
--The ASF licenses this file to You under the Apache License, Version 2.0
--(the "License"); you may not use this file except in compliance with
--the License.  You may obtain a copy of the License at
--
--  http://www.apache.org/licenses/LICENSE-2.0
--
--Unless required by applicable law or agreed to in writing, software
--distributed under the License is distributed on an "AS IS" BASIS,
--WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--See the License for the specific language governing permissions and
--limitations under the License.
---->
--<assembly>
--    <id>distribution</id>
--    <formats>
--        <format>tar.gz</format>
--    </formats>
--    <fileSets>
--        <fileSet>
--            <fileMode>0775</fileMode>
--            <directory>target/rocketmq-streams-${project.version}-standalone/bin</directory>
--            <outputDirectory>bin</outputDirectory>
--        </fileSet>
--        <fileSet>
--            <directory>target/rocketmq-streams-${project.version}-standalone/conf</directory>
--            <outputDirectory>conf</outputDirectory>
--        </fileSet>
--        <fileSet>
--            <directory>target/rocketmq-streams-${project.version}-standalone/jobs</directory>
--            <outputDirectory>jobs</outputDirectory>
--        </fileSet>
--        <fileSet>
--            <fileMode>0775</fileMode>
--            <directory>target/rocketmq-streams-${project.version}-standalone/lib</directory>
--            <outputDirectory>lib</outputDirectory>
--        </fileSet>
--        <fileSet>
--            <fileMode>0775</fileMode>
--            <directory>target/rocketmq-streams-${project.version}-standalone/log</directory>
--            <outputDirectory>log</outputDirectory>
--        </fileSet>
--    </fileSets>
--    <files>
--        <file>
--            <source>target/rocketmq-streams-${project.version}-standalone/LICENSE</source>
--            <outputDirectory/>
--        </file>
--        <file>
--            <source>target/rocketmq-streams-${project.version}-standalone/NOTICE</source>
--            <outputDirectory/>
--        </file>
--        <file>
--            <source>target/rocketmq-streams-${project.version}-standalone/quick_start.md</source>
--            <outputDirectory/>
--        </file>
--        <file>
--            <source>target/rocketmq-streams-${project.version}-standalone/README.md</source>
--            <outputDirectory/>
--        </file>
--        <file>
--            <source>target/rocketmq-streams-${project.version}-standalone/README-chinese.md</source>
--            <outputDirectory/>
--        </file>
--    </files>
--</assembly>
diff --cc rocketmq-streams-runner/assembly/standalone.xml
index 69075a35,69075a35..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-runner/assembly/standalone.xml
+++ /dev/null
@@@ -1,72 -1,72 +1,0 @@@
--<!--
--Licensed to the Apache Software Foundation (ASF) under one or more
--contributor license agreements.  See the NOTICE file distributed with
--this work for additional information regarding copyright ownership.
--The ASF licenses this file to You under the Apache License, Version 2.0
--(the "License"); you may not use this file except in compliance with
--the License.  You may obtain a copy of the License at
--
--  http://www.apache.org/licenses/LICENSE-2.0
--
--Unless required by applicable law or agreed to in writing, software
--distributed under the License is distributed on an "AS IS" BASIS,
--WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--See the License for the specific language governing permissions and
--limitations under the License.
---->
--<assembly>
--    <id>standalone</id>
--    <formats>
--        <format>dir</format>
--    </formats>
--    <includeBaseDirectory>false</includeBaseDirectory>
--
--    <fileSets>
--        <fileSet>
--            <directory>bin</directory>
--            <outputDirectory>/bin</outputDirectory>
--            <fileMode>0755</fileMode>
--        </fileSet>
--        <fileSet>
--            <directory>src/main/resources</directory>
--            <outputDirectory>/conf</outputDirectory>
--            <fileMode>0755</fileMode>
--        </fileSet>
--        <fileSet>
--            <directory>conf</directory>
--            <outputDirectory>/conf</outputDirectory>
--            <fileMode>0755</fileMode>
--        </fileSet>
--        <fileSet>
--            <directory>jobs</directory>
--            <outputDirectory>/jobs</outputDirectory>
--            <fileMode>0755</fileMode>
--        </fileSet>
--        <fileSet>
--            <directory>log</directory>
--            <outputDirectory>/log</outputDirectory>
--            <fileMode>0755</fileMode>
--        </fileSet>
--        <fileSet>
--            <directory>../</directory>
--            <outputDirectory>/</outputDirectory>
--            <includes>
--                <include>LICENSE</include>
--                <include>NOTICE</include>
--                <include>quick_start.md</include>
--                <include>README.md</include>
--                <include>README-chinese.md</include>
--            </includes>
--        </fileSet>
--    </fileSets>
--
--    <dependencySets>
--        <dependencySet>
--            <useProjectArtifact>true</useProjectArtifact>
--            <outputDirectory>lib</outputDirectory>
--            <!-- 只包含runtime作用域的依赖 -->
--            <scope>runtime</scope>
--        </dependencySet>
--    </dependencySets>
--
--</assembly>
diff --cc rocketmq-streams-runner/bin/start.sh
index 44797c45,44797c45..00000000
deleted file mode 100755,100755
--- a/rocketmq-streams-runner/bin/start.sh
+++ /dev/null
@@@ -1,58 -1,58 +1,0 @@@
--#!/bin/sh
--set -e
--
--PROG_NAME=$0
--MAIN_CLASS=$1
--
--if [ -z "${MAIN_CLASS}" ]; then
--  usage
--fi
--
--usage() {
--    echo "Usage: $PROG_NAME {mainClass or mainClasses splited with comma}"
--    exit 2 # bad usage
--}
--
--
--JVM_CONFIG=$2
--if [ -z "${JVM_CONFIG}" ]; then
--  JVM_CONFIG="-Xms2048m -Xmx2048m -Xss512k"
--fi
--
--ROCKETMQ_STREAMS_HOME=$(cd $(dirname ${BASH_SOURCE[0]})/..; pwd)
--ROCKETMQ_STREAMS_JOBS_DIR=$ROCKETMQ_STREAMS_HOME/jobs
--ROCKETMQ_STREAMS_DEPENDENCIES=$ROCKETMQ_STREAMS_HOME/lib
--ROCKETMQ_STREAMS_LOGS=$ROCKETMQ_STREAMS_HOME/log/catalina.out
--
--if [ -z "${JAVA_HOME:-}" ]; then
--  JAVA="java -server"
--else
--  JAVA="$JAVA_HOME/bin/java -server"
--fi
--
--JAVA_OPTIONS=${JAVA_OPTIONS:-}
--
--JVM_OPTS=()
--if [ ! -z "${JAVA_OPTIONS}" ]; then
--  JVM_OPTS+=("${JAVA_OPTIONS}")
--fi
--if [ ! -z "${JVM_CONFIG}" ]; then
--  JVM_OPTS+=("${JVM_CONFIG}")
--fi
--
--JVM_OPTS+=("-Dlogback.configurationFile=conf/logback.xml")
--
--
--
--# shellcheck disable=SC2039
--# shellcheck disable=SC2206
--array=(${MAIN_CLASS//,/ })
--
--# shellcheck disable=SC2068
--# shellcheck disable=SC2039
--for var in ${array[@]}
--do
--   # shellcheck disable=SC2068
--   # shellcheck disable=SC2039
--   eval exec $JAVA ${JVM_OPTS[@]} -classpath "$ROCKETMQ_STREAMS_JOBS_DIR/*:$ROCKETMQ_STREAMS_DEPENDENCIES/*" $var "&" >>"$ROCKETMQ_STREAMS_LOGS" 2>&1
--done
diff --cc rocketmq-streams-runner/bin/stop.sh
index 5734c926,5734c926..00000000
deleted file mode 100755,100755
--- a/rocketmq-streams-runner/bin/stop.sh
+++ /dev/null
@@@ -1,33 -1,33 +1,0 @@@
--#!/bin/sh
--set -e
--PROG_NAME=$0
--MAIN_CLASS=$1
--
--if [ -z "${MAIN_CLASS}" ]; then
--  usage
--fi
--
--# shellcheck disable=SC2039
--# shellcheck disable=SC2206
--array=(${MAIN_CLASS//,/ })
--
--# shellcheck disable=SC2068
--# shellcheck disable=SC2039
--for var in ${array[@]}
--do
--  STREAM_JOB_PIC="$(ps -ef | grep "$var" | grep -v grep | grep -v "$PROG_NAME" | awk '{print $2}' | sed 's/addr://g')"
--  if [ ! -z "$STREAM_JOB_PIC" ]; then
--    echo $STREAM_JOB_PIC
--    echo "Stop rocketmq-streams job"
--    echo "kill -9 $STREAM_JOB_PIC"
--    kill -9 $STREAM_JOB_PIC
--    echo "Job($MAIN_CLASS) shutdown completed."
--  else
--    echo "Job($MAIN_CLASS) not started."
--  fi
--done
--
--
--
--
--
diff --cc rocketmq-streams-runner/pom.xml
index 6dcd83f4,bcee09c8..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-runner/pom.xml
+++ /dev/null
@@@ -1,80 -1,66 +1,0 @@@
--<?xml version="1.0" encoding="UTF-8"?>
- <!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
- 
-       http://www.apache.org/licenses/LICENSE-2.0
- 
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-   -->
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 -<project xmlns="http://maven.apache.org/POM/4.0.0"
 -         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 -         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
--    <parent>
--        <artifactId>rocketmq-streams</artifactId>
--        <groupId>org.apache.rocketmq</groupId>
-         <version>1.0.2-preview-SNAPSHOT</version>
 -        <version>1.0.0-SNAPSHOT</version>
--    </parent>
--    <modelVersion>4.0.0</modelVersion>
--
--    <artifactId>rocketmq-streams-runner</artifactId>
--    <name>ROCKETMQ STREAMS :: runner</name>
--
--    <properties>
--        <maven.compiler.source>8</maven.compiler.source>
--        <maven.compiler.target>8</maven.compiler.target>
--    </properties>
--    <dependencies>
--        <dependency>
--            <groupId>org.apache.rocketmq</groupId>
--            <artifactId>rocketmq-streams-clients</artifactId>
--        </dependency>
--        <dependency>
--            <groupId>org.apache.rocketmq</groupId>
--            <artifactId>rocketmq-streams-examples</artifactId>
--        </dependency>
--    </dependencies>
--
--
--    <build>
--        <plugins>
--            <plugin>
--                <groupId>org.apache.maven.plugins</groupId>
--                <artifactId>maven-surefire-plugin</artifactId>
--            </plugin>
--            <plugin>
--                <groupId>org.apache.maven.plugins</groupId>
--                <artifactId>maven-failsafe-plugin</artifactId>
--            </plugin>
--            <plugin>
--                <groupId>org.apache.maven.plugins</groupId>
--                <artifactId>maven-assembly-plugin</artifactId>
--                <executions>
--                    <execution>
--                        <phase>package</phase>
--                        <goals>
--                            <goal>single</goal>
--                        </goals>
--                    </execution>
--                </executions>
--                <configuration>
--                    <attach>true</attach>
--                    <descriptors>
--                        <descriptor>assembly/standalone.xml</descriptor>
--                        <descriptor>assembly/distribution.xml</descriptor>
--                    </descriptors>
--                    <finalName>rocketmq-streams-${project.version}</finalName>
--                    <outputDirectory>target</outputDirectory>
--                    <workDirectory>target/assembly/work</workDirectory>
--                    <tarLongFileMode>warn</tarLongFileMode>
--                </configuration>
--            </plugin>
--        </plugins>
--    </build>
--</project>
diff --cc rocketmq-streams-runner/src/main/resources/log4j.xml
index de547783,de547783..00000000
deleted file mode 100644,100644
--- a/rocketmq-streams-runner/src/main/resources/log4j.xml
+++ /dev/null
@@@ -1,51 -1,51 +1,0 @@@
--<?xml version="1.0" encoding="UTF-8"?>
--<!DOCTYPE log4j:configuration PUBLIC "-//log4j/log4j Configuration//EN" "log4j.dtd">
--
--<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
--
--
--    <appender name="console" class="org.apache.log4j.ConsoleAppender">
--        <layout class="org.apache.log4j.PatternLayout">
--            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss SSS} %-5p %c{1} %m %n"/>
--        </layout>
--        <filter class="org.apache.log4j.varia.LevelRangeFilter">
--            <param name="levelMin" value="INFO"/>
--            <param name="levelMax" value="ERROR"/>
--            <param name="AcceptOnMatch" value="true"/>
--        </filter>
--    </appender>
--
--    <appender name="fileAppender" class="org.apache.log4j.DailyRollingFileAppender">
--        <param name="File" value="${log4j.home}/rocketmq-streams.log"/>
--        <param name="Append" value="true"/>
--        <param name="DatePattern" value="'.'yyyy-MM-dd'.log'"/>
--        <layout class="org.apache.log4j.PatternLayout">
--            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss SSS} %-5p %c{1} %m %n"/>
--        </layout>
--        <filter class="org.apache.log4j.varia.LevelRangeFilter">
--            <param name="levelMin" value="${log4j.level}"/>
--            <param name="levelMax" value="ERROR"/>
--            <param name="AcceptOnMatch" value="true"/>
--        </filter>
--    </appender>
--
--    <appender name="traceAppender" class="org.apache.log4j.DailyRollingFileAppender">
--        <param name="File" value="${log4j.home}/trace.log"/>
--        <param name="Append" value="true"/>
--        <param name="DatePattern" value="'.'yyyy-MM-dd'.log'"/>
--        <layout class="org.apache.log4j.PatternLayout">
--            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss SSS} %-5p %c{1} %m %n"/>
--        </layout>
--    </appender>
--
--    <!--<category name="org.apache.rocketmq.streams.common.utils.TraceUtil" additivity="false">-->
--    <!--<level value="INFO"/>-->
--    <!--<appender-ref ref="traceAppender"/>-->
--    <!--</category>-->
--
--    <root>
--        <appender-ref ref="console"/>
--        <!--<appender-ref ref="fileAppender"/>-->
--    </root>
--
--</log4j:configuration>