You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/09/25 00:56:14 UTC

[rocketmq-streams] branch main updated: Add a quick start which contain some examples. (#68)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new 3612f79  Add a quick start which contain some examples. (#68)
3612f79 is described below

commit 3612f79663a372e7a06e4889bbf473865eae469e
Author: Ni Ze <31...@users.noreply.github.com>
AuthorDate: Sat Sep 25 08:56:06 2021 +0800

    Add a quick start which contain some examples. (#68)
    
    * a runnable window example reading data from rocketmq.
    
    * add quick start
    
    * modify quick_start
    
    * modify example.
    
    * modify and add another pageclick example.
    
    * remove private path
    
    * fix spelling mistakes
    
    * use mysql as remote checkpoint storage
    
    * modify code style of import class
    
    * modify annotation
    
    Co-authored-by: nize <un...@gmail.com>
---
 QUICKSTART.md                                      |  81 -----------
 README.md                                          |   2 +-
 pom.xml                                            |  12 +-
 quick_start.md                                     |  71 +++++++++
 .../apache/rocketmq/streams/sink/RocketMQSink.java |   1 +
 .../rocketmq/streams/source/RocketMQSource.java    |   8 +-
 rocketmq-streams-clients/pom.xml                   |   4 +
 .../rocketmq/streams/client/DataStreamAction.java  |   9 +-
 .../rocketmq/streams/client/DataStreamTest.java    |   6 +-
 .../streams/client/windows/AbstractWindowTest.java |  19 +--
 .../resources/{window_msg_10 => window_msg_10.txt} |   0
 .../{window_msg_100 => window_msg_100.txt}         |   0
 .../{window_msg_1000 => window_msg_1000.txt}       |   0
 .../{window_msg_10000 => window_msg_10000.txt}     |   0
 .../{window_msg_88121 => window_msg_88121.txt}     |   0
 .../common/channel/impl/OutputPrintChannel.java    |   7 +-
 .../common/channel/impl/file/FileSource.java       |  22 ++-
 .../streams/common/datatype/DateDataType.java      |  13 +-
 rocketmq-streams-dbinit/pom.xml                    |   3 -
 .../src/main/resources/tables_mysql_innodb.sql     |  19 +--
 rocketmq-streams-examples/README.md                | 159 +++++++++++++++++++++
 rocketmq-streams-examples/pom.xml                  |  48 +++++--
 .../examples/checkpoint/RemoteCheckpointTest.java  |  96 +++++++++++++
 .../examples/filesource/FileSourceExample.java     |   2 +-
 .../streams/examples/pageclick/PageDimension.java  |  81 +++++++++++
 .../streams/examples/pageclick/UsersDimension.java |  67 +++++++++
 .../streams/examples/rocketmqsource/Constant.java  |  27 ++++
 .../examples/rocketmqsource/ProducerFromFile.java  |  99 +++++++++++++
 .../rocketmqsource/RocketMQSourceExample1.java     |  24 +++-
 .../rocketmqsource/RocketMQSourceExample2.java     |  27 ++--
 .../rocketmqsource/RocketMQSourceExample3.java     |  43 ++++--
 .../rocketmqsource/RocketmqWindowTest.java         |  30 ++--
 .../src/main/resources/data.txt                    |  11 ++
 .../src/main/resources/pageClickData.txt           |  11 ++
 .../streams/window/fire/EventTimeManager.java      |   8 +-
 .../streams/window/fire/SplitEventTimeManager.java |  20 +--
 .../window/operator/impl/WindowOperator.java       |  79 +++++-----
 .../streams/window/shuffle/ShuffleChannel.java     |  28 ++--
 38 files changed, 883 insertions(+), 254 deletions(-)

diff --git a/QUICKSTART.md b/QUICKSTART.md
deleted file mode 100644
index f265669..0000000
--- a/QUICKSTART.md
+++ /dev/null
@@ -1,81 +0,0 @@
-# Quick Start
-
-本文档详细介绍了如何在rocketmq上执行流计算任务;
-
-## 所需环境
-
-+ 64bit OS, Linux/Unix/Mac is recommended;(Windows user see guide below)
-+ 64bit JDK 1.8+;
-+ Maven 3.2.X
-
-## 使用步骤
-
-### 1. 创建maven项目, 并依赖rocketmq-streams的客户端
-
-```xml
-
-<dependency>
-    <groupId>org.apache.rocketmq</groupId>
-    <artifactId>rocketmq-streams-clients</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
-</dependency>
-```
-
-### 2. 在主函数中按照streams的开发规范,编写业务逻辑
-
-```java
-import org.apache.rocketmq.streams.client.transform.DataStream;
-
-public static void main(String[]args){
-    DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
-
-    source
-    .fromFile("~/admin/data/text.txt",false)
-    .map(message->message)
-    .toPrint(1)
-    .start();
-    }
-```
-
-### 3. 在pom.xml中加入shade插件, 将依赖的stream与业务代码一并打包, 形成-shaded.jar 包
-
-```xml
-
-<plugin>
-    <groupId>org.apache.maven.plugins</groupId>
-    <artifactId>maven-shade-plugin</artifactId>
-    <version>3.2.1</version>
-    <executions>
-        <execution>
-            <phase>package</phase>
-            <goals>
-                <goal>shade</goal>
-            </goals>
-            <configuration>
-                <minimizeJar>false</minimizeJar>
-                <shadedArtifactAttached>true</shadedArtifactAttached>
-                <artifactSet>
-                    <includes>
-                        <include>org.apache.rocketmq:rocketmq-streams-clients</include>
-                    </includes>
-                </artifactSet>
-            </configuration>
-        </execution>
-    </executions>
-</plugin>
-
-```
-
-### 4. 将jar包拷贝到应用服务器,作为普通的java应用直接运行
-
-```
-   java -jar XXXX-shade.jar \ 
-        -Dlog4j.level=ERROR \
-        -Dlog4j.home=/logs  \
-        -Xms1024m \
-        -Xmx1024m 
-```
-
-
-
-
diff --git a/README.md b/README.md
index 36b7270..d949a7b 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@
 [![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social)](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
 
 ## [中文文档](./README-Chinese.md)
-## [Quick Start](./QUICKSTART.md)
+## [Quick Start](./quick_start.md)
 
 
 ## Features
diff --git a/pom.xml b/pom.xml
index 5fbb574..1d2a636 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,7 +69,7 @@
         <commons-logging.version>1.1</commons-logging.version>
         <spring.version>3.2.13.RELEASE</spring.version>
         <auto-service.version>1.0-rc5</auto-service.version>
-        <mysql-connector.version>5.1.40</mysql-connector.version>
+        <mysql-connector.version>8.0.26</mysql-connector.version>
         <fastjson.version>1.2.78</fastjson.version>
         <quartz.version>2.2.1</quartz.version>
         <httpclient.version>4.5.2</httpclient.version>
@@ -112,13 +112,13 @@
                         <exclude>.asf.yaml</exclude>
                         <exclude>README.md</exclude>
                         <exclude>README-Chinese.md</exclude>
-                        <exclude>QUICKSTART.md</exclude>
+                        <exclude>quick_start.md</exclude>
                         <exclude>.github/**</exclude>
                         <exclude>*/target/**</exclude>
                         <exclude>*/*.iml</exclude>
                         <exclude>**/*.txt</exclude>
                         <exclude>**/*.cs</exclude>
-                        <exclude>src/test/resources/window_msg_*</exclude>
+                        <exclude>**/*.sql</exclude>
                     </excludes>
                 </configuration>
             </plugin>
@@ -283,6 +283,12 @@
                 <artifactId>rocketmq-streams-channel-rocketmq</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-streams-dbinit</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
 
             <!-- ================================================= -->
             <!-- rocketmq library -->
diff --git a/quick_start.md b/quick_start.md
new file mode 100644
index 0000000..6f6a314
--- /dev/null
+++ b/quick_start.md
@@ -0,0 +1,71 @@
+## rocketmq-streams 快速搭建
+---
+### 前言
+本文档主要介绍如何基于rocketmq-streams快速搭建流处理任务,搭建过程中某些例子会用到rocketmq,可以参考[rocketmq搭建文档](https://rocketmq.apache.org/docs/quick-start/)
+
+
+### 1、源码构建
+
+#### 1.1、构建环境
+ - JDK 1.8 and above
+ - Maven 3.2 and above
+
+#### 1.2、构建Rocketmq-streams
+
+`git clone https://github.com/apache/rocketmq-streams.git`
+`cd rocketmq-streams`
+`mvn clean -DskipTests  install -U`
+
+
+### 2、基于rocketmq-streams创建应用
+
+#### 2.1、pom依赖
+```xml
+<dependency>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-streams-clients</artifactId>
+</dependency>
+```
+#### 2.2、shade clients依赖包
+```xml
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.1</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <minimizeJar>false</minimizeJar>
+                            <shadedArtifactAttached>true</shadedArtifactAttached>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.rocketmq:rocketmq-streams-clients</include>
+                                </includes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+```
+
+#### 2.3、编写业务代码
+Please see the [rocketmq-streams-examples](rocketmq-streams-examples/README.md)
+#### 2.4、运行
+- 前提:在从rocketmq中读取数据做流处理时,需要允许topic在rocketmq中自动创建,因为做groupBy操作时,需要用到rocketmq作为shuffle数据的读写目的地。
+- 命令:
+```
+   java -Dlog4j.level=ERROR \
+        -Dlog4j.home=/logs  \
+        -Xms1024m \
+        -Xmx1024m \
+        -jar XXXX-shade.jar
+```
+
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
index 6e18ba7..eb8134d 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.CommandUtil;
 
+
 public class RocketMQSink extends AbstractSupportShuffleSink {
 
     private static final Log LOG = LogFactory.getLog(RocketMQSink.class);
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
index b45df86..34dc221 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
@@ -41,12 +41,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.streams.common.channel.source.AbstractSupportOffsetResetSource;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
-import org.apache.rocketmq.streams.common.context.AbstractContext;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
 import org.apache.rocketmq.streams.common.utils.ReflectUtil;
-import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
 import org.apache.rocketmq.streams.debug.DebugWriter;
 import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
@@ -60,7 +55,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class RocketMQSource extends AbstractSupportOffsetResetSource {
@@ -134,6 +128,8 @@ public class RocketMQSource extends AbstractSupportOffsetResetSource {
                     for (MessageExt msg : msgs) {
                         String data = new String(msg.getBody(), CHARSET);
                         JSONObject jsonObject = create(data);
+
+
                         String queueId = RocketMQMessageQueue.getQueueId(context.getMessageQueue());
                         String offset = msg.getQueueOffset() + "";
                         org.apache.rocketmq.streams.common.context.Message message = createMessage(jsonObject, queueId, offset, false);
diff --git a/rocketmq-streams-clients/pom.xml b/rocketmq-streams-clients/pom.xml
index 5632fd6..dfe0986 100644
--- a/rocketmq-streams-clients/pom.xml
+++ b/rocketmq-streams-clients/pom.xml
@@ -53,6 +53,10 @@
             <artifactId>rocketmq-streams-window</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-streams-dbinit</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
             <version>1.7.26</version>
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
index 0a052fa..a8b99fa 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
@@ -18,11 +18,6 @@
 package org.apache.rocketmq.streams.client;
 
 import com.google.common.collect.Maps;
-
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
 import org.apache.rocketmq.streams.client.strategy.Strategy;
 import org.apache.rocketmq.streams.client.transform.DataStream;
 import org.apache.rocketmq.streams.common.component.ComponentCreator;
@@ -32,6 +27,10 @@ import org.apache.rocketmq.streams.common.topology.ChainStage;
 import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
 import org.apache.rocketmq.streams.configurable.ConfigurableComponent;
 
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
 public class DataStreamAction extends DataStream {
 
     private final Map<String, Object> properties = Maps.newHashMap();
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
index 49e8559..2684c18 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
@@ -28,7 +28,11 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.Serializable;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 
 public class DataStreamTest implements Serializable {
 
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java
index 688b485..3eef4f2 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java
@@ -18,6 +18,16 @@
 package org.apache.rocketmq.streams.client.windows;
 
 import com.alibaba.fastjson.JSONObject;
+import org.apache.rocketmq.streams.client.transform.DataStream;
+import org.apache.rocketmq.streams.client.transform.window.Time;
+import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
+import org.apache.rocketmq.streams.common.functions.ForEachFunction;
+import org.apache.rocketmq.streams.common.functions.MapFunction;
+import org.apache.rocketmq.streams.common.utils.DateUtil;
+import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -28,15 +38,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.rocketmq.streams.client.transform.DataStream;
-import org.apache.rocketmq.streams.client.transform.window.Time;
-import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
-import org.apache.rocketmq.streams.common.functions.ForEachFunction;
-import org.apache.rocketmq.streams.common.functions.MapFunction;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
 
 import static junit.framework.TestCase.assertTrue;
 
diff --git a/rocketmq-streams-clients/src/test/resources/window_msg_10 b/rocketmq-streams-clients/src/test/resources/window_msg_10.txt
similarity index 100%
rename from rocketmq-streams-clients/src/test/resources/window_msg_10
rename to rocketmq-streams-clients/src/test/resources/window_msg_10.txt
diff --git a/rocketmq-streams-clients/src/test/resources/window_msg_100 b/rocketmq-streams-clients/src/test/resources/window_msg_100.txt
similarity index 100%
rename from rocketmq-streams-clients/src/test/resources/window_msg_100
rename to rocketmq-streams-clients/src/test/resources/window_msg_100.txt
diff --git a/rocketmq-streams-clients/src/test/resources/window_msg_1000 b/rocketmq-streams-clients/src/test/resources/window_msg_1000.txt
similarity index 100%
rename from rocketmq-streams-clients/src/test/resources/window_msg_1000
rename to rocketmq-streams-clients/src/test/resources/window_msg_1000.txt
diff --git a/rocketmq-streams-clients/src/test/resources/window_msg_10000 b/rocketmq-streams-clients/src/test/resources/window_msg_10000.txt
similarity index 100%
rename from rocketmq-streams-clients/src/test/resources/window_msg_10000
rename to rocketmq-streams-clients/src/test/resources/window_msg_10000.txt
diff --git a/rocketmq-streams-clients/src/test/resources/window_msg_88121 b/rocketmq-streams-clients/src/test/resources/window_msg_88121.txt
similarity index 100%
rename from rocketmq-streams-clients/src/test/resources/window_msg_88121
rename to rocketmq-streams-clients/src/test/resources/window_msg_88121.txt
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
index 8c4f63c..ebb9415 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
@@ -16,11 +16,10 @@
  */
 package org.apache.rocketmq.streams.common.channel.impl;
 
-import java.util.List;
-
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
 import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.utils.PrintUtil;
+
+import java.util.List;
 
 /**
  * 测试使用,输出就是把消息打印出来
@@ -30,7 +29,7 @@ public class OutputPrintChannel extends AbstractSink {
     @Override
     protected boolean batchInsert(List<IMessage> messages) {
         for (IMessage msg : messages) {
-            //System.out.println(msg.getMessageValue());
+            System.out.println(msg.getMessageValue());
         }
         return false;
     }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileSource.java
index f8fcbcd..3add3df 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileSource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileSource.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
+import java.net.URL;
 import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -55,7 +56,7 @@ public class FileSource extends AbstractBatchSource {
     @Override
     protected boolean initConfigurable() {
         super.initConfigurable();
-        File file = new File(filePath);
+        File file = getFile(filePath);
         if (file.exists() && file.isDirectory()) {
             executorService = new ThreadPoolExecutor(maxThread, maxThread,
                 0L, TimeUnit.MILLISECONDS,
@@ -64,6 +65,23 @@ public class FileSource extends AbstractBatchSource {
         return true;
     }
 
+    private File getFile(String filePath) {
+        File file = new File(filePath);
+        if (!file.exists()) {
+            ClassLoader loader = getClass().getClassLoader();
+            URL url = loader.getResource(filePath);
+
+            if (url != null) {
+                String path = url.getFile();
+                file = new File(path);
+                this.filePath = path;
+            }
+        }
+        return file;
+
+
+    }
+
     @Override
     protected boolean startSource() {
 
@@ -97,7 +115,7 @@ public class FileSource extends AbstractBatchSource {
      */
     protected LinkedBlockingQueue<FileIterator> createIteratorList() {
         LinkedBlockingQueue<FileIterator> iterators = new LinkedBlockingQueue<>(1000);
-        File file = new File(filePath);
+        File file = getFile(filePath);
         if (file.exists() == false) {
             return null;
         }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java
index 34d5536..41a113c 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java
@@ -17,13 +17,16 @@
 package org.apache.rocketmq.streams.common.datatype;
 
 import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.utils.NumberUtils;
+
 import java.sql.Timestamp;
 import java.text.ParsePosition;
 import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.Date;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.utils.NumberUtils;
 
 public class DateDataType extends BaseDataType<Date> {
 
@@ -85,6 +88,10 @@ public class DateDataType extends BaseDataType<Date> {
         if (Timestamp.class.isInstance(object)) {
             return new Date(((Timestamp)object).getTime());
         }
+        if (object instanceof LocalDateTime) {
+            LocalDateTime tempTime = (LocalDateTime) object;
+            return Date.from(tempTime.atZone(ZoneId.systemDefault()).toInstant());
+        }
         return super.convert(object);
     }
 
diff --git a/rocketmq-streams-dbinit/pom.xml b/rocketmq-streams-dbinit/pom.xml
index 1eba812..14f5679 100644
--- a/rocketmq-streams-dbinit/pom.xml
+++ b/rocketmq-streams-dbinit/pom.xml
@@ -31,13 +31,10 @@
         <resources>
             <resource>
                 <directory>src/main/resources</directory>
-
                 <includes>
                     <include>**/*.sql</include>
                     <include>**/*.properties</include>
                 </includes>
-
-
                 <filtering>true</filtering>
             </resource>
         </resources>
diff --git a/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql b/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql
index 0c6bc81..d6c23ee 100644
--- a/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql
+++ b/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql
@@ -1,24 +1,9 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 CREATE TABLE IF NOT EXISTS  `window_max_value` (
   `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
   `gmt_create` datetime NOT NULL,
   `gmt_modified` datetime NOT NULL,
+  `max_offset` varchar(20) NOT NULL,
+  `is_max_offset_long` int(11) DEFAULT NULL,
   `max_value` bigint(20) unsigned NOT NULL,
   `max_event_time` bigint(20) unsigned NOT NULL,
   `msg_key` varchar(256) NOT NULL,
diff --git a/rocketmq-streams-examples/README.md b/rocketmq-streams-examples/README.md
new file mode 100644
index 0000000..53bb503
--- /dev/null
+++ b/rocketmq-streams-examples/README.md
@@ -0,0 +1,159 @@
+## rocketmq-streams-examples
+
+### 1、fileSource example
+逐行读取文件数据,并打印出来。
+```java
+public class FileSourceExample {
+    public static void main(String[] args) {
+        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+        source.fromFile("data.txt", false)
+                .map(message -> message)
+                .toPrint(1)
+                .start();
+    }
+}
+
+```
+
+
+### 2、分时间段,统计分组中某字段的和
+
+#### 2.1 安装rocketmq
+可以参考[rocketmq搭建文档](https://rocketmq.apache.org/docs/quick-start/)
+
+#### 2.2 源数据
+[源数据](./../rocketmq-streams-examples/src/main/resources/data.txt)
+```xml
+{"InFlow":"1","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"0"}
+{"InFlow":"2","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"1"}
+{"InFlow":"3","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"2"}
+{"InFlow":"4","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"3"}
+{"InFlow":"5","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"4"}
+{"InFlow":"6","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"5"}
+{"InFlow":"7","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"6"}
+{"InFlow":"8","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"7"}
+{"InFlow":"9","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"8"}
+{"InFlow":"10","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"9"}
+```
+
+#### 2.3 代码示例
+
+[代码示例](./../rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java)
+
+
+#### 2.4 结果说明
+这个例子中,使用rocketmq-streams消费rocketmq中的数据,并按照ProjectName和LogStore两个字段联合分组统计,两个字段的值相同,分为一组。
+分别统计每组的InFlow和OutFlow两字段累计和。
+
+data.txt数据运行的结果部分如下:
+
+```xml
+"InFlow":22,"total":4,"ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":18
+"InFlow":18,"total":3,"ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":15
+"InFlow":15,"total":3,"ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":12
+```
+可见"ProjectName":"ProjectName-0","LogStore":"LogStore-0"分组共有4条数据,"ProjectName":"ProjectName-2","LogStore":"LogStore-2",3条数据。
+"ProjectName":"ProjectName-1","LogStore":"LogStore-1"分组3条数据,总共10条数据。结果与源数据一致。
+
+### 3、网页点击统计
+#### 3.1、数据说明
+原始数据为resources路径下的[pageClickData.txt](./../rocketmq-streams-examples/src/main/resources/pageClickData.txt)
+
+第一列是用户id,第二列是用户点击时间,最后一列是网页地址
+```xml
+{"userId":"1","eventTime":"1631700000000","method":"GET","url":"page-1"}
+{"userId":"2","eventTime":"1631700030000","method":"POST","url":"page-2"}
+{"userId":"3","eventTime":"1631700040000","method":"GET","url":"page-3"}
+{"userId":"1","eventTime":"1631700050000","method":"DELETE","url":"page-2"}
+{"userId":"1","eventTime":"1631700060000","method":"DELETE","url":"page-2"}
+{"userId":"2","eventTime":"1631700070000","method":"POST","url":"page-3"}
+{"userId":"3","eventTime":"1631700080000","method":"GET","url":"page-1"}
+{"userId":"1","eventTime":"1631700090000","method":"GET","url":"page-2"}
+{"userId":"2","eventTime":"1631700100000","method":"PUT","url":"page-3"}
+{"userId":"4","eventTime":"1631700120000","method":"POST","url":"page-1"}
+```
+
+#### 3.1、统计某段时间窗口内用户点击网页次数
+[代码示例](./../rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/UsersDimension.java)
+
+结果:
+```xml
+{"start_time":"2021-09-15 18:00:00","total":1,"windowInstanceId":"SPVGTV6DaXmxV5mGNzQixQ==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","userId":"2"}
+{"start_time":"2021-09-15 18:00:00","total":1,"windowInstanceId":"dzAZ104qjUAwzTE6gbKSPA==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","userId":"3"}
+{"start_time":"2021-09-15 18:00:00","total":2,"windowInstanceId":"wrTTyU5DiDkrAb6669Ig9w==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","userId":"1"}
+{"start_time":"2021-09-15 18:01:00","total":1,"windowInstanceId":"vabkmx14xHsJ7G7w16vwug==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","userId":"3"}
+{"start_time":"2021-09-15 18:01:00","total":2,"windowInstanceId":"YIgEKptN2Wf+Oq2m8sEcYw==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","userId":"2"}
+{"start_time":"2021-09-15 18:01:00","total":2,"windowInstanceId":"iYKnwMYAzXFJYbO1KvDnng==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","userId":"1"}
+{"start_time":"2021-09-15 18:02:00","total":1,"windowInstanceId":"HBojuU6/2F/6llkyefECxw==","offset":53892181100000001,"end_time":"2021-09-15 18:03:00","userId":"4"}
+```
+
+在时间范围 18:00:00- 18:01:00内:
+
+|userId|点击次数|
+|------|---|
+|   1  | 2 |
+|   2  | 1 |
+|   3  | 1 |
+
+在时间范围 18:01:00- 18:02:00内:
+
+|userId|点击次数|
+|------|---|
+|   1  | 2 |
+|   2  | 2 |
+|   3  | 1 |
+
+在时间范围 18:02:00- 18:03:00内:
+
+|userId|点击次数|
+|------|---|
+|   4  | 1 | 
+
+可查看原数据文件,eventTime为时间字段,简单检查后上述结果与预期相符合。
+
+#### 3.2、统计某段时间窗口内,被点击次数最多的网页
+[代码示例](./../rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/PageDimension.java)
+
+运行结果:
+```xml
+{"start_time":"2021-09-15 18:00:00","total":1,"windowInstanceId":"wrTTyU5DiDkrAb6669Ig9w==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","url":"page-1"}
+{"start_time":"2021-09-15 18:00:00","total":2,"windowInstanceId":"seECZRcaQSRsET1rDc6ZAw==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","url":"page-2"}
+{"start_time":"2021-09-15 18:00:00","total":1,"windowInstanceId":"dzAZ104qjUAwzTE6gbKSPA==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","url":"page-3"}
+{"start_time":"2021-09-15 18:01:00","total":2,"windowInstanceId":"uCqvAeaLTYRnjQm8dCZOvw==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","url":"page-2"}
+{"start_time":"2021-09-15 18:01:00","total":3,"windowInstanceId":"vabkmx14xHsJ7G7w16vwug==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","url":"page-3"}
+{"start_time":"2021-09-15 18:02:00","total":1,"windowInstanceId":"NdgwYMT8azNMu55NUIvygg==","offset":53892181100000001,"end_time":"2021-09-15 18:03:00","url":"page-1"}
+
+```
+在时间窗口18:00:00 - 18:01:00 内,有4条数据;
+
+在时间窗口18:01:00 - 18:02:00 内,有5条数据;
+
+在时间窗口18:02:00 - 18:03:00 内,有1条数据;
+
+分钟统计窗口内,被点击次数最多的网页.
+得到上述数据后,需要按照窗口进行筛选最大值,需要再次计算。
+代码:
+```java
+    public void findMax() {
+        DataStreamSource source = StreamBuilder.dataStream("ns-1", "pl-1");
+        source.fromFile("/home/result.txt", false)
+        .map(message -> JSONObject.parseObject((String) message))
+        .window(TumblingWindow.of(Time.seconds(5)))
+        .groupBy("start_time","end_time")
+        .max("total")
+        .waterMark(1)
+        .setLocalStorageOnly(true)
+        .toDataSteam()
+        .toPrint(1)
+        .start();
+   }
+
+```
+得到结果:
+```xml
+{"start_time":"2021-09-17 11:09:35","total":"2","windowInstanceId":"kRRpe2hPEQtEuTkfnXUaHg==","offset":54040181100000001,"end_time":"2021-09-17 11:09:40"}
+{"start_time":"2021-09-17 11:09:35","total":"3","windowInstanceId":"kRRpe2hPEQtEuTkfnXUaHg==","offset":54040181100000002,"end_time":"2021-09-17 11:09:40"}
+{"start_time":"2021-09-17 11:09:35","total":"1","windowInstanceId":"kRRpe2hPEQtEuTkfnXUaHg==","offset":54040181100000003,"end_time":"2021-09-17 11:09:40"}
+```
+
+可以得到三个窗口中网页点击次数最多分别是2次,3次,1次。
diff --git a/rocketmq-streams-examples/pom.xml b/rocketmq-streams-examples/pom.xml
index ee6bbf6..8d97fa6 100644
--- a/rocketmq-streams-examples/pom.xml
+++ b/rocketmq-streams-examples/pom.xml
@@ -27,24 +27,48 @@
 
     <artifactId>rocketmq-streams-examples</artifactId>
     <name>ROCKETMQ STREAMS :: examples</name>
+
+    <properties>
+        <file_encoding>UTF-8</file_encoding>
+        <project.build.sourceEncoding>${file_encoding}</project.build.sourceEncoding>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+
     <dependencies>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-streams-clients</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>ch.qos.logback</groupId>
-                    <artifactId>logback-classic</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
     </dependencies>
     <packaging>jar</packaging>
 
-    <properties>
-        <file_encoding>UTF-8</file_encoding>
-        <project.build.sourceEncoding>${file_encoding}</project.build.sourceEncoding>
-        <maven.compiler.source>8</maven.compiler.source>
-        <maven.compiler.target>8</maven.compiler.target>
-    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.1</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <minimizeJar>false</minimizeJar>
+                            <shadedArtifactAttached>true</shadedArtifactAttached>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.rocketmq:rocketmq-streams-clients</include>
+                                </includes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointTest.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointTest.java
new file mode 100644
index 0000000..2daa525
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointTest.java
@@ -0,0 +1,110 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.streams.examples.checkpoint;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.rocketmq.streams.client.StreamBuilder;
+import org.apache.rocketmq.streams.client.source.DataStreamSource;
+import org.apache.rocketmq.streams.client.transform.window.Time;
+import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.dbinit.mysql.delegate.DBDelegate;
+import org.apache.rocketmq.streams.dbinit.mysql.delegate.DBDelegateFactory;
+import org.apache.rocketmq.streams.examples.rocketmqsource.ProducerFromFile;
+
+import static org.apache.rocketmq.streams.db.driver.DriverBuilder.DEFALUT_JDBC_DRIVER;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
+
+
+/**
+ * Example that registers a JDBC (MySQL) connection in the component properties and then runs a
+ * windowed aggregation over a RocketMQ topic with {@code setLocalStorageOnly(false)}.
+ *
+ * <p>Fill in {@code URL}, {@code USER_NAME} and {@code PASSWORD} for your MySQL instance before running.
+ */
+public class RemoteCheckpointTest {
+    // Replace with your MySQL URL; any database name may be used.
+    private static final String URL = "jdbc:mysql://localhost:3306/rocketmq_streams";
+    // MySQL user name.
+    private static final String USER_NAME = "";
+    // MySQL password.
+    private static final String PASSWORD = "";
+
+    // Runs at class initialization, i.e. before main(), so the settings are registered before the pipeline is built.
+    static {
+        ComponentCreator.getProperties().put(ConfigureFileKey.CONNECT_TYPE, "DB");
+        ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_URL, URL);
+        ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_DRIVER, DEFALUT_JDBC_DRIVER);
+        ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_USERNAME, USER_NAME);
+        ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_PASSWORD, PASSWORD);
+    }
+
+    /**
+     * Publishes {@code data.txt} to {@code RMQ_TOPIC}, initializes the DB delegate, then starts a
+     * pipeline that groups records by (ProjectName, LogStore) in 10-second tumbling windows,
+     * sums OutFlow and InFlow, counts the records and prints each result.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        ProducerFromFile.produce("data.txt", NAMESRV_ADDRESS, RMQ_TOPIC);
+        DBDelegate delegate = DBDelegateFactory.getDelegate();
+        delegate.init();
+
+        try {
+            Thread.sleep(1000 * 3);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag rather than silently dropping the interruption.
+            Thread.currentThread().interrupt();
+        }
+        System.out.println("begin streams code.");
+
+        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+        source.fromRocketmq(
+                RMQ_TOPIC,
+                RMQ_CONSUMER_GROUP_NAME,
+                false,
+                NAMESRV_ADDRESS)
+                .filter((message) -> {
+                    try {
+                        JSONObject.parseObject((String) message);
+                    } catch (Throwable t) {
+                        // Not valid JSON: return true to discard it, because all following operators work on JSON.
+                        return true;
+                    }
+                    return false;
+                })
+                // The message must be converted to JSON before the window operators run.
+                .map(message -> JSONObject.parseObject((String) message))
+                .window(TumblingWindow.of(Time.seconds(10)))
+                .groupBy("ProjectName", "LogStore")
+                .sum("OutFlow", "OutFlow")
+                .sum("InFlow", "InFlow")
+                .count("total")
+                .waterMark(5)
+                .setLocalStorageOnly(false)
+                .toDataSteam()
+                .toPrint(1)
+                .start();
+    }
+}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java
index d568d5f..a7b4361 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java
@@ -22,7 +22,7 @@ import org.apache.rocketmq.streams.client.source.DataStreamSource;
 public class FileSourceExample {
     public static void main(String[] args) {
         DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
-        source.fromFile("/your/file/path", false)
+        source.fromFile("data.txt", false)
                 .map(message -> message)
                 .toPrint(1)
                 .start();
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/PageDimension.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/PageDimension.java
new file mode 100644
index 0000000..95bfd5c
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/PageDimension.java
@@ -0,0 +1,90 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.streams.examples.pageclick;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.rocketmq.streams.client.StreamBuilder;
+import org.apache.rocketmq.streams.client.source.DataStreamSource;
+import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
+import org.apache.rocketmq.streams.client.transform.window.Time;
+import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
+import org.apache.rocketmq.streams.examples.rocketmqsource.ProducerFromFile;
+import org.junit.Test;
+
+/**
+ * Page-click example: counts clicks per page in one-minute windows and, as a second step,
+ * finds the highest count within each window.
+ */
+public class PageDimension {
+    private static final String TOPIC = "pageClick";
+    private static final String NAME_SERVER = "127.0.0.1:9876";
+
+    /**
+     * Counts how many times each page ({@code url}) was clicked in every one-minute tumbling
+     * window, using {@code eventTime} as the time field, and writes the results to
+     * {@code /home/result.txt}.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        ProducerFromFile.produce("pageClickData.txt", NAME_SERVER, TOPIC);
+
+        try {
+            Thread.sleep(1000 * 3);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag rather than silently dropping the interruption.
+            Thread.currentThread().interrupt();
+        }
+        System.out.println("begin streams code.");
+
+        DataStreamSource source = StreamBuilder.dataStream("pageClickNS", "pageClickPL");
+        source.fromRocketmq(TOPIC, "pageClickGroup", false, NAME_SERVER)
+                .map(message -> JSONObject.parseObject((String) message))
+                .window(TumblingWindow.of(Time.minutes(1)))
+                .groupBy("url")
+                .setTimeField("eventTime")
+                .count("total")
+                .waterMark(1)
+                .setLocalStorageOnly(true)
+                .toDataSteam()
+                .toFile("/home/result.txt")
+                .with(WindowStrategy.highPerformance())
+                .start();
+    }
+
+    /**
+     * Reads the per-page counts from {@code /home/result.txt} (the file {@link #main(String[])}
+     * writes) and prints the maximum {@code total} per (start_time, end_time) group.
+     */
+    @Test
+    public void findMax() {
+        DataStreamSource source = StreamBuilder.dataStream("ns-1", "pl-1");
+        source.fromFile("/home/result.txt", false)
+                .map(message -> JSONObject.parseObject((String) message))
+                .window(TumblingWindow.of(Time.seconds(5)))
+                .groupBy("start_time", "end_time")
+                .max("total")
+                .waterMark(1)
+                .setLocalStorageOnly(true)
+                .toDataSteam()
+                .toPrint(1)
+                .start();
+    }
+}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/UsersDimension.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/UsersDimension.java
new file mode 100644
index 0000000..c7e6c74
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/UsersDimension.java
@@ -0,0 +1,69 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.streams.examples.pageclick;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.rocketmq.streams.client.StreamBuilder;
+import org.apache.rocketmq.streams.client.source.DataStreamSource;
+import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
+import org.apache.rocketmq.streams.client.transform.window.Time;
+import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
+import org.apache.rocketmq.streams.examples.rocketmqsource.ProducerFromFile;
+
+
+/**
+ * Page-click example: counts clicks per user in one-minute windows.
+ */
+public class UsersDimension {
+    private static final String TOPIC = "pageClick";
+    private static final String NAME_SERVER = "127.0.0.1:9876";
+
+    /**
+     * Counts how many clicks each user ({@code userId}) made in every one-minute tumbling
+     * window, using {@code eventTime} as the time field, and prints the results.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        ProducerFromFile.produce("pageClickData.txt", NAME_SERVER, TOPIC);
+
+        try {
+            Thread.sleep(1000 * 3);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag rather than silently dropping the interruption.
+            Thread.currentThread().interrupt();
+        }
+        System.out.println("begin streams code.");
+
+        DataStreamSource source = StreamBuilder.dataStream("pageClickNS", "pageClickPL");
+        source.fromRocketmq(TOPIC, "pageClickGroup", false, NAME_SERVER)
+                .map(message -> JSONObject.parseObject((String) message))
+                .window(TumblingWindow.of(Time.minutes(1)))
+                .groupBy("userId")
+                .setTimeField("eventTime")
+                .count("total")
+                .waterMark(1)
+                .setLocalStorageOnly(true)
+                .toDataSteam()
+                .toPrint(1)
+                .with(WindowStrategy.highPerformance())
+                .start();
+    }
+}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/Constant.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/Constant.java
new file mode 100644
index 0000000..c9287f4
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/Constant.java
@@ -0,0 +1,38 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.streams.examples.rocketmqsource;
+
+/**
+ * Connection settings shared by the RocketMQ-based examples.
+ */
+public final class Constant {
+    /** Address (host:port) of the RocketMQ name server the examples connect to. */
+    public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
+    /** Topic the examples produce to and consume from. */
+    public static final String RMQ_TOPIC = "NormalTestTopic";
+    /** Consumer group name passed to the example pipelines. */
+    public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-02";
+    /** Tag expression; "*" presumably selects every tag - confirm against the consumer usage. */
+    public static final String TAGS = "*";
+
+    private Constant() {
+        // Constants holder; not meant to be instantiated.
+    }
+}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java
new file mode 100644
index 0000000..b26b66f
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java
@@ -0,0 +1,118 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.streams.examples.rocketmqsource;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper for the examples: publishes every non-empty line of a text file to a RocketMQ topic.
+ */
+public class ProducerFromFile {
+
+    /**
+     * Sends each non-empty line of {@code filePath} as one message to {@code topic}.
+     *
+     * <p>Best effort: any failure is printed to stderr and not rethrown. Once the producer has
+     * been started it is always shut down, even when reading or sending fails.
+     *
+     * @param filePath file system path or, if no such file exists, classpath resource name
+     * @param nameServ RocketMQ name server address
+     * @param topic    topic the messages are sent to
+     */
+    public static void produce(String filePath, String nameServ, String topic) {
+        try {
+            DefaultMQProducer producer = new DefaultMQProducer("test-group");
+            producer.setNamesrvAddr(nameServ);
+            producer.start();
+            try {
+                for (String line : read(filePath)) {
+                    Message msg = new Message(topic, "", line.getBytes(RemotingHelper.DEFAULT_CHARSET));
+                    SendResult sendResult = producer.send(msg);
+                    System.out.printf("%s%n", sendResult);
+                }
+            } finally {
+                // Release the producer even if reading or sending fails part-way through.
+                producer.shutdown();
+            }
+        } catch (Throwable t) {
+            t.printStackTrace();
+        }
+    }
+
+    /**
+     * Opens {@code filePath} for reading as UTF-8 text.
+     *
+     * <p>The file system is tried first; otherwise the name is resolved as a classpath resource and
+     * read through a stream, so it also works when the resource is packaged inside a jar.
+     *
+     * @param filePath file system path or classpath resource name
+     * @return a reader positioned at the start of the content; the caller must close it
+     * @throws IOException if neither a readable file nor a classpath resource exists
+     */
+    private static BufferedReader open(String filePath) throws IOException {
+        File file = new File(filePath);
+        InputStream in;
+        if (file.exists()) {
+            in = new FileInputStream(file);
+        } else {
+            in = ProducerFromFile.class.getClassLoader().getResourceAsStream(filePath);
+            if (in == null) {
+                throw new FileNotFoundException(filePath);
+            }
+        }
+        return new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Reads all non-empty lines of {@code path}.
+     *
+     * @param path file system path or classpath resource name
+     * @return the lines in file order; the lines read so far (possibly none) if reading fails
+     */
+    private static List<String> read(String path) {
+        List<String> result = new ArrayList<>();
+        try (BufferedReader reader = open(path)) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                // Skip blank lines instead of silently stopping at the first one.
+                if (!line.isEmpty()) {
+                    result.add(line);
+                }
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return result;
+    }
+}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample1.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample1.java
index c2e6bd1..c56c975 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample1.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample1.java
@@ -19,14 +19,30 @@ package org.apache.rocketmq.streams.examples.rocketmqsource;
 import org.apache.rocketmq.streams.client.StreamBuilder;
 import org.apache.rocketmq.streams.client.source.DataStreamSource;
 
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
+
+
 public class RocketMQSourceExample1 {
     public static void main(String[] args) {
-        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+        ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC);
 
+        try {
+            Thread.sleep(1000 * 3);
+        } catch (InterruptedException e) {
+        }
+
+        System.out.println("begin streams code.");
+
+        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+        /**
+         * 1. Before running this case, make sure some data has already been sent to RocketMQ.
+         */
         source.fromRocketmq(
-                RocketMQSourceExample2.RMQ_TOPIC,
-                RocketMQSourceExample2.RMQ_CONSUMER_GROUP_NAME,
-                RocketMQSourceExample2.NAMESRV_ADDRESS
+                RMQ_TOPIC,
+                RMQ_CONSUMER_GROUP_NAME,
+                NAMESRV_ADDRESS
         )
                 .map(message -> message)
                 .toPrint(1)
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
index c9d9bda..69c217b 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
@@ -18,21 +18,28 @@ package org.apache.rocketmq.streams.examples.rocketmqsource;
 
 import org.apache.rocketmq.streams.client.StreamBuilder;
 import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.client.transform.window.Time;
-import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
-import org.apache.rocketmq.streams.client.transform.window.WindowInfo;
 
 import java.util.Arrays;
 
-public class RocketMQSourceExample2 {
-    public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
-    public static final String RMQ_TOPIC = "NormalTestTopic";
-    public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-01";
-    public static final String TAGS = "*";
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
 
+public class RocketMQSourceExample2 {
+    /**
+     * 1. Before running this case, make sure some data has already been sent to RocketMQ.
+     */
     public static void main(String[] args) {
-        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+        ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC);
 
+        try {
+            Thread.sleep(1000 * 3);
+        } catch (InterruptedException e) {
+        }
+
+        System.out.println("begin streams code.");
+
+        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
         source.fromRocketmq(
                 RMQ_TOPIC,
                 RMQ_CONSUMER_GROUP_NAME,
@@ -47,7 +54,7 @@ public class RocketMQSourceExample2 {
                 .filter((value) -> {
                     System.out.println("filter: ===========");
                     String messageValue = (String)value;
-                    return !messageValue.contains("RocketMQ");
+                    return !messageValue.contains("InFlow");
                 })
                 .flatMap((message)->{
                     String value = (String) message;
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
index 1af6206..8411745 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
@@ -17,20 +17,34 @@
 
 package org.apache.rocketmq.streams.examples.rocketmqsource;
 
+import com.alibaba.fastjson.JSONObject;
 import org.apache.rocketmq.streams.client.StreamBuilder;
 import org.apache.rocketmq.streams.client.source.DataStreamSource;
 
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
-public class RocketMQSourceExample3 {
-    public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
-    public static final String RMQ_TOPIC = "NormalTestTopic";
-    public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-03";
-    public static final String TAGS = "*";
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
 
+public class RocketMQSourceExample3 {
+    /**
+     * 1. Before running this case, make sure some data has already been sent to RocketMQ.
+     */
     public static void main(String[] args) {
-        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+        ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC);
+
+        try {
+            Thread.sleep(1000 * 3);
+        } catch (InterruptedException e) {
+        }
 
+        System.out.println("begin streams code.");
+
+        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
         source.fromRocketmq(
                 RMQ_TOPIC,
                 RMQ_CONSUMER_GROUP_NAME,
@@ -42,12 +56,19 @@ public class RocketMQSourceExample3 {
                 .map(message -> message)
                 .filter((value) -> {
                     String messageValue = (String) value;
-                    return messageValue.contains("RocketMQ");
+                    return !messageValue.contains("InFlow");
                 })
                 .flatMap((message) -> {
-                    String value = (String) message;
-                    String[] result = value.split(" ");
-                    return Arrays.asList(result);
+                    JSONObject jsonObject = JSONObject.parseObject((String) message);
+                    Set<Map.Entry<String, Object>> entries = jsonObject.entrySet();
+
+                    List<String> result = new ArrayList<>();
+
+                    for (Map.Entry<String, Object> entry : entries) {
+                        String str = entry.getKey() + ":" + entry.getValue();
+                        result.add(str);
+                    }
+                    return result;
                 })
                 .toPrint(1)
                 .start();
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
index a50d6cb..6dfdfb5 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
@@ -24,15 +24,26 @@ import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
 import org.apache.rocketmq.streams.client.transform.window.Time;
 import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
 
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
+
 public class RocketmqWindowTest {
-    public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
-    public static final String RMQ_TOPIC = "NormalTestTopic";
-    public static final String RMQ_CONSUMER_GROUP_NAME = "group-03";
-    public static final String TAGS = "*";
 
+    /**
+     * 1. Before running this case, make sure some data has already been sent to RocketMQ.
+     * 2. RocketMQ must allow topics to be created automatically.
+     */
     public static void main(String[] args) {
-        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+        ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC);
 
+        try {
+            Thread.sleep(1000 * 3);
+        } catch (InterruptedException e) {
+        }
+        System.out.println("begin streams code.");
+
+        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
         source.fromRocketmq(
                 RMQ_TOPIC,
                 RMQ_CONSUMER_GROUP_NAME,
@@ -47,11 +58,14 @@ public class RocketmqWindowTest {
                     }
                     return false;
                 })
+                //must convert message to json.
                 .map(message -> JSONObject.parseObject((String) message))
-                .window(TumblingWindow.of(Time.seconds(1)))
-                .groupBy("ProjectName", "LogStore")
+                .window(TumblingWindow.of(Time.seconds(10)))
+                .groupBy("ProjectName","LogStore")
+                .sum("OutFlow", "OutFlow")
+                .sum("InFlow", "InFlow")
                 .count("total")
-                .waterMark(1)
+                .waterMark(5)
                 .setLocalStorageOnly(true)
                 .toDataSteam()
                 .toPrint(1)
diff --git a/rocketmq-streams-examples/src/main/resources/data.txt b/rocketmq-streams-examples/src/main/resources/data.txt
new file mode 100644
index 0000000..d6a9ae9
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/resources/data.txt
@@ -0,0 +1,11 @@
+{"InFlow":"1","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"0"}
+{"InFlow":"2","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"1"}
+{"InFlow":"3","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"2"}
+{"InFlow":"4","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"3"}
+{"InFlow":"5","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"4"}
+{"InFlow":"6","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"5"}
+{"InFlow":"7","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"6"}
+{"InFlow":"8","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"7"}
+{"InFlow":"9","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"8"}
+{"InFlow":"10","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"9"}
+
diff --git a/rocketmq-streams-examples/src/main/resources/pageClickData.txt b/rocketmq-streams-examples/src/main/resources/pageClickData.txt
new file mode 100644
index 0000000..51bd89d
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/resources/pageClickData.txt
@@ -0,0 +1,11 @@
+{"userId":"1","eventTime":"1631700000000","method":"GET","url":"page-1"}
+{"userId":"2","eventTime":"1631700030000","method":"POST","url":"page-2"}
+{"userId":"3","eventTime":"1631700040000","method":"GET","url":"page-3"}
+{"userId":"1","eventTime":"1631700050000","method":"DELETE","url":"page-2"}
+{"userId":"1","eventTime":"1631700060000","method":"DELETE","url":"page-2"}
+{"userId":"2","eventTime":"1631700070000","method":"POST","url":"page-3"}
+{"userId":"3","eventTime":"1631700080000","method":"GET","url":"page-2"}
+{"userId":"1","eventTime":"1631700090000","method":"GET","url":"page-3"}
+{"userId":"2","eventTime":"1631700100000","method":"PUT","url":"page-3"}
+{"userId":"4","eventTime":"1631700120000","method":"POST","url":"page-1"}
+
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
index 8850ada..46eb1dd 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
@@ -16,13 +16,13 @@
  */
 package org.apache.rocketmq.streams.window.fire;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.window.operator.AbstractWindow;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class EventTimeManager {
     private Map<String,SplitEventTimeManager> eventTimeManagerMap=new HashMap<>();
     protected ISource source;
@@ -40,6 +40,8 @@ public class EventTimeManager {
                 }
             }
         }
+
+
         splitEventTimeManager.updateEventTime(message,window);
     }
 
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
index 70742f3..42f5408 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
@@ -16,32 +16,26 @@
  */
 package org.apache.rocketmq.streams.window.fire;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.context.MessageOffset;
-import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.db.driver.DriverBuilder;
-import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
 import org.apache.rocketmq.streams.window.model.WindowCache;
 import org.apache.rocketmq.streams.window.model.WindowInstance;
 import org.apache.rocketmq.streams.window.operator.AbstractWindow;
 
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class SplitEventTimeManager {
     protected static final Log LOG = LogFactory.getLog(SplitEventTimeManager.class);
-    protected Map<String,Long> messageSplitId2MaxTime=new HashMap<>();
+    protected static Map<String,Long> messageSplitId2MaxTime=new HashMap<>();
     private AtomicInteger queueIdCount=new AtomicInteger(0);
     protected Long lastUpdateTime;
 
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
index 0f573e2..435aa71 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
@@ -16,18 +16,6 @@
  */
 package org.apache.rocketmq.streams.window.operator.impl;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.context.MessageOffset;
@@ -48,6 +36,18 @@ import org.apache.rocketmq.streams.window.storage.IWindowStorage;
 import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;
 import org.apache.rocketmq.streams.window.storage.WindowStorage.WindowBaseValueIterator;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class WindowOperator extends AbstractShuffleWindow {
 
     private static final String ORDER_BY_SPLIT_NUM="_order_by_split_num_";//key=_order;queueid,windowinstanceid,partitionNum
@@ -80,7 +80,7 @@ public class WindowOperator extends AbstractShuffleWindow {
     protected transient AtomicInteger shuffleCount=new AtomicInteger(0);
     protected transient AtomicInteger fireCountAccumulator=new AtomicInteger(0);
     @Override
-    public int fireWindowInstance(WindowInstance instance, String queueId,Map<String,String> queueId2Offset) {
+    public int fireWindowInstance(WindowInstance instance, String queueId, Map<String,String> queueId2Offset) {
         List<WindowValue> windowValues=new ArrayList<>();
         int fireCount=0;
         long startTime=System.currentTimeMillis();
@@ -100,10 +100,9 @@ public class WindowOperator extends AbstractShuffleWindow {
                 continue;
             }
             WindowValue windowValue=(WindowValue)windowBaseValue;
-            Integer currentValue=(Integer)windowValue.getComputedColumnResultByKey("total");
-            if(currentValue==null){
-                currentValue=0;
-            }
+
+            Integer currentValue = getValue(windowValue, "total");
+
               fireCountAccumulator.addAndGet(currentValue);
             windowValues.add((WindowValue)windowBaseValue);
             if(windowValues.size()>=windowCache.getBatchSize()){
@@ -121,15 +120,9 @@ public class WindowOperator extends AbstractShuffleWindow {
             sendCost+=(System.currentTimeMillis()-sendFireCost);
             fireCount+=windowValues.size();
         }
-//        if(fireCountAccumulator.get()>25000){
-//            System.out.println("fire count is "+fireCountAccumulator.get());
-//        }
-
-        //long clearStart=System.currentTimeMillis();
         clearFire(instance);
         this.sqlCache.addCache(new FiredNotifySQLElement(queueId,instance.createWindowInstanceId()));
-        // System.out.println("=============== fire cost is "+(System.currentTimeMillis()-startTime)+"send cost is "+sendCost+" clear cost is "+(System.currentTimeMillis()-clearStart));
-        return fireCount;
+         return fireCount;
     }
 
     protected transient Map<String,Integer>  shuffleWindowInstanceId2MsgCount=new HashMap<>();
@@ -161,39 +154,39 @@ public class WindowOperator extends AbstractShuffleWindow {
             }
             allWindowValues.put(storeKey,windowValue);
             windowValue.incrementUpdateVersion();
-            Integer origValue=(Integer)windowValue.getComputedColumnResultByKey("total");
-            if(origValue==null){
-                origValue=0;
-            }
+
+            Integer origValue = getValue(windowValue, "total");
+
             if(msgs!=null){
                 for(IMessage message:msgs){
                     windowValue.calculate(this,message);
                 }
             }
 
-            Integer currentValue=(Integer)windowValue.getComputedColumnResultByKey("total");
-            if(currentValue==null){
-                currentValue=0;
-            }
+            Integer currentValue = getValue(windowValue, "total");
+
             shuffleCount.addAndGet(-origValue);
             shuffleCount.addAndGet(currentValue);
-           // if(shuffleCount.get()>25000){
-            //    System.out.println("==========shuffle count is "+shuffleCount.get());
-            //}
-
-
         }
         if(DebugWriter.getDebugWriter(this.getConfigureName()).isOpenDebug()){
             DebugWriter.getDebugWriter(this.getConfigureName()).writeWindowCalculate(this,new ArrayList(allWindowValues.values()),queueId);
         }
 
         saveStorage(allWindowValues,instance,queueId);
-        //Integer count=shuffleWindowInstanceId2MsgCount.get(instance.createWindowInstanceId());
-        //if(count==null){
-        //    count=0;
-        //}
-        //count+=messages.size();
-        //shuffleWindowInstanceId2MsgCount.put(instance.createWindowInstanceId(),count);
+    }
+
+    /** Reads computed column {@code fieldName} as an Integer: null gives 0; any Number or numeric String is accepted. */
+    private Integer getValue(WindowValue windowValue, String fieldName) {
+        Object value = windowValue.getComputedColumnResultByKey(fieldName);
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof Number) {
+            return ((Number) value).intValue(); // non-Integer numbers (Long, Double, ...) are narrowed instead of rejected
+        } else if (value instanceof String) {
+            return Integer.valueOf((String) value); // NumberFormatException propagates for non-numeric text
+        }
+        throw new ClassCastException("value:["+value+"] of fieldName:["+fieldName+"] can not change to number.");
+    }
 
 
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index ba17a44..523eb0f 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -20,10 +20,6 @@ import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
 import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
 import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
@@ -32,32 +28,36 @@ import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
 import org.apache.rocketmq.streams.common.checkpoint.CheckPointState;
 import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.common.context.AbstractContext;
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.context.Message;
 import org.apache.rocketmq.streams.common.context.MessageOffset;
 import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
 import org.apache.rocketmq.streams.common.topology.ChainPipeline;
 import org.apache.rocketmq.streams.common.topology.model.Pipeline;
 import org.apache.rocketmq.streams.common.utils.CollectionUtil;
 import org.apache.rocketmq.streams.common.utils.DateUtil;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.common.utils.TraceUtil;
 import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
 import org.apache.rocketmq.streams.window.debug.DebugWriter;
+import org.apache.rocketmq.streams.window.model.WindowCache;
+import org.apache.rocketmq.streams.window.model.WindowInstance;
 import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
 import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.common.context.AbstractContext;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
-import org.apache.rocketmq.streams.window.model.WindowCache;
 import org.apache.rocketmq.streams.window.operator.impl.WindowOperator.WindowRowOperator;
 import org.apache.rocketmq.streams.window.sqlcache.impl.SQLElement;
 import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * 负责处理分片