You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2022/06/21 03:55:03 UTC

[rocketmq-streams] 04/16: merge from snapshot-1.0.3

This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch snapshot-1.0.4
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git

commit a5a6ddde987adf1143fc5a1f4ed6902dff5368aa
Merge: 93d7f775 9d3ae58b
Author: 维章 <un...@gmail.com>
AuthorDate: Mon May 23 15:32:38 2022 +0800

    merge from snapshot-1.0.3

 NOTICE                                             |   2 +-
 README.md                                          |  16 +-
 SUMMARY.md                                         |   7 +
 docs/README.md                                     | 142 ------
 docs/SUMMARY.md                                    |   8 -
 ...225\264\344\275\223\346\236\266\346\236\204.md" |  33 --
 .../2.\346\236\204\345\273\272DataStream.md"       |  73 ----
 .../3.\345\220\257\345\212\250DataStream.md"       |  53 ---
 ...265\201\350\275\254\350\277\207\347\250\213.md" |  63 ---
 ...256\227\345\255\220\350\247\243\346\236\220.md" |  55 ---
 ...256\236\347\216\260\345\256\271\351\224\231.md" |   0
 "docs/images/Pipeline\347\261\273\345\233\276.png" | Bin 44207 -> 0 bytes
 docs/images/img.png                                | Bin 0 -> 38684 bytes
 docs/images/img_1.png                              | Bin 0 -> 43711 bytes
 docs/images/img_2.png                              | Bin 0 -> 103151 bytes
 docs/images/window.png                             | Bin 241692 -> 0 bytes
 ...75\223\346\236\266\346\236\204\345\233\276.png" | Bin 60493 -> 0 bytes
 ...00\273\344\275\223\350\277\207\347\250\213.png" | Bin 44252 -> 0 bytes
 .../\346\211\251\345\256\271\345\211\215.png"      | Bin 56733 -> 0 bytes
 ...12\266\346\200\201\347\256\227\345\255\220.png" | Bin 35766 -> 0 bytes
 "docs/images/\347\212\266\346\200\201.png"         | Bin 47527 -> 0 bytes
 "docs/images/\347\274\251\345\256\271.png"         | Bin 51087 -> 0 bytes
 docs/quick_start/README.md                         |  46 --
 pom.xml                                            | 118 ++---
 quick_start.md                                     |  92 ++--
 .../pom.xml                                        |  41 +-
 rocketmq-streams-cep/src/test/resources/log4j.xml  |  36 ++
 rocketmq-streams-channel-db/pom.xml                |   4 +-
 .../streams/db/sink/AbstractMultiTableSink.java    |  12 +-
 .../streams/db/sink/DynamicMultipleDBSink.java     |  11 +-
 .../streams/db/sink/SelfMultiTableSink.java        |   2 +-
 .../streams/db/sink/SplitBySerialNumber.java       |   2 +-
 .../streams/db/sink/SplitByTimeMultiTableSink.java |   2 +-
 rocketmq-streams-channel-es/pom.xml                |  27 +-
 .../{ESSinkBuilder.java => ESChannelBuilder.java}  |  54 +--
 .../rocketmq/streams/es/sink/ESSinkBuilder.java    |   1 +
 .../streams/es/sink/ESSinkOnlyChannel.java         |  43 +-
 .../apache/rocketmq/streams/es/sink/EsClient.java  | 135 ++++++
 rocketmq-streams-channel-http/pom.xml              |   4 +-
 rocketmq-streams-channel-kafka/pom.xml             |  32 ++
 .../streams/kafka/KafkaChannelBuilder.java         |  52 ++-
 .../apache/rocketmq/streams/kafka/KafkaSplit.java  |  55 +++
 .../rocketmq/streams/kafka/sink/KafkaSink.java     | 200 +++++++++
 .../rocketmq/streams/kafka/source/KafkaSource.java | 238 ++++++++++
 .../rocketmq/streams/kafka/KafkaChannelTest.java   | 104 +++++
 .../src/test/resources/log4j.xml                   |  20 +
 rocketmq-streams-channel-mqtt/pom.xml              |  20 +-
 .../rocketmq/streams/mqtt/source/PahoSource.java   |  41 +-
 rocketmq-streams-channel-rocketmq/pom.xml          |  28 +-
 .../apache/rocketmq/streams/debug/DebugWriter.java |  92 ++--
 .../apache/rocketmq/streams/sink/RocketMQSink.java |  55 ++-
 .../rocketmq/streams/RocketMQChannelTest.java      |   2 +-
 rocketmq-streams-channel-syslog/pom.xml            |  19 +-
 .../rocketmq/streams/syslog/SyslogChannel.java     |  35 +-
 .../streams/syslog/SyslogChannelBuilder.java       |   4 +
 .../streams/syslog/SyslogChannelManager.java       |   6 +-
 .../rocketmq/streams/syslog/SyslogServer.java      |  35 +-
 .../rocketmq/streams/syslog/SyslogClient.java      |  22 +-
 rocketmq-streams-clients/pom.xml                   |  14 +-
 .../streams/client/source/DataStreamSource.java    |  19 +
 .../streams/client/transform/DataStream.java       |  41 +-
 .../streams/client/transform/JoinStream.java       |   5 +-
 .../streams/client/transform/SplitStream.java      |  24 +-
 .../streams/client/transform/WindowStream.java     |  50 ++-
 .../rocketmq/streams/client/ApplicationTest.java   |  57 +++
 .../rocketmq/streams/client/MqttSourceExample.java |  80 ++++
 .../{sink/UserDefinedSink.java => ScriptTest.java} |  27 +-
 .../apache/rocketmq/streams/client/WindowTest.java |  14 +-
 .../rocketmq/streams/client/example/JoinTest.java  |   7 +-
 .../rocketmq/streams/client/example/SplitTest.java |  31 +-
 .../streams/client/sink/UserDefinedSink.java       |   2 +-
 .../client/sink/UserDefinedSupportShuffleSink.java |   4 +-
 rocketmq-streams-commons/pom.xml                   |  46 +-
 .../MappedByteBufferTableWithPrimaryIndex.java     | 482 +++++++++++++++++++++
 .../streams/common/cache/compress/KVAddress.java   |  48 +-
 .../streams/common/channel/AbstractChannel.java    |  14 +-
 .../AbstractSupportShuffleChannelBuilder.java      |   2 +-
 .../common/channel/builder/IChannelBuilder.java    |  25 +-
 .../channel/builder/IShuffleChannelBuilder.java    |   4 +-
 .../common/channel/impl/CollectionSink.java        |  39 +-
 .../common/channel/impl/CollectionSinkBuilder.java |  34 +-
 .../common/channel/impl/PrintChannelBuilder.java   |  30 +-
 .../channel/impl/file/FileChannelBuilder.java      |  45 +-
 .../streams/common/channel/impl/file/FileSink.java |   4 +-
 .../common/channel/impl/file/FileSource.java       |   2 +-
 .../channel/impl/memory/MemoryChannelBuilder.java  |  64 +++
 .../common/channel/impl/memory/MemorySink.java     |   4 +-
 .../channel/impl/view/ViewChannelBuilder.java      |  43 +-
 .../streams/common/channel/impl/view/ViewSink.java |  31 +-
 .../common/channel/impl/view/ViewSource.java       |  66 +++
 .../streams/common/channel/sink/AbstractSink.java  |  17 +-
 .../channel/sink/AbstractSupportShuffleSink.java   |   8 +-
 .../sink/AbstractSupportShuffleUDFSink.java        |   6 +-
 .../common/channel/sink/AbstractUDFSink.java       |  23 +-
 .../streams/common/channel/sink/ISink.java         |   9 +-
 .../impl/AbstractMultiSplitMessageCache.java       |  22 +-
 .../common/channel/source/AbstractSource.java      |  89 +++-
 .../channel/source/AbstractUnreliableSource.java   |   2 +-
 .../streams/common/channel/split/ISplit.java       |   5 +-
 .../streams/common/component/ComponentCreator.java |   4 +
 .../common/configurable/AbstractConfigurable.java  |  17 +-
 .../common/configurable/BasedConfigurable.java     |  13 +-
 .../common/configurable/IConfigurableService.java  |   5 +-
 .../streams/common/configure/ConfigureFileKey.java |   7 +-
 .../streams/common/context/AbstractContext.java    |  33 +-
 .../streams/common/context/MessageHeader.java      |  89 ++--
 .../streams/common/datatype/IntDataType.java       |   3 +-
 .../streams/common/datatype/ShortDataType.java     |   2 +-
 .../streams/common/interfaces/ISerialize.java      |   9 +-
 .../rocketmq/streams/common/model/NameCreator.java |  14 +-
 .../NameCreatorContext.java}                       |  36 +-
 .../common/monitor/ConsoleMonitorManager.java      | 412 ++++++++++++++++++
 .../streams/common/monitor/DataSyncConstants.java  |  54 +++
 .../streams/common/monitor/HttpClient.java         | 116 +++++
 .../rocketmq/streams/common/monitor/HttpUtil.java  | 248 +++++++++++
 .../monitor/MonitorDataSyncServiceFactory.java     |  61 +++
 .../common/monitor/group/MonitorCommander.java     |   4 +-
 .../streams/common/monitor/impl/DipperMonitor.java |   2 +-
 .../streams/common/monitor/model/JobStage.java     | 350 +++++++++++++++
 .../streams/common/monitor/model/TraceIdsDO.java   | 126 ++++++
 .../common/monitor/model/TraceMonitorDO.java       | 250 +++++++++++
 .../service/MonitorDataSyncService.java}           |  16 +-
 .../service/impl/DBMonitorDataSyncImpl.java        |  63 +++
 .../service/impl/HttpMonitorDataSyncImpl.java      | 151 +++++++
 .../service/impl/RocketMQMonitorDataSyncImpl.java  | 185 ++++++++
 .../optimization/IHomologousOptimization.java      |   2 +-
 .../common/optimization/MessageGlobleTrace.java    |  16 +-
 .../streams/common/optimization/Re2Engine.java     |  47 +-
 .../common/optimization/TaskOptimization.java      |  17 +-
 .../optimization/fingerprint/FingerprintCache.java |   4 +-
 .../optimization/fingerprint/PreFingerprint.java   |  54 ++-
 .../streams/common/schedule/ScheduleTask.java      |   4 +-
 .../common/threadpool/ThreadPoolFactory.java       |  34 +-
 .../AbstractMutilPipelineChainPipline.java         |  81 ++--
 .../streams/common/topology/ChainPipeline.java     | 255 ++++++-----
 .../streams/common/topology/ChainStage.java        |  32 +-
 .../common/topology/builder/PipelineBuilder.java   | 139 ++++--
 .../common/topology/metric/NotFireReason.java      | 176 ++++++++
 .../streams/common/topology/metric/StageGroup.java | 248 +++++++++++
 .../common/topology/metric/StageMetric.java        | 138 ++++++
 .../common/topology/model/AbstractStage.java       | 177 +++++---
 .../streams/common/topology/model/IWindow.java     |   3 +-
 .../streams/common/topology/model/Pipeline.java    |  56 ++-
 .../topology/model/PipelineSourceJoiner.java       |  48 --
 .../topology/shuffle/IShuffleKeyGenerator.java     |  11 +-
 .../common/topology/shuffle/ShuffleMQCreator.java  | 398 ++++++++++-------
 .../topology/stages/AbstractWindowStage.java       |   5 +-
 .../stages/EmptyChainStage.java}                   |  27 +-
 .../common/topology/stages/FilterChainStage.java   | 157 ++-----
 .../common/topology/stages/JoinChainStage.java     |   3 +-
 .../common/topology/stages/JoinEndChainStage.java  |  11 +-
 .../topology/stages/JoinStartChainStage.java       |  67 +++
 .../common/topology/stages/OutputChainStage.java   |  81 ++--
 .../topology/stages/ShuffleConsumerChainStage.java | 193 +++++++++
 .../topology/stages/ShuffleProducerChainStage.java | 345 +++++++++++++++
 .../topology/stages/SubPiplineChainStage.java      | 138 ------
 .../common/topology/stages/UnionChainStage.java    |   3 +-
 .../common/topology/stages/UnionEndChainStage.java |  11 +-
 ...onChainStage.java => UnionStartChainStage.java} |  43 +-
 .../ViewChainStage.java}                           | 479 ++++++++++----------
 .../common/topology/stages/udf/UDFChainStage.java  |  27 +-
 .../streams/common/topology/task/StreamsTask.java  | 444 +++----------------
 .../streams/common/topology/task/TaskAssigner.java |  20 +-
 .../streams/common/utils/ConfigurableUtil.java     |   6 +-
 .../streams/common/utils/ContantsUtil.java         |  33 +-
 .../streams/common/utils/DataTypeUtil.java         |  16 +-
 .../rocketmq/streams/common/utils/FileUtil.java    |  89 +++-
 .../streams/common/utils/InstantiationUtil.java    |  48 +-
 .../streams/common/utils/JsonableUtil.java         |   5 +
 .../rocketmq/streams/common/utils/KryoUtil.java    | 214 +++++++++
 .../streams/common/utils/NameCreatorUtil.java      |  60 ---
 .../streams/common/utils/PipelineHTMLUtil.java     | 299 +++++++++++++
 .../streams/common/utils/PropertiesUtils.java      |   2 +-
 .../rocketmq/streams/common/utils/ReflectUtil.java |  62 ++-
 .../streams/common/utils/SerializeUtil.java        |  23 +-
 .../streams/common/utils/ServiceLoadUtil.java      |  63 +++
 .../rocketmq/streams/common/utils/TraceUtil.java   |  23 +-
 .../rocketmq/streams/common/channel/SinkTest.java  |   4 +-
 rocketmq-streams-configurable/pom.xml              |   9 +-
 .../configurable/ConfigurableComponent.java        |  17 +-
 .../service/AbstractConfigurableService.java       |  98 ++---
 .../service/impl}/FileConfigureService.java        |   2 +-
 .../impl}/FileSupportParentConfigureService.java   |   2 +-
 .../service/impl/HttpConfigureService.java         | 377 ++++++++++++++++
 .../impl/HttpSupportParentConfigureService.java    |  20 +-
 .../service/impl}/MemoryConfigureService.java      |   2 +-
 .../impl}/MemorySupportParentConfigureService.java |   2 +-
 rocketmq-streams-connectors/pom.xml                |   9 +-
 .../{IBounded.java => IScheduleCallback.java}      |   9 +-
 .../connectors/source/AbstractPullSource.java      | 148 ++++---
 .../connectors/source/MutilBatchTaskSource.java    | 158 +++++++
 rocketmq-streams-db-operator/pom.xml               |   4 +-
 .../streams/db/configuable/DBConfigureService.java |  11 +-
 rocketmq-streams-dim/pom.xml                       |   4 +-
 .../intelligence/AbstractIntelligenceCache.java    |   5 +-
 rocketmq-streams-examples/pom.xml                  |  20 +-
 .../mutilconsumer/MultiStreamsExample.java         |   3 +
 .../streams/examples/send/ProducerFromFile.java    |   8 +-
 .../streams/examples/source/FileSourceExample.java |   2 +-
 .../src/main/resources/joinData-2.txt              |   4 -
 rocketmq-streams-filter/pom.xml                    |   3 +-
 .../streams/filter/builder/ExpressionBuilder.java  |  49 +--
 .../streams/filter/context/RuleContext.java        |  31 +-
 .../filter/engine/impl/DefaultRuleEngine.java      |  29 +-
 .../function/expression/CompareFunction.java       |  16 +-
 .../rocketmq/streams/filter/operator/Rule.java     |  58 +++
 .../expression/ExpressionRelationParser.java       |   9 +-
 .../expression/ExpressionRelationPaser.java        | 107 -----
 .../operator/expression/GroupExpression.java       |   3 +-
 .../operator/expression/RelationExpression.java    |  35 +-
 .../PiplineLogFingerprintAnalysis.java             |   6 +-
 .../dependency/BlinkRuleV2Expression.java          |   5 +-
 .../optimization/dependency/DependencyTree.java    |  22 +-
 .../dependency/SimplePipelineTree.java             |   3 +-
 .../dependency/StateLessDependencyTree.java        |  84 ++++
 .../optimization/homologous/HomologousCompute.java |   5 +-
 .../homologous/HomologousOptimization.java         |  13 +-
 rocketmq-streams-lease/pom.xml                     |   4 +-
 rocketmq-streams-runner/assembly/distribution.xml  |  69 ---
 rocketmq-streams-runner/assembly/standalone.xml    |  72 ---
 rocketmq-streams-runner/bin/start.sh               |  58 ---
 rocketmq-streams-runner/bin/stop.sh                |  33 --
 rocketmq-streams-runner/pom.xml                    |  80 ----
 .../src/main/resources/log4j.xml                   |  51 ---
 rocketmq-streams-schedule/pom.xml                  |   4 +-
 .../schedule/job/ConfigurableExecutorJob.java      |  30 +-
 rocketmq-streams-script/pom.xml                    |   4 +-
 .../function/aggregation/LastValueAccumulator.java |  67 +++
 .../function/impl/distinct/DistinctFunction.java   |   1 -
 .../function/impl/json/JsonCreatorFunction.java    |   4 +-
 .../function/impl/json/UDTFFieldNameFunction.java  |  50 +++
 .../script/function/impl/parser/GrokFunction.java  |   4 +-
 .../function/impl/parser/Paser2JsonFunction.java   |  17 +-
 .../function/impl/parser/PaserBySplitFunction.java |  44 +-
 .../function/impl/parser/RegexParserFunction.java  |  24 +-
 .../script/function/impl/router/RouteFunction.java |   4 +-
 .../script/operator/impl/AggregationScript.java    |  19 +-
 .../streams/script/service/IAccumulator.java       |   4 +-
 .../script/service/udf/SimpleUDAFScript.java       |  35 +-
 .../streams/script/service/udf/UDFScript.java      |  26 +-
 .../streams/script/function/FunctionTest.java      |  19 +-
 rocketmq-streams-serviceloader/pom.xml             |   4 +-
 rocketmq-streams-state/pom.xml                     |   4 +-
 .../streams/state/kv/rocksdb/RocksDBOperator.java  |  37 +-
 rocketmq-streams-transport-minio/pom.xml           |   4 +-
 rocketmq-streams-window/pom.xml                    |   9 +-
 .../window/minibatch/MiniBatchMsgCache.java        |  58 +++
 .../window/minibatch/ShuffleMessageCache.java      | 187 ++++++++
 .../rocketmq/streams/window/model/WindowCache.java | 182 +++-----
 .../streams/window/model/WindowInstance.java       |  22 +-
 .../window/operator/AbstractShuffleWindow.java     |  18 +-
 .../streams/window/operator/AbstractWindow.java    |  73 +++-
 .../window/operator/impl/SessionOperator.java      |   2 +-
 .../window/operator/impl/WindowOperator.java       |   8 +-
 .../streams/window/operator/join/JoinWindow.java   |   3 +-
 .../window/shuffle/AbstractSystemChannel.java      |  94 ++--
 .../streams/window/shuffle/ShuffleCache.java       |  14 +-
 .../streams/window/shuffle/ShuffleChannel.java     |  54 +--
 .../streams/window/state/impl/WindowValue.java     |  54 ++-
 .../streams/window/trigger/WindowTrigger.java      |   1 -
 .../rocketmq/streams/window/util/ShuffleUtil.java  |  62 +++
 .../org/apache/rocketmq/streams/RocksdbTest.java   |   2 +-
 docs/stream_sink/README.md => stream_sink.md       |  19 +-
 docs/stream_source/README.md => stream_source.md   |  30 +-
 .../README.md => stream_transform.md               |   0
 265 files changed, 9865 insertions(+), 4083 deletions(-)

diff --cc pom.xml
index 70288c26,7caa0cbc..2d139613
--- a/pom.xml
+++ b/pom.xml
@@@ -184,9 -137,9 +136,10 @@@
                          <exclude>**/*.xml</exclude>
                          <exclude>**/*.sh</exclude>
                          <exclude>**/*.out</exclude>
+                         <exclude>**/*.sql</exclude>
                          <exclude>**/*.properties</exclude>
                          <exclude>docs/**/*</exclude>
 +                        <exclude>**/*.sql</exclude>
                      </excludes>
                  </configuration>
              </plugin>
diff --cc rocketmq-streams-channel-es/pom.xml
index 15723b3c,fe50aeef..b4ce4d85
--- a/rocketmq-streams-channel-es/pom.xml
+++ b/rocketmq-streams-channel-es/pom.xml
@@@ -20,8 -7,8 +7,8 @@@
      <parent>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-streams</artifactId>
 -        <version>1.0.2-SNAPSHOT</version>
 +        <version>1.0.2-preview-SNAPSHOT</version>
-     </parent>
+      </parent>
      <artifactId>rocketmq-streams-channel-es</artifactId>
      <name>ROCKETMQ STREAMS :: channel-es</name>
      <packaging>jar</packaging>
diff --cc rocketmq-streams-channel-kafka/pom.xml
index 00000000,89bd645c..76b73909
mode 000000,100644..100644
--- a/rocketmq-streams-channel-kafka/pom.xml
+++ b/rocketmq-streams-channel-kafka/pom.xml
@@@ -1,0 -1,32 +1,32 @@@
+ <?xml version="1.0" encoding="UTF-8"?>
+ <project xmlns="http://maven.apache.org/POM/4.0.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+     <parent>
+         <artifactId>rocketmq-streams</artifactId>
+         <groupId>org.apache.rocketmq</groupId>
 -        <version>1.0.2-SNAPSHOT</version>
++        <version>1.0.2-preview-SNAPSHOT</version>
+     </parent>
+     <modelVersion>4.0.0</modelVersion>
+ 
+     <artifactId>rocketmq-streams-channel-kafka</artifactId>
+     <name>ROCKETMQ STREAMS :: channel-kafka</name>
+ 
+     <properties>
+         <maven.compiler.source>8</maven.compiler.source>
+         <maven.compiler.target>8</maven.compiler.target>
+     </properties>
+ 
+ 
+     <dependencies>
+         <dependency>
+             <groupId>org.apache.kafka</groupId>
+             <artifactId>kafka_2.12</artifactId>
+         </dependency>
+ 
+         <dependency>
+             <groupId>org.apache.rocketmq</groupId>
+             <artifactId>rocketmq-streams-commons</artifactId>
+         </dependency>
+     </dependencies>
+ </project>
diff --cc rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
index bb6e9454,d825603a..1de92e31
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
@@@ -17,8 -17,10 +17,11 @@@
  
  package org.apache.rocketmq.streams.sink;
  
+ import java.nio.charset.StandardCharsets;
  import java.util.ArrayList;
 +import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.Comparator;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.List;
@@@ -27,8 -29,10 +30,9 @@@ import java.util.Set
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 -import org.apache.rocketmq.client.exception.MQBrokerException;
  import org.apache.rocketmq.client.exception.MQClientException;
  import org.apache.rocketmq.client.producer.DefaultMQProducer;
+ import org.apache.rocketmq.common.MixAll;
  import org.apache.rocketmq.common.TopicConfig;
  import org.apache.rocketmq.common.message.Message;
  import org.apache.rocketmq.common.message.MessageQueue;
@@@ -149,10 -154,10 +154,6 @@@ public class RocketMQSink extends Abstr
                      destroy();
                      producer = new DefaultMQProducer(groupName + "producer", true, null);
                      try {
--                        //please not use the code,the name srv addr may be empty in jmenv
--//                        if (this.namesrvAddr == null || "".equals(this.namesrvAddr)) {
--//                            throw new RuntimeException("namesrvAddr can not be null.");
--//                        }
  
                          if (StringUtil.isNotEmpty(this.namesrvAddr)) {
                              producer.setNamesrvAddr(this.namesrvAddr);
@@@ -263,7 -275,7 +272,7 @@@
  
      @Override
      public int getSplitNum() {
-         List<ISplit> splits = getSplitList();
 -        List<ISplit<?, ?>> splits = getSplitList();
++        List<ISplit<?,?>> splits = getSplitList();
          if (splits == null || splits.size() == 0) {
              return 0;
          }
diff --cc rocketmq-streams-clients/pom.xml
index 8fb31672,94c7f3e7..2d15efce
--- a/rocketmq-streams-clients/pom.xml
+++ b/rocketmq-streams-clients/pom.xml
@@@ -50,6 -57,18 +57,11 @@@
              <groupId>org.apache.rocketmq</groupId>
              <artifactId>rocketmq-streams-filter</artifactId>
          </dependency>
+         <dependency>
+             <groupId>org.apache.rocketmq</groupId>
+             <artifactId>rocketmq-streams-channel-syslog</artifactId>
+         </dependency>
 -        <dependency>
 -            <groupId>org.apache.rocketmq</groupId>
 -            <artifactId>rocketmq-streams-channel-kafka</artifactId>
 -        </dependency>
 -        <dependency>
 -            <groupId>org.apache.rocketmq</groupId>
 -            <artifactId>rocketmq-streams-dbinit</artifactId>
 -        </dependency>
++
          <dependency>
              <groupId>org.apache.rocketmq</groupId>
              <artifactId>rocketmq-streams-window</artifactId>
diff --cc rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
index 9422519a,5004f230..1e4b15b6
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
@@@ -178,6 -179,25 +178,25 @@@ public class DataStreamSource 
          return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null);
      }
  
 -    public DataStream fromKafka(String endpoint, String topic, String groupName) {
 -        return fromKafka(endpoint, topic, groupName, true);
 -    }
 -
 -    public DataStream fromKafka(String endpoint, String topic, String groupName, Boolean isJson) {
 -        return fromKafka(endpoint, topic, groupName, isJson, 1);
 -    }
 -
 -    public DataStream fromKafka(String endpoint, String topic, String groupName, Boolean isJson, int maxThread) {
 -        KafkaSource kafkaChannel = new KafkaSource();
 -        kafkaChannel.setBootstrapServers(endpoint);
 -        kafkaChannel.setTopic(topic);
 -        kafkaChannel.setGroupName(groupName);
 -        kafkaChannel.setJsonData(isJson);
 -        kafkaChannel.setMaxThread(maxThread);
 -        this.mainPipelineBuilder.setSource(kafkaChannel);
 -        return new DataStream(this.mainPipelineBuilder, null);
 -    }
++//    public DataStream fromKafka(String endpoint, String topic, String groupName) {
++//        return fromKafka(endpoint, topic, groupName, true);
++//    }
++//
++//    public DataStream fromKafka(String endpoint, String topic, String groupName, Boolean isJson) {
++//        return fromKafka(endpoint, topic, groupName, isJson, 1);
++//    }
++//
++//    public DataStream fromKafka(String endpoint, String topic, String groupName, Boolean isJson, int maxThread) {
++//        KafkaSource kafkaChannel = new KafkaSource();
++//        kafkaChannel.setBootstrapServers(endpoint);
++//        kafkaChannel.setTopic(topic);
++//        kafkaChannel.setGroupName(groupName);
++//        kafkaChannel.setJsonData(isJson);
++//        kafkaChannel.setMaxThread(maxThread);
++//        this.mainPipelineBuilder.setSource(kafkaChannel);
++//        return new DataStream(this.mainPipelineBuilder, null);
++//    }
+ 
      public DataStream from(ISource<?> source) {
          this.mainPipelineBuilder.setSource(source);
          return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null);
diff --cc rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
index 80ae510e,96e24a0a..563af478
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
@@@ -51,8 -51,11 +51,10 @@@ import org.apache.rocketmq.streams.comm
  import org.apache.rocketmq.streams.common.topology.ChainStage;
  import org.apache.rocketmq.streams.common.topology.builder.IStageBuilder;
  import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
 -import org.apache.rocketmq.streams.common.topology.model.AbstractRule;
  import org.apache.rocketmq.streams.common.topology.model.Union;
  import org.apache.rocketmq.streams.common.topology.stages.FilterChainStage;
+ import org.apache.rocketmq.streams.common.topology.stages.ShuffleConsumerChainStage;
+ import org.apache.rocketmq.streams.common.topology.stages.ShuffleProducerChainStage;
  import org.apache.rocketmq.streams.common.topology.stages.udf.StageBuilder;
  import org.apache.rocketmq.streams.common.topology.stages.udf.UDFChainStage;
  import org.apache.rocketmq.streams.common.topology.stages.udf.UDFUnionChainStage;
@@@ -646,6 -669,13 +667,13 @@@ public class DataStream implements Seri
          return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, output);
      }
  
 -    public DataStream toKafka(String bootstrapServers, String topic) {
 -        KafkaSink kafkaSink = new KafkaSink(bootstrapServers, topic);
 -        ChainStage<?> output = this.mainPipelineBuilder.createStage(kafkaSink);
 -        this.mainPipelineBuilder.setTopologyStages(currentChainStage, output);
 -        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, output);
 -    }
++//    public DataStream toKafka(String bootstrapServers, String topic) {
++//        KafkaSink kafkaSink = new KafkaSink(bootstrapServers, topic);
++//        ChainStage<?> output = this.mainPipelineBuilder.createStage(kafkaSink);
++//        this.mainPipelineBuilder.setTopologyStages(currentChainStage, output);
++//        return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, output);
++//    }
+ 
      public DataStream toEnhanceDBSink(String url, String userName, String password, String tableName) {
          EnhanceDBSink sink = new EnhanceDBSink(url, userName, password, tableName);
          ChainStage<?> output = this.mainPipelineBuilder.createStage(sink);
diff --cc rocketmq-streams-commons/pom.xml
index 5ea155d4,72af956e..2e73c6f8
--- a/rocketmq-streams-commons/pom.xml
+++ b/rocketmq-streams-commons/pom.xml
@@@ -98,11 -101,36 +101,40 @@@
              <artifactId>re2j</artifactId>
              <version>1.6</version>
          </dependency>
 +        <dependency>
 +            <groupId>org.apache.rocketmq</groupId>
 +            <artifactId>rocketmq-tools</artifactId>
 +        </dependency>
  
+         <dependency>
+             <groupId>org.apache.rocketmq</groupId>
+             <artifactId>rocketmq-client</artifactId>
+             <exclusions>
+                 <exclusion>
+                     <groupId>ch.qos.logback</groupId>
+                     <artifactId>logback-classic</artifactId>
+                 </exclusion>
+                 <exclusion>
+                     <groupId>ch.qos.logback</groupId>
+                     <artifactId>logback-core</artifactId>
+                 </exclusion>
+             </exclusions>
+         </dependency>
+         <dependency>
+             <groupId>de.ruedigermoeller</groupId>
+             <artifactId>fst</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>com.esotericsoftware</groupId>
+             <artifactId>kryo</artifactId>
+             <version>5.3.0</version>
+         </dependency>
+ 
  
+         <dependency>
+             <groupId>commons-codec</groupId>
+             <artifactId>commons-codec</artifactId>
+             <version>1.13</version>
+         </dependency>
      </dependencies>
  </project>
diff --cc rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
index 939b1d54,320cbd12..2bc7e55f
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
@@@ -18,9 -18,10 +18,11 @@@ package org.apache.rocketmq.streams.com
  
  import com.alibaba.fastjson.JSONArray;
  import com.alibaba.fastjson.JSONObject;
 +
  import java.io.UnsupportedEncodingException;
  import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collections;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Iterator;
@@@ -58,15 -58,22 +60,23 @@@ import org.apache.rocketmq.streams.comm
  public abstract class AbstractSource extends BasedConfigurable implements ISource<AbstractSource>, ILifeCycle {
  
      public static String CHARSET = "UTF-8";
+     /**
+      * 输入的消息是否为json
+      */
+     protected Boolean isJsonData = true;
+     /**
+      * 输入的消息是否为json array
+      */
+     protected Boolean msgIsJsonArray = false;
  
-     protected Boolean isJsonData = true;//输入的消息是否为json
-     protected Boolean msgIsJsonArray = false;//输入的消息是否为json array
      @ENVDependence
-     protected String groupName;//group name
+     protected String groupName;
+ 
      protected int maxThread = Runtime.getRuntime().availableProcessors();
+ 
      @ENVDependence
      protected String topic = "";
 +    protected String namesrvAddr;
      /**
       * 多长时间做一次checkpoint
       */
@@@ -136,11 -167,9 +170,10 @@@
       * @param message
       * @return
       */
 -    public AbstractContext doReceiveMessage(JSONObject message, boolean needSetCheckPoint, String queueId, String offset) {
 +    public AbstractContext doReceiveMessage(JSONObject message, boolean needSetCheckPoint, String queueId,
 +                                            String offset) {
          Message msg = createMessage(message, queueId, offset, needSetCheckPoint);
-         AbstractContext context = executeMessage(msg);
-         return context;
+         return executeMessage(msg);
      }
  
      /**
diff --cc rocketmq-streams-connectors/pom.xml
index 8c8277e3,d544e3d5..e557a19a
mode 100755,100644..100644
--- a/rocketmq-streams-connectors/pom.xml
+++ b/rocketmq-streams-connectors/pom.xml
diff --cc rocketmq-streams-examples/pom.xml
index 62a4038c,da6a130b..7ced0cb0
--- a/rocketmq-streams-examples/pom.xml
+++ b/rocketmq-streams-examples/pom.xml
@@@ -39,6 -41,6 +41,22 @@@
              <groupId>org.apache.rocketmq</groupId>
              <artifactId>rocketmq-streams-clients</artifactId>
          </dependency>
++        <dependency>
++            <groupId>com.alibaba</groupId>
++            <artifactId>fastjson</artifactId>
++        </dependency>
++        <dependency>
++            <groupId>junit</groupId>
++            <artifactId>junit</artifactId>
++        </dependency>
++        <dependency>
++            <groupId>org.apache.rocketmq</groupId>
++            <artifactId>rocketmq-client</artifactId>
++        </dependency>
++        <dependency>
++            <groupId>org.apache.rocketmq</groupId>
++            <artifactId>rocketmq-streams-commons</artifactId>
++        </dependency>
      </dependencies>
      <packaging>jar</packaging>
  
diff --cc rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java
index f5f3d46c,2b823e44..8835e943
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java
@@@ -24,13 -23,11 +24,15 @@@ import com.alibaba.fastjson.JSONObject
  import java.util.Random;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
 +
++import org.apache.rocketmq.common.message.Message;
  import org.apache.rocketmq.streams.client.StreamBuilder;
  import org.apache.rocketmq.streams.client.source.DataStreamSource;
  import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
++import org.apache.rocketmq.streams.client.transform.DataStream;
  import org.apache.rocketmq.streams.client.transform.window.Time;
  import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
 +import org.apache.rocketmq.streams.examples.send.ProducerFromFile;
  
  import static org.apache.rocketmq.streams.examples.aggregate.Constant.NAMESRV_ADDRESS;
  import static org.apache.rocketmq.streams.examples.aggregate.Constant.RMQ_CONSUMER_GROUP_NAME;
@@@ -64,6 -61,6 +66,7 @@@ public class MultiStreamsExample 
  
      private static void runOneStreamsClient(int index) {
          DataStreamSource source = StreamBuilder.dataStream("namespace" + index, "pipeline" + index);
++
          source.fromRocketmq(
                  RMQ_TOPIC,
                  RMQ_CONSUMER_GROUP_NAME,
diff --cc rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/send/ProducerFromFile.java
index 163d8116,58d3710c..0508c303
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/send/ProducerFromFile.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/send/ProducerFromFile.java
@@@ -17,8 -17,8 +17,12 @@@
   *
   */
  
 -package org.apache.rocketmq.streams.examples.aggregate;
 +package org.apache.rocketmq.streams.examples.send;
 +
++import org.apache.rocketmq.client.producer.DefaultMQProducer;
++import org.apache.rocketmq.common.message.Message;
++import org.apache.rocketmq.remoting.common.RemotingHelper;
+ 
  import java.io.BufferedReader;
  import java.io.File;
  import java.io.FileReader;
@@@ -26,44 -26,18 +30,40 @@@ import java.io.IOException
  import java.net.URL;
  import java.util.ArrayList;
  import java.util.List;
 -import org.apache.rocketmq.client.producer.DefaultMQProducer;
 -import org.apache.rocketmq.client.producer.SendResult;
 -import org.apache.rocketmq.common.message.Message;
 -import org.apache.rocketmq.remoting.common.RemotingHelper;
 +import java.util.concurrent.atomic.AtomicLong;
  
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.common.RemotingHelper;
- 
  public class ProducerFromFile {
 +    private static final DefaultMQProducer producer = new DefaultMQProducer("test-group");
 +    private static final AtomicLong count = new AtomicLong(0);
 +    private static boolean init = false;
  
 -    public static void produce(String filePath, String nameServ, String topic) {
 -        try {
 -            DefaultMQProducer producer = new DefaultMQProducer("test-group");
 +    private static synchronized void initProducer(String nameServ) throws Throwable {
 +        if (!init) {
              producer.setNamesrvAddr(nameServ);
              producer.start();
 +            init = true;
 +        }
 +    }
 +
 +    /**
 +     * Calls {@code produce(filePath, nameServ, topic, false)} in an endless loop, sleeping {@code interval}
 +     * milliseconds after each round and printing the running message count whenever it is a multiple of 500.
 +     * <p>
 +     * The loop ends only when the calling thread is interrupted; any other failure of a round is printed and
 +     * the loop continues.
 +     *
 +     * @param filePath path handed to {@code produce}
 +     * @param nameServ name server address handed to {@code produce}
 +     * @param topic    topic handed to {@code produce}
 +     * @param interval pause between two rounds, in milliseconds
 +     */
 +    public static void produceInLoop(String filePath, String nameServ, String topic, long interval) {
 +        while (true) {
 +            try {
 +                produce(filePath, nameServ, topic, false);
 +
 +                Thread.sleep(interval);
 +
 +                if (count.get() % 500 == 0) {
 +                    System.out.println("send message num: " + count.get());
 +                }
 +            } catch (InterruptedException e) {
 +                // Restore the interrupt flag and stop. Swallowing the interrupt left no way to end this loop,
 +                // and with the flag still set every following sleep would throw again immediately.
 +                Thread.currentThread().interrupt();
 +                return;
 +            } catch (Throwable t) {
 +                t.printStackTrace();
 +            }
 +        }
 +    }
 +
 +    public static void produce(String filePath, String nameServ, String topic, boolean shutdown) {
 +        try {
 +            initProducer(nameServ);
  
              List<String> result = ProducerFromFile.read(filePath);
  
diff --cc rocketmq-streams-window/pom.xml
index 40fcbf94,7a2be2dd..3ce4dac8
--- a/rocketmq-streams-window/pom.xml
+++ b/rocketmq-streams-window/pom.xml
@@@ -48,10 -50,6 +50,13 @@@
              <groupId>org.rocksdb</groupId>
              <artifactId>rocksdbjni</artifactId>
          </dependency>
- 
 -        
 +        <dependency>
 +            <groupId>org.apache.rocketmq</groupId>
 +            <artifactId>rocketmq-tools</artifactId>
 +        </dependency>
++        <dependency>
++            <groupId>org.apache.rocketmq</groupId>
++            <artifactId>rocketmq-streams-commons</artifactId>
++        </dependency>
      </dependencies>
  </project>
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java
index 00000000,39629cc1..a36309e2
mode 000000,100644..100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java
@@@ -1,0 -1,76 +1,58 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.rocketmq.streams.window.minibatch;
+ 
 -import com.alibaba.fastjson.JSONArray;
 -import com.alibaba.fastjson.JSONObject;
 -import java.util.ArrayList;
 -import java.util.HashMap;
 -import java.util.Iterator;
 -import java.util.List;
 -import java.util.Map;
 -import java.util.Set;
+ import org.apache.commons.lang3.tuple.Pair;
+ import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
+ import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache;
+ import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
+ import org.apache.rocketmq.streams.common.channel.split.ISplit;
+ import org.apache.rocketmq.streams.common.context.IMessage;
 -import org.apache.rocketmq.streams.common.context.Message;
 -import org.apache.rocketmq.streams.common.context.MessageHeader;
+ import org.apache.rocketmq.streams.common.topology.shuffle.IShuffleKeyGenerator;
 -import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 -import org.apache.rocketmq.streams.common.utils.ReflectUtil;
 -import org.apache.rocketmq.streams.common.utils.StringUtil;
 -import org.apache.rocketmq.streams.script.operator.impl.AggregationScript;
 -import org.apache.rocketmq.streams.window.model.WindowInstance;
+ import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
 -import org.apache.rocketmq.streams.window.operator.AbstractWindow;
 -import org.apache.rocketmq.streams.window.state.impl.WindowValue;
 -import org.apache.rocketmq.streams.window.util.ShuffleUtil;
+ 
+ /**
+  * Multi-split message cache whose entries pair a message with the split it belongs to.
+  * The split id of an entry is the queue id of its split, and every per-split cache created here is a
+  * {@code ShuffleMessageCache} wired with this cache's window and shuffle key generator.
+  */
+ public class MiniBatchMsgCache extends AbstractMultiSplitMessageCache<Pair<ISplit, IMessage>> {
+
+     public static String SHUFFLE_KEY = "shuffle_key";
+
+     /** Forwarded to each cache built by {@link #createMessageCache()}. */
+     protected transient IShuffleKeyGenerator shuffleKeyGenerator;
+     /** Forwarded to each cache built by {@link #createMessageCache()}. */
+     protected transient AbstractShuffleWindow window;
+
+     /**
+      * @param flushCallBack       callback passed to the parent cache
+      * @param shuffleKeyGenerator generator forwarded to the per-split caches
+      * @param window              window forwarded to the per-split caches
+      */
+     public MiniBatchMsgCache(IMessageFlushCallBack<Pair<ISplit, IMessage>> flushCallBack,
+         IShuffleKeyGenerator shuffleKeyGenerator, AbstractShuffleWindow window) {
+         super(flushCallBack);
+         this.window = window;
+         this.shuffleKeyGenerator = shuffleKeyGenerator;
+     }
+
+     /** Returns the queue id of the split carried on the left side of the pair. */
+     @Override
+     protected String createSplitId(Pair<ISplit, IMessage> msg) {
+         ISplit split = msg.getLeft();
+         return split.getQueueId();
+     }
+
+     /** Creates a {@code ShuffleMessageCache} using this cache's flush callback, window and key generator. */
+     @Override
+     protected MessageCache createMessageCache() {
+         ShuffleMessageCache cache = new ShuffleMessageCache(this.flushCallBack);
+         cache.setWindow(window);
+         cache.setShuffleKeyGenerator(shuffleKeyGenerator);
+         return cache;
+     }
+ }
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
index c749bd84,fed18863..4062b1f5
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
@@@ -116,6 -122,19 +116,10 @@@ public class WindowInstance extends Ent
          return windowInstance;
      }
  
 -    /**
 -     * 创建window instance的唯一ID
 -     *
 -     * @return
 -     */
+     public String createWindowInstanceId() {
+         return MapKeyUtil.createKey(splitId, windowNameSpace, windowName, windowInstanceName, startTime, endTime);
+     }
+ 
 -    public String createWindowInstanceIdWithoutSplitid() {
 -        return MapKeyUtil.createKey(windowNameSpace, windowName, windowInstanceName, startTime, endTime);
 -    }
 -
      public String createWindowInstanceTriggerId() {
          return MapKeyUtil.createKey(splitId, windowNameSpace, windowName, windowInstanceName, startTime, endTime, fireTime);
      }
@@@ -162,7 -254,20 +165,18 @@@
       * @return
       * @Param isWindowInstance2DB 如果是秒级窗口,可能windowinstacne不必存表,只在内存保存,可以通过这个标志设置
       */
 -    public static List<WindowInstance> getOrCreateWindowInstance(AbstractWindow window, Long occurTime,
 -        int timeUnitAdjust, String queueId) {
 +    public static List<WindowInstance> getOrCreateWindowInstance(AbstractWindow window, Long occurTime, int timeUnitAdjust, String queueId) {
+         return getOrCreateWindowInstance(window,occurTime,timeUnitAdjust,queueId,false);
+     }
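+     /**
+      * Looks up or creates the instances of the window; a sliding window may return several, a tumbling window returns one.
+      *
+      * @param window
+      * @param occurTime
+      * @return
+      * @Param isWindowInstance2DB for second-level windows the window instance may not need to be stored in a table and can be kept in memory only; this flag controls that
+      */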
 -    public static List<WindowInstance> getOrCreateWindowInstance(AbstractWindow window, Long occurTime,
 -        int timeUnitAdjust, String queueId, boolean isCreateOnly) {
++    public static List<WindowInstance> getOrCreateWindowInstance(AbstractWindow window, Long occurTime, int timeUnitAdjust, String queueId, boolean isCreateOnly) {
          int windowSlideInterval = window.getSlideInterval();
          int windowSizeInterval = window.getSizeInterval();
          if (windowSlideInterval == 0) {
@@@ -246,10 -345,9 +264,10 @@@
              }
          }
          List<WindowInstance> lostInstanceList = null;
 +        //todo 这里针对lost的都创建一次
          lostInstanceList = WindowInstance.createWindowInstances(window, lostWindowTimeList, lostFireList, queueId);
          instanceList.addAll(lostInstanceList);
-         if (CollectionUtil.isNotEmpty(lostInstanceList)) {
+         if (CollectionUtil.isNotEmpty(lostInstanceList)&&!isCreateOnly) {
              for (WindowInstance windowInstance : instanceList) {
                  List<WindowInstance> emitInstances = createEmitWindowInstance(window, windowInstance);
                  if (emitInstances != null && emitInstances.size() > 0) {
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
index 070a40f0,5338734f..7c78f478
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
@@@ -85,8 -74,7 +100,7 @@@ public abstract class AbstractShuffleWi
          Set<String> splitIds = new HashSet<>();
          splitIds.add(windowInstance.getSplitId());
          shuffleChannel.flush(splitIds);
- 
 -        return fireWindowInstance(windowInstance, windowInstance.getSplitId(), queueId2Offset);
 +        return doFireWindowInstance(windowInstance);
      }
  
      /**
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
index 49d79c5b,8fd07737..eb548aa6
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
@@@ -17,6 -17,17 +17,8 @@@
  package org.apache.rocketmq.streams.window.operator;
  
  import com.alibaba.fastjson.JSONObject;
 -import java.util.ArrayList;
 -import java.util.Date;
 -import java.util.HashMap;
 -import java.util.Iterator;
 -import java.util.LinkedList;
 -import java.util.List;
 -import java.util.Map;
 -import java.util.Map.Entry;
 -import java.util.concurrent.ConcurrentHashMap;
 -import java.util.stream.Collectors;
++
+ import javafx.util.Pair;
  import org.apache.commons.lang3.StringUtils;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
@@@ -137,8 -164,8 +145,9 @@@ public abstract class AbstractWindow ex
      protected boolean isLocalStorageOnly = true;//是否只用本地存储,可以提高性能,但不保证可靠性
      protected String reduceSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码
      protected transient IReducer reducer;
- 
 +    protected transient Long maxPartitionNum = 100000000L;
+     protected String mapFunctionSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码
+     protected transient MapFunction<JSONObject, Pair<WindowInstance, JSONObject>> mapFunction;
      /**
       * the computed column and it's process of computing
       */
@@@ -164,12 -191,23 +173,13 @@@
       */
      protected transient String WINDOW_NAME;
  
 -    /**
 -     * 内部使用,定期检查窗口有没有触发
 -     */
 -    //protected transient ScheduledExecutorService fireWindowInstanceChecker =new ScheduledThreadPoolExecutor(3);
 -
 -    // protected transient ExecutorService deleteService = Executors.newSingleThreadExecutor();
  
      protected volatile transient WindowCache windowCache;
 -    protected transient WindowStorage storage;
 +    protected transient IStorage storage;
      protected transient WindowTrigger windowFireSource;
 -    protected transient SQLCache sqlCache;
      protected transient EventTimeManager eventTimeManager;
+     protected transient ISink contextMsgSink;
  
 -    //create and save window instacne max partitionNum and window max eventTime
 -    protected transient IWindowMaxValueManager windowMaxValueManager;
 -
      public AbstractWindow() {
          setType(IWindow.TYPE);
      }
@@@ -199,8 -242,12 +209,12 @@@
              byte[] bytes = Base64Utils.decode(this.reduceSerializeValue);
              reducer = InstantiationUtil.deserializeObject(bytes);
          }
+         if (StringUtil.isNotEmpty(this.mapFunctionSerializeValue)) {
+             byte[] bytes = Base64Utils.decode(this.mapFunctionSerializeValue);
+             this.mapFunction = InstantiationUtil.deserializeObject(bytes);
+         }
          eventTimeManager = new EventTimeManager();
 -        windowMaxValueManager = new WindowMaxValueManager(this, sqlCache);
 +
  
          return success;
      }
@@@ -306,11 -347,11 +320,12 @@@
       * @param message
       * @return
       */
-     protected String generateShuffleKey(IMessage message) {
+     @Override
+     public String generateShuffleKey(IMessage message) {
          if (StringUtil.isEmpty(groupByFieldName)) {
-             return null;
+             return "globle_window";
          }
 +
          JSONObject msg = message.getMessageBody();
          String[] fieldNames = groupByFieldName.split(";");
          String[] values = new String[fieldNames.length];
@@@ -773,13 -845,45 +796,53 @@@
          this.maxDelay = maxDelay;
      }
  
 +    public Long getMaxPartitionNum() {
 +        return maxPartitionNum;
 +    }
 +
 +    public void setMaxPartitionNum(Long maxPartitionNum) {
 +        this.maxPartitionNum = maxPartitionNum;
 +    }
 +
      public abstract boolean supportBatchMsgFinish();
+ 
+     public ISink getContextMsgSink() {
+         return contextMsgSink;
+     }
+ 
+     public String getContextMsgSinkName() {
+         return contextMsgSinkName;
+     }
+ 
+     public void setContextMsgSinkName(String contextMsgSinkName) {
+         this.contextMsgSinkName = contextMsgSinkName;
+     }
+ 
+     public String getMapFunctionSerializeValue() {
+         return mapFunctionSerializeValue;
+     }
+ 
+     public void setMapFunctionSerializeValue(String mapFunctionSerializeValue) {
+         this.mapFunctionSerializeValue = mapFunctionSerializeValue;
+     }
+ 
+     /**
+      * Passes each message body, paired with the window instance, through {@code mapFunction} and adds the
+      * result to {@code contextMsgSink}; the sink is flushed once after all messages were added.
+      * <p>
+      * Does nothing when {@code mapFunction} or {@code contextMsgSink} is not set, or when {@code messages} is null.
+      *
+      * @param queueId        queue id set on the header of every message added to the sink
+      * @param windowInstance window instance paired with each message body before mapping
+      * @param messages       messages to save; the offset of each one is copied onto its mapped copy
+      * @throws RuntimeException if mapping, copying or adding a message throws; the original exception is the cause
+      */
+     public void saveMsgContext(String queueId, WindowInstance windowInstance, List<IMessage> messages) {
+         if (this.mapFunction == null || this.contextMsgSink == null || messages == null) {
+             return;
+         }
+         for (IMessage message : messages) {
+             JSONObject body = message.getMessageBody();
+             try {
+                 // Explicit type arguments instead of the former raw Pair, so the argument is type-checked
+                 // against the input type declared on mapFunction.
+                 JSONObject mapped = this.mapFunction.map(new Pair<WindowInstance, JSONObject>(windowInstance, body));
+                 Message copyMsg = new Message(mapped);
+                 copyMsg.getHeader().setQueueId(queueId);
+                 copyMsg.getHeader().setOffset(message.getHeader().getOffset());
+                 this.contextMsgSink.batchAdd(copyMsg);
+             } catch (Exception e) {
+                 throw new RuntimeException("save window context msg error ", e);
+             }
+         }
+         this.contextMsgSink.flush();
+     }
  }
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
index 65ac261b,8da03987..0c10a9bf
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
@@@ -515,13 -481,12 +515,13 @@@ public class SessionOperator extends Wi
          store(lastValueMap, instance, queueId);
      }
  
 +
      @Override
      public long incrementAndGetSplitNumber(WindowInstance instance, String shuffleId) {
 -        long number = super.incrementAndGetSplitNumber(instance, shuffleId);
 -        if (number > 900000000) {
 -            this.getWindowMaxValueManager().resetSplitNum(instance, shuffleId);
 +        long numer = super.incrementAndGetSplitNumber(instance, shuffleId);
 +        if (numer > 900000000) {
 +            this.storage.putMaxPartitionNum(shuffleId, instance.getWindowInstanceId(), numer);
          }
 -        return number;
 +        return numer;
      }
- }
+ }
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
index 65f326f6,d2806081..de6dce22
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
@@@ -16,113 -16,155 +16,119 @@@
   */
  package org.apache.rocketmq.streams.window.operator.impl;
  
 -import java.util.ArrayList;
 -import java.util.HashMap;
 -import java.util.Iterator;
 -import java.util.List;
 -import java.util.Map;
 -import java.util.Map.Entry;
 -import java.util.Set;
 -import java.util.concurrent.atomic.AtomicInteger;
 -import java.util.concurrent.atomic.AtomicLong;
 -import org.apache.rocketmq.streams.common.channel.split.ISplit;
++
  import org.apache.rocketmq.streams.common.context.IMessage;
  import org.apache.rocketmq.streams.common.context.MessageOffset;
 -import org.apache.rocketmq.streams.common.utils.CollectionUtil;
 -import org.apache.rocketmq.streams.common.utils.DateUtil;
  import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
  import org.apache.rocketmq.streams.common.utils.StringUtil;
 -import org.apache.rocketmq.streams.db.driver.batchloader.IRowOperator;
 -import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
 -import org.apache.rocketmq.streams.script.operator.impl.AggregationScript;
  import org.apache.rocketmq.streams.window.debug.DebugWriter;
 -import org.apache.rocketmq.streams.window.model.FunctionExecutor;
  import org.apache.rocketmq.streams.window.model.WindowInstance;
  import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
 -import org.apache.rocketmq.streams.window.operator.AbstractWindow;
 -import org.apache.rocketmq.streams.window.sqlcache.impl.FiredNotifySQLElement;
  import org.apache.rocketmq.streams.window.state.WindowBaseValue;
  import org.apache.rocketmq.streams.window.state.impl.WindowValue;
 -import org.apache.rocketmq.streams.window.storage.IWindowStorage;
 -import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;
 -import org.apache.rocketmq.streams.window.storage.WindowStorage.WindowBaseValueIterator;
 -
 -public class WindowOperator extends AbstractShuffleWindow {
 +import org.apache.rocketmq.streams.window.storage.IteratorWrap;
 +import org.apache.rocketmq.streams.window.storage.RocksdbIterator;
 +import org.apache.rocketmq.streams.window.storage.WindowType;
  
- import java.util.*;
 -    private static final String ORDER_BY_SPLIT_NUM = "_order_by_split_num_";//key=_order;queueid,windowinstanceid,partitionNum
++import java.util.ArrayList;
++import java.util.Comparator;
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.stream.Collectors;
  
 +public class WindowOperator extends AbstractShuffleWindow {
      public WindowOperator() {
          super();
      }
  
 -    protected transient boolean supportQuickStoreModel=false;
 -    protected transient List<String> schema=new ArrayList<>();
 +    @Override
 +    public int doFireWindowInstance(WindowInstance instance) {
 +        String windowInstanceId = instance.getWindowInstanceId();
 +        String queueId = instance.getSplitId();
  
 -    @Deprecated
 -    public WindowOperator(String timeFieldName, int windowPeriodMinute) {
 -        super();
 -        super.timeFieldName = timeFieldName;
 -        super.sizeInterval = windowPeriodMinute;
 -    }
 +        RocksdbIterator<WindowBaseValue> rocksdbIterator = storage.getWindowBaseValue(queueId, windowInstanceId, WindowType.NORMAL_WINDOW, null);
  
 -    @Deprecated
 -    public WindowOperator(String timeFieldName, int windowPeriodMinute, String calFieldName) {
 -        super();
 -        super.timeFieldName = timeFieldName;
 -        super.sizeInterval = windowPeriodMinute;
 -    }
 +        ArrayList<WindowValue> windowValues = new ArrayList<>();
 +        while (rocksdbIterator.hasNext()) {
 +            IteratorWrap<WindowBaseValue> next = rocksdbIterator.next();
 +            WindowValue data = (WindowValue)next.getData();
 +            windowValues.add(data);
 +        }
 +
 +        windowValues.sort(Comparator.comparingLong(WindowBaseValue::getPartitionNum));
  
 -    public WindowOperator(int sizeInterval, String groupByFieldName, Map<String, String> select) {
 -        this.sizeInterval = sizeInterval;
 -        this.slideInterval = sizeInterval;
 -        this.groupByFieldName = groupByFieldName;
 -        this.setSelectMap(select);
 +        int fireCount = sendBatch(windowValues, queueId, 0);
 +
 +        clearFire(instance);
 +
 +        return fireCount;
      }
  
 -    protected transient AtomicInteger shuffleCount = new AtomicInteger(0);
 -    protected transient AtomicInteger fireCountAccumulator = new AtomicInteger(0);
  
 -    protected transient AtomicLong fireCost=new AtomicLong(0);
 -    @Override
 -    public int fireWindowInstance(WindowInstance instance, String queueId, Map<String, String> queueId2Offset) {
 -        List<WindowValue> windowValues = new ArrayList<>();
 -        int fireCount = 0;
 -        //long startTime = System.currentTimeMillis();
 -        //int sendCost = 0;
 -      //  int currentCount = 0;
 -        //for(String queueId:currentQueueIds){
 -        WindowBaseValueIterator<WindowBaseValue> it = storage.loadWindowInstanceSplitData(getOrderBypPrefix(), queueId, instance.createWindowInstanceId(), null, getWindowBaseValueClass());
 -        if (queueId2Offset != null) {
 -            String offset = queueId2Offset.get(queueId);
 -            if (StringUtil.isNotEmpty(offset)) {
 -                it.setPartitionNum(Long.valueOf(offset));
 -            }
 +    /**
 +     * Sends the window values through {@code sendFireMessage} in chunks of at most
 +     * {@code windowCache.getBatchSize()} elements. Chunks are moved out of the head of {@code windowValues}
 +     * in list order until at most one batch remains, which is then sent as the list itself.
 +     *
 +     * @param windowValues values to send; may be null or empty; must support removal of elements
 +     * @param queueId      queue id passed to {@code sendFireMessage}
 +     * @param fireCount    number of values already sent before this call
 +     * @return {@code fireCount} plus the number of values sent by this call
 +     */
 +    private int sendBatch(List<WindowValue> windowValues, String queueId, int fireCount) {
 +        if (windowValues == null || windowValues.size() == 0) {
 +            return fireCount;
          }
 -        while (it.hasNext()) {
 -            WindowBaseValue windowBaseValue = it.next();
 -            if (windowBaseValue == null) {
 -                continue;
 -            }
 -            WindowValue windowValue = (WindowValue) windowBaseValue;
 -
 -            Integer currentValue = getValue(windowValue, "total");
 -
 -            fireCountAccumulator.addAndGet(currentValue);
 -            windowValues.add((WindowValue) windowBaseValue);
 -            if (windowValues.size() >= windowCache.getBatchSize()) {
 -                long sendFireCost = System.currentTimeMillis();
 -                sendFireMessage(windowValues, queueId);
 -                //sendCost += (System.currentTimeMillis() - sendFireCost);
 -                fireCount += windowValues.size();
 -                windowValues = new ArrayList<>();
 -            }
  
 -        }
 -        if (windowValues.size() > 0) {
 -            long sendFireCost = System.currentTimeMillis();
 +        if (windowValues.size() <= windowCache.getBatchSize()) {
              sendFireMessage(windowValues, queueId);
 -         //   sendCost += (System.currentTimeMillis() - sendFireCost);
 +
              fireCount += windowValues.size();
 +
 +            return fireCount;
 +        } else {
 +            // Move the first batch out of the list in list order. The former loop called remove(i) with a
 +            // growing i on a shrinking list, which skipped every other value and threw
 +            // IndexOutOfBoundsException once i reached the remaining size.
 +            List<WindowValue> head = windowValues.subList(0, windowCache.getBatchSize());
 +            ArrayList<WindowValue> temp = new ArrayList<>(head);
 +            head.clear();
 +
 +            sendFireMessage(temp, queueId);
 +
 +            return sendBatch(windowValues, queueId, fireCount + windowCache.getBatchSize());
          }
 -        clearFire(instance);
 -        this.sqlCache.addCache(new FiredNotifySQLElement(queueId, instance.createWindowInstanceId()));
 -        //long cost= this.fireCost.addAndGet(System.currentTimeMillis()-startTime);
 -      //  System.out.println("fire cost is "+cost+"   "+ DateUtil.getCurrentTimeString());
 -        return fireCount;
      }
  
 -    protected transient Map<String, Integer> shuffleWindowInstanceId2MsgCount = new HashMap<>();
 -    protected transient int windowvaluecount = 0;
 -    protected transient AtomicLong shuffleCost=new AtomicLong(0);
 +
      @Override
      public void shuffleCalculate(List<IMessage> messages, WindowInstance instance, String queueId) {
+         Long startTime=System.currentTimeMillis();
          DebugWriter.getDebugWriter(getConfigureName()).writeShuffleCalcultateReceveMessage(instance, messages, queueId);
 +
          List<String> sortKeys = new ArrayList<>();
          Map<String, List<IMessage>> groupBy = groupByGroupName(messages, sortKeys);
 -        Set<String> groupByKeys = groupBy.keySet();
 -        List<String> storeKeys = new ArrayList<>();
 -        for (String groupByKey : groupByKeys) {
 -            String storeKey = createStoreKey(queueId, groupByKey, instance);
 -            storeKeys.add(storeKey);
 +
 +        RocksdbIterator<WindowBaseValue> windowBaseValue = storage.getWindowBaseValue(queueId, instance.getWindowInstanceId(), WindowType.NORMAL_WINDOW, null);
 +
 +        ArrayList<WindowBaseValue> windowValues = new ArrayList<>();
 +        while (windowBaseValue.hasNext()) {
 +            IteratorWrap<WindowBaseValue> next = windowBaseValue.next();
 +            windowValues.add(next.getData());
          }
 -        Map<String, WindowBaseValue> allWindowValues = new HashMap<>();
 -        //从存储中,查找window value对象,value是对象的json格式
 -        Map<String, WindowBaseValue> existWindowValues = storage.multiGet(getWindowBaseValueClass(), storeKeys, instance.createWindowInstanceId(), queueId);
 -        //  Iterator<Entry<String, List<IMessage>>> it = groupBy.entrySet().iterator();
 -        for (String groupByKey : sortKeys) {
  
 +        Map<String, List<WindowValue>> temp = windowValues.stream().map((value) -> (WindowValue) value).collect(Collectors.groupingBy(WindowValue::getMsgKey));
 +
 +        Map<String, List<WindowValue>> groupByMsgKey = new HashMap<>(temp);
 +
 +        List<WindowValue> allWindowValues = new ArrayList<>();
 +
 +        //处理不同groupBy的message
 +        for (String groupByKey : sortKeys) {
              List<IMessage> msgs = groupBy.get(groupByKey);
              String storeKey = createStoreKey(queueId, groupByKey, instance);
 -            WindowValue windowValue = (WindowValue) existWindowValues.get(storeKey);
 -            ;
 -            if (windowValue == null) {
 -                windowvaluecount++;
 +
 +            //msgKey 为唯一键
 +            List<WindowValue> windowValueList = groupByMsgKey.get(storeKey);
 +            WindowValue windowValue;
 +            if (windowValueList == null || windowValueList.size() == 0) {
                  windowValue = createWindowValue(queueId, groupByKey, instance);
 -                // windowValue.setOrigOffset(msgs.get(0).getHeader().getOffset());
 +            } else {
 +                windowValue = windowValueList.get(0);
              }
 -            allWindowValues.put(storeKey, windowValue);
 -            windowValue.incrementUpdateVersion();
  
 -            Integer origValue = getValue(windowValue, "total");
 +            allWindowValues.add(windowValue);
 +            windowValue.incrementUpdateVersion();
  
              if (msgs != null) {
                  for (IMessage message : msgs) {
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
index 9c1c0af6,5eb3b784..f356a286
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
@@@ -60,11 -62,16 +60,10 @@@ public class JoinWindow extends Abstrac
  
      protected String joinType;//join类型,值为INNER,LEFT
      protected String expression;//条件表达式。在存在非等值比较时使用
 -    protected transient DBOperator joinOperator = new DBOperator();
 -    protected String rightDependentTableName;
 -    //    @Override
 -    //    protected void addPropertyToMessage(IMessage oriMessage, JSONObject oriJson){
 -    //        oriJson.put("AbstractWindow", this);
 -    //
 -    //    }
  
- 
      @Override
 -    protected int fireWindowInstance(WindowInstance instance, String shuffleId, Map<String, String> queueId2Offsets) {
 +    protected int doFireWindowInstance(WindowInstance instance) {
 +        // TODO: is cleanup really the only thing that has to happen here?
          clearFire(instance);
          return 0;
      }
@@@ -359,8 -422,18 +358,8 @@@
      }
  
      @Override
-     protected String generateShuffleKey(IMessage message) {
+     public String generateShuffleKey(IMessage message) {
 -        String routeLabel =null;
 -        String lable = message.getHeader().getMsgRouteFromLable();
 -        if (lable != null) {
 -            if (lable.equals(rightDependentTableName)) {
 -                routeLabel = MessageHeader.JOIN_RIGHT;
 -            } else {
 -                routeLabel = MessageHeader.JOIN_LEFT;
 -            }
 -        } else {
 -            throw new RuntimeException("can not dipatch message, need route label " + toJson());
 -        }
 +        String routeLabel = message.getHeader().getMsgRouteFromLable(); // taken as-is from the header; NOTE(review): the removed branch rejected a null label, this one passes it on to generateKey — confirm that is tolerated
          String messageKey = generateKey(message.getMessageBody(), routeLabel, leftJoinFieldNames, rightJoinFieldNames);
          return messageKey;
      }
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java
index 6c4659b3,d0e499f2..0a3845fd
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java
@@@ -54,11 -55,12 +55,12 @@@ public abstract class AbstractSystemCha
      protected static final String CHANNEL_PROPERTY_KEY_PREFIX = "CHANNEL_PROPERTY_KEY_PREFIX";
      protected static final String CHANNEL_TYPE = "CHANNEL_TYPE";
  
-     protected ISource consumer;
+     protected ISource<?> consumer;
      protected AbstractSupportShuffleSink producer;
      protected Map<String, String> channelConfig = new HashMap<>();
 -    protected volatile boolean hasCreateShuffleChannel = false;
 +    protected boolean hasCreateShuffleChannel = false;
  
+     protected transient AtomicBoolean hasStart = new AtomicBoolean(false);
      public void startChannel() {
          if (consumer == null) {
              return;
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java
index 5139f707,054a418d..7b4d8b4f
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java
@@@ -22,11 -22,11 +22,11 @@@ import java.util.Collections
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
 -import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.Future;
- 
  import org.apache.commons.lang3.tuple.Pair;
+ import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
  import org.apache.rocketmq.streams.common.context.IMessage;
 -import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
 +import org.apache.rocketmq.streams.common.context.MessageOffset;
  import org.apache.rocketmq.streams.window.debug.DebugWriter;
  import org.apache.rocketmq.streams.window.model.WindowCache;
  import org.apache.rocketmq.streams.window.model.WindowInstance;
@@@ -36,9 -37,8 +36,9 @@@ import org.apache.rocketmq.streams.wind
  /**
   * save receiver messages into cachefilter when checkpoint/autoflush/flush, process cachefilter message
   */
- public class ShuffleCache extends WindowCache {
+ public class ShuffleCache extends AbstractSink {
      protected AbstractShuffleWindow window;
 +    private HashMap<String, Boolean> hasLoad = new HashMap<>();
  
      public ShuffleCache(AbstractShuffleWindow window) {
          this.window = window;
@@@ -64,64 -54,23 +64,65 @@@
          for (Pair<String, String> queueIdAndInstanceKey : keys) {
              String queueId = queueIdAndInstanceKey.getLeft();
              String windowInstanceId = queueIdAndInstanceKey.getRight();
 +
              List<IMessage> messages = instance2Messages.get(queueIdAndInstanceKey);
 +
              WindowInstance windowInstance = windowInstanceMap.get(windowInstanceId);
 +
              DebugWriter.getDebugWriter(window.getConfigureName()).writeShuffleReceive(window, messages, windowInstance);
 +
 +            stateMustLoad(queueId);
 +
              window.shuffleCalculate(messages, windowInstance, queueId);
 +
 +            //保存处理进度
              saveSplitProgress(queueId, messages);
+             window.saveMsgContext(queueId,windowInstance,messages);
          }
-         return true;
+             return true;
      }
  
 +    private void stateMustLoad(String queueId) { // waits for the state-load task of queueId, if one is registered, before returning
 +        Boolean load = this.hasLoad.get(queueId);
 +        if (load != null && load) { // already recorded as loaded; nothing to wait for
 +            return;
 +        }
 +
 +        // the asynchronous state load has to be finished before any calculation runs
 +        HashMap<String, Future<?>> loadResult = this.window.getShuffleChannel().getLoadResult();
 +        Future<?> future = loadResult.get(queueId);
 +
 +        if (future == null) { // no load task is registered for this queue
 +            return;
 +        }
 +
 +        try {
 +            long before = System.currentTimeMillis();
 +            future.get(); // blocks until the load task has completed
 +            long after = System.currentTimeMillis();
 +
 +            System.out.println("message wait before state recover:[" + (after - before) + "] ms, queueId=" + queueId);
 +
 +            for (String loadQueueId : loadResult.keySet()) { // NOTE(review): flags every queue in loadResult as loaded although only this queue's future was awaited — confirm this is intended
 +                hasLoad.put(loadQueueId, true);
 +            }
 +        } catch (Throwable t) { // NOTE(review): also catches InterruptedException from get() without restoring the interrupt flag — confirm
 +            throw new RuntimeException("check remote with queueId:" + queueId + ",error", t);
 +        }
 +    }
 +
      /**
 -     * save consumer progress(offset)for groupby  source queueId
 +     * save consumer progress(offset)for groupby  source shuffleId
 +     * window configName: name_window_10001
 +     * shuffleId: shuffle_NormalTestTopic_namespace_name_broker-a_001
 +     * oriQueueId: NormalTestTopic2_broker-a_000
       *
 -     * @param queueId
 +     * @param shuffleId
       * @param messages
       */
 -    protected void saveSplitProgress(String queueId, List<IMessage> messages) {
 +    protected void saveSplitProgress(String shuffleId, List<IMessage> messages) {
 +        IStorage delegator = this.window.getStorage();
 +
          Map<String, String> queueId2OrigOffset = new HashMap<>();
          Boolean isLong = false;
          for (IMessage message : messages) {
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index f9bd3413,9280a2e4..d834af41
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@@ -18,7 -18,18 +18,8 @@@ package org.apache.rocketmq.streams.win
  
  import com.alibaba.fastjson.JSONArray;
  import com.alibaba.fastjson.JSONObject;
 -import java.util.ArrayList;
 -import java.util.HashMap;
 -import java.util.HashSet;
 -import java.util.List;
 -import java.util.Map;
 -import java.util.Properties;
 -import java.util.Set;
 -import java.util.concurrent.ConcurrentHashMap;
 -import java.util.concurrent.atomic.AtomicBoolean;
 -import java.util.concurrent.atomic.AtomicLong;
  import org.apache.commons.lang3.StringUtils;
+ import org.apache.commons.lang3.tuple.MutablePair;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
@@@ -42,7 -57,10 +46,8 @@@ import org.apache.rocketmq.streams.comm
  import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
  import org.apache.rocketmq.streams.common.utils.StringUtil;
  import org.apache.rocketmq.streams.common.utils.TraceUtil;
 -import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
  import org.apache.rocketmq.streams.window.debug.DebugWriter;
+ import org.apache.rocketmq.streams.window.minibatch.MiniBatchMsgCache;
  import org.apache.rocketmq.streams.window.model.WindowCache;
  import org.apache.rocketmq.streams.window.model.WindowInstance;
  import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
@@@ -77,11 -90,17 +85,13 @@@ public class ShuffleChannel extends Abs
  
      protected ShuffleCache shuffleCache;
  
-     protected Map<String, ISplit> queueMap = new ConcurrentHashMap<>();
-     protected List<ISplit> queueList;//所有的分片
+     protected Map<String, ISplit<?, ?>> queueMap = new ConcurrentHashMap<>();
+     /**
+      * 所有的分片
+      */
+     protected List<ISplit<?, ?>> queueList;
  
-     // protected NotifyChannel notfiyChannel;//负责做分片的通知管理
      protected AbstractShuffleWindow window;
 -    /**
 -     * 当前管理的分片
 -     */
 -    private Set<String> currentQueueIds;
  
      protected transient boolean isWindowTest = false;
  
@@@ -116,28 -126,16 +117,16 @@@
       * init shuffle channel
       */
      public void init() {
-         this.consumer = createSource(window.getNameSpace(), window.getConfigureName());
- 
-         this.producer = createSink(window.getNameSpace(), window.getConfigureName());
-         if (this.consumer == null || this.producer == null) {
-             autoCreateShuffleChannel(window.getFireReceiver().getPipeline());
-         }
-         if (this.consumer == null) {
-             return;
-         }
-         if (this.consumer instanceof AbstractSource) {
-             ((AbstractSource) this.consumer).setJsonData(true);
-         }
+         init(this.window); // replaces the removed inline consumer/producer setup; presumably performs the same steps — TODO confirm against init(window)
          if (producer != null && (queueList == null || queueList.size() == 0)) {
              queueList = producer.getSplitList();
-             Map<String, ISplit> tmp = new ConcurrentHashMap<>();
-             for (ISplit queue : queueList) {
+             Map<String, ISplit<?, ?>> tmp = new ConcurrentHashMap<>(); // queueId -> split lookup, filled first and then assigned to queueMap in one step
+             for (ISplit<?, ?> queue : queueList) {
                  tmp.put(queue.getQueueId(), queue);
              }
- 
              this.queueMap = tmp;
          }
 -        isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest");
 +//        isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest");
      }
  
      /**
@@@ -467,8 -505,11 +455,8 @@@
          return splitNum > 0 ? splitNum : 32;
      }
  
 -    public Set<String> getCurrentQueueIds() {
 -        return currentQueueIds;
 -    }
  
-     public List<ISplit> getQueueList() {
+     public List<ISplit<?, ?>> getQueueList() { // hands out the internal list itself, not a copy
          return queueList;
      }
  
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java
index b012b175,69b6494a..e3bb3f83
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java
@@@ -287,7 -287,7 +287,6 @@@ public class WindowTrigger extends Abst
              }
          });
          for (WindowInstance windowInstance : windowInstanceList) {
-             System.out.println("fire by finish flag");
 -          //  System.out.println("fire by finish flag");
              fireWindowInstance(windowInstance);
          }
      }