You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/09/18 11:39:17 UTC

[rocketmq-streams] branch async-ck updated: add checkpoint storage (#69)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch async-ck
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/async-ck by this push:
     new 2766990  add checkpoint storage (#69)
2766990 is described below

commit 276699067024103b6f35da4699c44d05848ad0f4
Author: cyril <cw...@gmail.com>
AuthorDate: Sat Sep 18 19:39:13 2021 +0800

    add checkpoint storage (#69)
    
    * add channel-db module
    
    * add channel-configurable
    
    * add chinese
    
    * fixed DataStream flatMap
    
    * U
    
    * stash & pre merge
    
    * add license for chackpoint/pom.xml
    
    * add license for checkpoint/pom.xml
    
    * fix license
    
    Co-authored-by: vv <ze...@alibaba-inc.com>
---
 README-chinese.md                                  | 106 +++++++++
 pom.xml                                            |   1 +
 .../pom.xml                                        |  44 +++-
 .../streams/checkpoint/db/DBCheckPointStorage.java |  65 ++++++
 .../streams/common/channel/AbstractChannel.java    |  10 +
 .../common/channel/impl/memory/MemoryChannel.java  |   5 +
 .../streams/common/channel/sink/AbstractSink.java  |   2 +-
 .../common/channel/source/AbstractSource.java      |  53 ++++-
 .../streams/common/channel/source/ISource.java     |  18 ++
 .../checkpoint/AbstractCheckPointStorage.java      | 184 +++++++++++++++
 .../streams/common/checkpoint/CheckPoint.java      |  98 ++++++--
 .../common/checkpoint/CheckPointManager.java       | 254 ++++++++-------------
 .../common/checkpoint/CheckPointMessage.java       |  10 +
 .../checkpoint/CheckPointStorageFactory.java       |  87 +++++++
 .../ICheckPointStorage.java}                       |  24 +-
 .../streams/common/checkpoint/SourceSnapShot.java  |  79 +++++++
 .../streams/common/checkpoint/SourceState.java     |  57 +++++
 .../streams/common/configure/ConfigureFileKey.java |   6 +
 .../streams/common/functions/FlatMapFunction.java  |   4 +-
 .../optimization/cachefilter/CacheFilterMeta.java  |  18 +-
 rocketmq-streams-configurable/pom.xml              |   1 +
 rocketmq-streams-state/pom.xml                     |   0
 22 files changed, 923 insertions(+), 203 deletions(-)

diff --git a/README-chinese.md b/README-chinese.md
new file mode 100644
index 0000000..5d8215f
--- /dev/null
+++ b/README-chinese.md
@@ -0,0 +1,106 @@
+# RocketMQ Streams
+## Features
+
+* 轻量级部署:可以单独部署,也支持集群部署
+* 多种类型的数据输入以及输出,source 支持 rocketmq , sink 支持db, rocketmq 等
+
+## DataStream Example
+
+```java
+import org.apache.rocketmq.streams.client.transform.DataStream;
+
+DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
+
+    source
+    .fromFile("~/admin/data/text.txt",false)
+    .map(message->message)
+    .toPrint(1)
+    .start();
+```
+
+## Maven Repository
+
+```xml
+
+<dependency>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-streams-clients</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+</dependency>
+```
+
+# Core API
+
+rocketmq-stream 实现了一系列高级的API,可以让用户很方便的编写流计算的程序,实现自己的业务需求;
+
+## StreamBuilder
+
+StreamBuilder 用于构建流任务的源; 内部包含```dataStream()```和```tableStream()```俩个方法,分别返回DataStreamSource和TableStreamSource俩个源;
+
++ [dataStream(nameSpaceName,pipelineName)]() 返回DataStreamSource实例,用于分段编程实现流计算任务;
++ [tableStream(nameSpaceName,pipelineName)]()返回TableStreamSource实例, 用于脚本编程实现流计算任务;
+
+## DataStream API
+
+### Source
+
+DataStreamSource 是分段式编程的源头类,用于对接各种数据源, 从各大消息队列中获取数据;
+
++ ```fromFile```  从文件中读取数据, 该方法包含俩个参数
+  + ```filePath``` 文件路径,必填参数
+  + ```isJsonData```  是否json数据, 非必填参数, 默认为```true```
+
+
++ ```fromRocketmq``` 从rocketmq中获取数据,包含四个参数
+  + ```topic``` rocketmq消息队列的topic名称,必填参数
+  + ```groupName``` 消费者组的名称,必填参数
+  + ```isJson``` 是否json格式,非必填参数
+  + ```tags``` rocketmq消费的tags值,用于过滤消息,非必填参数
+
++ ```from``` 自定义的数据源, 通过实现ISource接口实现自己的数据源
+
+### transform
+
+transform 允许在流计算过程中对输入源的数据进行修改,进行下一步的操作;DataStream API中包括```DataStream```,```JoinStream```, ```SplitStream```,```WindowStream```等多个transform类;
+
+#### DataStream
+
+DataStream实现了一系列常见的流计算算子
+
++ ```map``` 通过将源的每个记录传递给函数func来返回一个新的DataStream
++ ```flatmap``` 与map类似,一个输入项对应0个或者多个输出项
++ ```filter``` 只选择func返回true的源DStream的记录来返回一个新的DStream
++ ```forEach``` 对每个记录执行一次函数func, 返回一个新的DataStream
++ ```selectFields``` 对每个记录返回对应的字段值,返回一个新的DataStream
++ ```operate```  对每个记录执行一次自定义的函数,返回一个新的DataStream
++ ```script```  针对每个记录的字段执行一段脚本,返回新的字段,生成一个新的DataStream
++ ```toPrint``` 将结果在控制台打印,生成新的DataStreamAction实例
++ ```toFile``` 将结果保存为文件,生成一个新的DataStreamAction实例
++ ```toDB``` 将结果保存到数据库
++ ```toRocketmq``` 将结果输出到rocketmq
++ ```to``` 将结果经过自定义的ISink接口输出到指定的存储
++ ```window``` 在窗口内进行相关的统计分析,一般会与```groupBy```连用, ```window()```用来定义窗口的大小, ```groupBy()```用来定义统计分析的主key,可以指定多个
+  + ```count``` 在窗口内计数
+  + ```min``` 获取窗口内统计值的最小值
+  + ```max``` 获取窗口内统计值得最大值
+  + ```avg``` 获取窗口内统计值的平均值
+  + ```sum``` 获取窗口内统计值的加和值
+  + ```reduce``` 在窗口内进行自定义的汇总运算
++ ```join``` 根据条件将将俩个流进行关联, 合并为一个大流进行相关的运算
++ ```union``` 将俩个流进行合并
++ ```split``` 将一个数据流按照标签进行拆分,分为不同的数据流供下游进行分析计算
++ ```with``` with算子用来指定计算过程中的相关策略,包括checkpoint的存储策略,state的存储策略等
+
+# Strategy
+
+策略机制主要用来控制计算引擎运行过程中的底层逻辑,如checkpoint,state的存储方式等,后续还会增加对窗口、双流join等的控制;所有的控制策略通过```with```算子传入,可以同时传入多个策略类型;
+
+```java
+//指定checkpoint的存储策略
+source
+    .fromRocketmq("TSG_META_INFO","")
+    .map(message->message+"--")
+    .toPrint(1)
+    .with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L))
+    .start();
+```
diff --git a/pom.xml b/pom.xml
index 7bbe1f4..6bdd9ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@
         <module>rocketmq-streams-channel-http</module>
         <module>rocketmq-streams-state</module>
         <module>rocketmq-streams-examples</module>
+        <module>rocketmq-streams-checkpoint</module>
 
     </modules>
 
diff --git a/rocketmq-streams-configurable/pom.xml b/rocketmq-streams-checkpoint/pom.xml
old mode 100755
new mode 100644
similarity index 54%
copy from rocketmq-streams-configurable/pom.xml
copy to rocketmq-streams-checkpoint/pom.xml
index f160060..40a28c0
--- a/rocketmq-streams-configurable/pom.xml
+++ b/rocketmq-streams-checkpoint/pom.xml
@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="utf-8"?>
+<?xml version="1.0" encoding="UTF-8"?>
 <!--
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
@@ -18,19 +18,49 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
     <parent>
-        <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-streams</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
         <version>1.0.0-SNAPSHOT</version>
     </parent>
-    <artifactId>rocketmq-streams-configurable</artifactId>
-    <name>ROCKETMQ STREAMS :: configurable</name>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rocketmq-streams-checkpoint</artifactId>
+    <name>ROCKETMQ STREAMS :: checkpoint</name>
     <packaging>jar</packaging>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
     <dependencies>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-streams-serviceloader</artifactId>
+            <artifactId>rocketmq-streams-commons</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.auto.service</groupId>
+                    <artifactId>auto-service</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-streams-db-operator</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.auto.service</groupId>
+                    <artifactId>auto-service</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.auto.service</groupId>
+            <artifactId>auto-service</artifactId>
+            <optional>true</optional>
         </dependency>
     </dependencies>
-</project>
+
+</project>
\ No newline at end of file
diff --git a/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java b/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java
new file mode 100644
index 0000000..df1d046
--- /dev/null
+++ b/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.checkpoint.db;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.source.ISource;
+import org.apache.rocketmq.streams.common.checkpoint.AbstractCheckPointStorage;
+import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
+import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
+import org.apache.rocketmq.streams.common.checkpoint.SourceSnapShot;
+import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
+
+import java.util.List;
+
+/**
+ * @description
+ */
+public class DBCheckPointStorage extends AbstractCheckPointStorage {
+
+    static final Log logger = LogFactory.getLog(DBCheckPointStorage.class);
+
+    static final String STORAGE_NAME = "DB";
+
+    public DBCheckPointStorage(){
+
+    }
+
+    @Override
+    public String getStorageName() {
+        return STORAGE_NAME;
+    }
+
+    @Override
+    public <T> void save(List<T> checkPointState) {
+        logger.info(String.format("save checkpoint size %d", checkPointState.size()));
+        ORMUtil.batchReplaceInto(checkPointState);
+    }
+
+    @Override
+    //todo
+    public CheckPoint recover(ISource iSource, String queueId) {
+        String sourceName = CheckPointManager.createSourceName(iSource, null);
+        String key = CheckPointManager.createCheckPointKey(sourceName, queueId);
+        String sql = "select * from source_snap_shot where `key` = " + "'" + key + "';";
+        SourceSnapShot snapShot = ORMUtil.queryForObject(sql, null, SourceSnapShot.class);
+
+        logger.info(String.format("checkpoint recover key is %s, sql is %s, recover sourceSnapShot : %s", key, sql, snapShot == null ? "null snapShot" : snapShot.toString()));
+        return new CheckPoint().fromSnapShot(snapShot);
+    }
+}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java
index 74ff5f4..7765669 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java
@@ -227,4 +227,14 @@ public abstract class AbstractChannel extends BasedConfigurable implements IChan
         create();
         ((AbstractSource)source).setJsonData(isJsonData);
     }
+
+    @Override
+    public String getTopic(){
+        return source.getTopic();
+    }
+
+    @Override
+    public void setTopic(String topic){
+        source.setTopic(topic);
+    }
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java
index 06dff1c..f3d06ee 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java
@@ -74,4 +74,9 @@ public class MemoryChannel extends AbstractChannel {
             }
         };
     }
+
+    @Override
+    public String createCheckPointName() {
+        return "memory-source";
+    }
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
index 82218c2..aa7c1bc 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
@@ -32,8 +32,8 @@ import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MultiSplitMessa
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
-import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager.SourceState;
 import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
+import org.apache.rocketmq.streams.common.checkpoint.SourceState;
 import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
 import org.apache.rocketmq.streams.common.configurable.IConfigurableIdentification;
 import org.apache.rocketmq.streams.common.context.IMessage;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
index f5de98b..bf45e7e 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
@@ -42,6 +42,7 @@ import org.apache.rocketmq.streams.common.context.MessageHeader;
 import org.apache.rocketmq.streams.common.context.UserDefinedMessage;
 import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
 import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 
 /**
@@ -75,6 +76,7 @@ public abstract class AbstractSource extends BasedConfigurable implements ISourc
 
     protected List<String> logFingerprintFields;//log fingerprint to filter msg quickly
 
+
     /**
      * 数据源投递消息的算子,此算子用来接收source的数据,做处理
      */
@@ -223,18 +225,22 @@ public abstract class AbstractSource extends BasedConfigurable implements ISourc
         return createJson(message);
     }
 
-
+    /**
+     * 交给receiver执行后续逻辑
+     *
+     * @param channelMessage
+     * @return
+     */
     public AbstractContext executeMessage(Message channelMessage) {
         AbstractContext context = new Context(channelMessage);
-        if (!channelMessage.getHeader().isSystemMessage()) {
-            messageQueueChangedCheck(channelMessage.getHeader());
-        }
-
         if (isSplitInRemoving(channelMessage)) {
             return context;
         }
+        if (!channelMessage.getHeader().isSystemMessage()) {
+            messageQueueChangedCheck(channelMessage.getHeader());
+        }
 
-        boolean needFlush = !channelMessage.getHeader().isSystemMessage() && channelMessage.getHeader().isNeedFlush();
+        boolean needFlush = channelMessage.getHeader().isSystemMessage() == false && channelMessage.getHeader().isNeedFlush();
 
         if (receiver != null) {
             receiver.doMessage(channelMessage, context);
@@ -277,6 +283,9 @@ public abstract class AbstractSource extends BasedConfigurable implements ISourc
      * @param header
      */
     protected void messageQueueChangedCheck(MessageHeader header) {
+        if (supportNewSplitFind() && supportRemoveSplitFind()) {
+            return;
+        }
         Set<String> queueIds = new HashSet<>();
         String msgQueueId = header.getQueueId();
         if (StringUtil.isNotEmpty(msgQueueId)) {
@@ -287,7 +296,7 @@ public abstract class AbstractSource extends BasedConfigurable implements ISourc
             queueIds.addAll(checkpointQueueIds);
         }
         Set<String> newQueueIds = new HashSet<>();
-
+        Set<String> removeQueueIds = new HashSet<>();
         for (String queueId : queueIds) {
             if (isNotDataSplit(queueId)) {
                 continue;
@@ -536,4 +545,34 @@ public abstract class AbstractSource extends BasedConfigurable implements ISourc
         return isBatchMessage;
     }
 
+    @Override
+    public String createCheckPointName(){
+
+        ISource source = this;
+
+        String namespace = source.getNameSpace();
+        String name = source.getConfigureName();
+        String groupName = source.getGroupName();
+
+
+        if(StringUtil.isEmpty(namespace)){
+            namespace = "default_namespace";
+        }
+
+        if(StringUtil.isEmpty(name)){
+            name = "default_name";
+        }
+
+        if(StringUtil.isEmpty(groupName)){
+            groupName = "default_groupName";
+        }
+        String topic = source.getTopic();
+        if(topic == null || topic.trim().length() == 0){
+            topic = "default_topic";
+        }
+        return MapKeyUtil.createKey(namespace, groupName, topic, name);
+
+    }
+
+
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/ISource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/ISource.java
index 4b9f55f..d3f6802 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/ISource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/ISource.java
@@ -74,4 +74,22 @@ public interface ISource<T extends ISource> extends IConfigurable, IStageBuilder
      */
     long getCheckpointTime();
 
+    /**
+     *
+     * @param topic
+     */
+    void setTopic(String topic);
+
+    /**
+     *
+     * @return
+     */
+    String getTopic();
+
+    /**
+     * 创建checkpoint名字
+     * @return checkpoint key name
+     */
+    String createCheckPointName();
+
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/AbstractCheckPointStorage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/AbstractCheckPointStorage.java
new file mode 100644
index 0000000..fad0ae1
--- /dev/null
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/AbstractCheckPointStorage.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.common.checkpoint;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
+import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
+import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
+import org.apache.rocketmq.streams.common.context.MessageOffset;
+
+import java.util.*;
+
+public abstract class AbstractCheckPointStorage implements ICheckPointStorage{
+
+    static final Log logger = LogFactory.getLog(AbstractCheckPointStorage.class);
+
+    protected transient IMessageCache<CheckPointMessage> messageCache;
+
+    public AbstractCheckPointStorage(){
+        messageCache = new MessageCache<>(new IMessageFlushCallBack<CheckPointMessage>() {
+            @Override
+            public boolean flushMessage(List<CheckPointMessage> messages) {
+                //合并最近的checkpoint,只存储一次
+                // key 为 sourceName, Value中的map, k : v = queueid : offset
+                Map<String, SourceState> sourceStateMap = mergeSourceState(messages);
+
+                logger.info(String.format("flushMessage raw size %d, merge size %d", messages.size(), sourceStateMap.size()));
+                logger.info("flushMessage : " + messages.get(0).getCheckPointStates().get(0).getQueueIdAndOffset().toString());
+
+                saveCheckPoint(sourceStateMap);
+                return true;
+            }
+        });
+        ((MessageCache)messageCache).setAutoFlushSize(10);
+        ((MessageCache)messageCache).setAutoFlushTimeGap(500);
+        messageCache.openAutoFlush();
+    }
+
+    public void flush(){
+        messageCache.flush();
+    }
+
+
+    /**
+     * 可能有多次的offset合并在一起,对offset合并
+     * 合并包含两个逻辑:1.同1个CheckPointMessage中,选择最小的作为本次的offset
+     * @param messages
+     */
+    protected  Map<String, SourceState> mergeSourceState(List<CheckPointMessage> messages) {
+        Map<String,SourceState> sourceStateMap = new HashMap<>();
+        for(CheckPointMessage checkPointMessage:messages){
+            SourceState sourceState = createSourceState(checkPointMessage);
+            if(sourceState == null){
+                continue;
+            }
+            String sourceName = sourceState.getSourceName();
+            SourceState existSourceState = sourceStateMap.get(sourceName);
+            SourceState lastSourceState = sourceState;
+            if(existSourceState != null){
+                lastSourceState = merge(sourceState,existSourceState);
+            }
+            sourceStateMap.put(sourceName,lastSourceState);
+        }
+        return sourceStateMap;
+    }
+
+    /**
+     * 如果多次的checkpoint在一起,先合并再保存
+     * @param sourceState
+     * @param existSourceState
+     * @return
+     */
+    protected SourceState merge(SourceState sourceState, SourceState existSourceState) {
+        Iterator<Map.Entry<String, MessageOffset>> it = sourceState.getQueueId2Offsets().entrySet()
+            .iterator();
+        while (it.hasNext()){
+            Map.Entry<String, MessageOffset> entry=it.next();
+            String queueId = entry.getKey();
+            MessageOffset offset = entry.getValue();
+            MessageOffset existOffset = existSourceState.getQueueId2Offsets().get(queueId);
+            if(existOffset == null){
+                existSourceState.getQueueId2Offsets().put(queueId,offset);
+            }else {
+                boolean isGreateThan = offset.greateThan(existOffset.getOffsetStr());
+                if(isGreateThan){
+                    existSourceState.getQueueId2Offsets().put(queueId,offset);
+                }
+            }
+        }
+        return existSourceState;
+    }
+
+    /**
+     * 一个pipeline流程中,找最小的offset提交保存
+     * @param checkPointMessage
+     * @return
+     */
+    protected SourceState createSourceState(CheckPointMessage checkPointMessage) {
+        SourceState sourceState = new SourceState();
+        String pipelineName = checkPointMessage.getPipelineName();
+
+        Map<String, MessageOffset> queueId2Offsets = new HashMap<>();
+        sourceState.setSourceName(CheckPointManager.createSourceName(checkPointMessage.getSource(),pipelineName));
+        sourceState.setQueueId2Offsets(queueId2Offsets);
+
+        for(CheckPointState checkPointState:checkPointMessage.getCheckPointStates()){
+
+            if(checkPointState.isReplyAnyOny()){
+                continue;
+            }
+            if(checkPointState.isReplyRefuse()){
+                return null;
+            }
+            Iterator<Map.Entry<String, MessageOffset>> it = checkPointState.getQueueIdAndOffset().entrySet()
+                .iterator();
+            while (it.hasNext()){
+                Map.Entry<String, MessageOffset> entry=it.next();
+                String queueId=entry.getKey();
+                MessageOffset offset = entry.getValue();
+                MessageOffset existOffset = queueId2Offsets.get(queueId);
+                if(existOffset==null){
+                    queueId2Offsets.put(queueId,offset);
+                }else {
+                    boolean isGreateThan=existOffset.greateThan(offset.getOffsetStr());
+                    if(isGreateThan){
+                        queueId2Offsets.put(queueId,offset);
+                    }else {
+                        queueId2Offsets.put(queueId,existOffset);
+                    }
+                }
+            }
+        }
+        return sourceState;
+    }
+
+    /**
+     * 先查询现在数据源的分片,如果已经不处理的分片,不做保存
+     * 否则把结果保存到db中
+     * @param sourceStateMap
+     */
+    protected void saveCheckPoint(Map<String, SourceState> sourceStateMap) {
+
+        List<SourceSnapShot> checkPoints = new ArrayList<>();
+
+        for(SourceState sourceState:sourceStateMap.values()){
+            for(Map.Entry<String, MessageOffset> entry : sourceState.getQueueId2Offsets().entrySet()){
+                CheckPoint checkPoint = new CheckPoint();
+                checkPoint.setSourceName(sourceState.getSourceName());
+                checkPoint.setQueueId(entry.getKey());
+                checkPoint.setData(entry.getValue().getMainOffset());
+                checkPoint.setGmtCreate(new Date());
+                checkPoint.setGmtModified(new Date());
+                SourceSnapShot object = checkPoint.toSnapShot();
+                checkPoints.add(object);
+
+            }
+        }
+        save(checkPoints);
+    }
+
+    public void addCheckPointMessage(CheckPointMessage message){
+        List<CheckPointState> states = message.getCheckPointStates();
+        for(CheckPointState state : states){
+            logger.debug(String.format("addCheckPointMessage states %s", state.getQueueIdAndOffset().toString()));
+        }
+        messageCache.addCache(message);
+    }
+
+}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPoint.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPoint.java
index 4eb7211..8bcedbd 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPoint.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPoint.java
@@ -16,28 +16,24 @@
  */
 package org.apache.rocketmq.streams.common.checkpoint;
 
+import com.alibaba.fastjson.JSONObject;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.model.Entity;
 
 /**
  * model for checkpoint,need save in store
  */
-public class CheckPoint extends Entity {
+public class CheckPoint<T> extends Entity {
+
     protected String sourceNamespace;
+    protected String pipelineName;
     protected String sourceName;
     protected String queueId;
-    protected String offset;
+    protected String topic;
 
-    public static String loadOffset(ISource source, String splitId) {
-        return null;
-        //Set<String> splits=new HashSet<>();
-        //splits.add(splitId);
-        //Map<String,String> queueId2Offset=loadOffset(source,splits);
-        //if(queueId2Offset==null||queueId2Offset.containsKey(splitId)==false){
-        //    return null;
-        //}
-        //return queueId2Offset.get(splitId);
-    }
+    protected T data;
+
+    protected JSONObject content;
 
     public String getQueueId() {
         return queueId;
@@ -47,13 +43,6 @@ public class CheckPoint extends Entity {
         this.queueId = queueId;
     }
 
-    public String getOffset() {
-        return offset;
-    }
-
-    public void setOffset(String offset) {
-        this.offset = offset;
-    }
 
     public String getSourceNamespace() {
         return sourceNamespace;
@@ -70,4 +59,73 @@ public class CheckPoint extends Entity {
     public void setSourceName(String sourceName) {
         this.sourceName = sourceName;
     }
-}
+
+    public String getPipelineName() {
+        return pipelineName;
+    }
+
+    public void setPipelineName(String pipelineName) {
+        this.pipelineName = pipelineName;
+    }
+
+    public JSONObject getContent() {
+        return content;
+    }
+
+    public void setContent(JSONObject content) {
+        this.content = content;
+    }
+
+    public T getData() {
+        return data;
+    }
+
+    public void setData(T data) {
+        this.data = data;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public SourceSnapShot toSnapShot(){
+        SourceSnapShot snapShot = new SourceSnapShot();
+        snapShot.setGmtCreate(gmtCreate);
+        snapShot.setGmtModified(gmtModified);
+        snapShot.setKey(CheckPointManager.createCheckPointKey(sourceName, queueId));
+        if(content == null){
+            content = new JSONObject();
+        }
+        content.put("offset", data);
+
+        snapShot.setValue(content.toString());
+        return snapShot;
+
+    }
+
+    public CheckPoint fromSnapShot(SourceSnapShot sourceSnapShot){
+
+        if(sourceSnapShot == null){
+            return null;
+        }
+
+        String key = sourceSnapShot.getKey();
+        String value = sourceSnapShot.getValue();
+        CheckPoint<String> checkPoint = new CheckPoint<>();
+        String[] tmp1 = CheckPointManager.parseCheckPointKey(key);
+        String[] tmp2 = tmp1[0].split(";");
+        checkPoint.setSourceNamespace(tmp2[0]);
+        checkPoint.setPipelineName(tmp2[1]);
+        checkPoint.setSourceName(tmp2[2]);
+        checkPoint.setQueueId(tmp1[1]);
+        checkPoint.setData(value);
+        checkPoint.setGmtCreate(sourceSnapShot.getGmtCreate());
+        checkPoint.setGmtModified(sourceSnapShot.getGmtModified());
+        return checkPoint;
+
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointManager.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointManager.java
index e29f7d7..f57bcc2 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointManager.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointManager.java
@@ -16,164 +16,84 @@
  */
 package org.apache.rocketmq.streams.common.checkpoint;
 
-import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
-import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
-import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.configurable.IConfigurableIdentification;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
 import org.apache.rocketmq.streams.common.context.MessageOffset;
 import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.ReflectUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
 
-public class CheckPointManager {
-    protected IMessageCache<CheckPointMessage> messageCache;
+public class CheckPointManager extends BasedConfigurable {
+
     protected transient Map<String, Long> currentSplitAndLastUpdateTime = new HashMap<>();//保存这个实例处理的分片数
 
     protected transient Map<String, Long> removingSplits = new HashMap<>();//正在删除的分片
 
-    public CheckPointManager() {
-        messageCache = new MessageCache<>(new IMessageFlushCallBack<CheckPointMessage>() {
-            @Override
-            public boolean flushMessage(List<CheckPointMessage> messages) {
-                //合并最近的checkpoint,只存储一次,为了
-                Map<String, SourceState> sourceStateMap = mergeSourceState(messages);
-                saveCheckPoint(sourceStateMap);
-                return true;
-            }
-        });
-        messageCache.openAutoFlush();
-    }
+    protected transient ICheckPointStorage iCheckPointStorage;
 
-    public void flush() {
-        messageCache.flush();
+    public CheckPointManager(){
+        String name = ComponentCreator.getProperties().getProperty(ConfigureFileKey.CHECKPOINT_STORAGE_NAME);
+        iCheckPointStorage = CheckPointStorageFactory.getInstance().getStorage(name);
     }
 
-    public synchronized void addSplit(String splitId) {
-        this.currentSplitAndLastUpdateTime.put(splitId, System.currentTimeMillis());
-    }
 
-    public synchronized void removeSplit(String splitId) {
+    public synchronized void addSplit(String splitId){
+        this.currentSplitAndLastUpdateTime.put(splitId,System.currentTimeMillis());
+    }
+    public synchronized void removeSplit(String splitId){
         this.currentSplitAndLastUpdateTime.remove(splitId);
     }
 
-    public boolean contains(String splitId) {
+    public boolean contains(String splitId){
         return this.currentSplitAndLastUpdateTime.containsKey(splitId);
     }
 
-    /**
-     * 可能有多次的offset合并在一起,对offset合并 合并包含两个逻辑:1.同1个CheckPointMessage中,选择最小的作为本次的offset
-     *
-     * @param messages
-     */
-    protected Map<String, SourceState> mergeSourceState(List<CheckPointMessage> messages) {
-        Map<String, SourceState> sourceStateMap = new HashMap<>();
-        for (CheckPointMessage checkPointMessage : messages) {
-            SourceState sourceState = createSourceState(checkPointMessage);
-            if (sourceState == null) {
-                continue;
-            }
-            String sourceName = sourceState.getSourceName();
-            SourceState existSourceState = sourceStateMap.get(sourceName);
-            if (existSourceState != null) {
-                SourceState lastSourceState = merge(sourceState, existSourceState);
-                sourceStateMap.put(sourceName, lastSourceState);
+    private final List<CheckPoint> fromSourceState(Map<String, SourceState> sourceStateMap){
+
+        List<CheckPoint> checkPoints = new ArrayList<>();
+
+        for(Entry<String, SourceState> entry : sourceStateMap.entrySet()){
+            String key = entry.getKey();
+            SourceState value = entry.getValue();
+            String[] ss = key.split("\\;");
+            assert ss.length == 3 : "key length must be three. format is namespace;pipelineName;sourceName" + key;
+            for(Entry<String, MessageOffset> tmpEntry : value.getQueueId2Offsets().entrySet()){
+                String queueId = tmpEntry.getKey();
+                String offset = tmpEntry.getValue().getMainOffset();
+                CheckPoint checkPoint = new CheckPoint();
+                checkPoint.setSourceNamespace(ss[0]);
+                checkPoint.setPipelineName(ss[1]);
+                checkPoint.setSourceName(ss[2]);
+                checkPoint.setQueueId(queueId);
+                checkPoint.setData(offset);
+                checkPoints.add(checkPoint);
             }
         }
-        return sourceStateMap;
-    }
 
-    /**
-     * 一个pipeline流程中,找最小的offset提交保存
-     *
-     * @param checkPointMessage
-     * @return
-     */
-    protected SourceState createSourceState(CheckPointMessage checkPointMessage) {
-        SourceState sourceState = new SourceState();
-        String pipelineName = null;
-        if (checkPointMessage.getStreamOperator() instanceof IConfigurableIdentification) {
-            IConfigurableIdentification configurable = (IConfigurableIdentification)checkPointMessage.getCheckPointStates();
-            pipelineName = configurable.getConfigureName();
-        }
-        Map<String, MessageOffset> queueId2Offsets = new HashMap<>();
-        sourceState.setSourceName(createSourceName(checkPointMessage.getSource(), pipelineName));
-        sourceState.setQueueId2Offsets(queueId2Offsets);
+        return checkPoints;
 
-        for (CheckPointState checkPointState : checkPointMessage.getCheckPointStates()) {
-            if (checkPointState.isReplyAnyOny()) {
-                continue;
-            }
-            if (checkPointState.isReplyRefuse()) {
-                return null;
-            }
-            for (Entry<String, MessageOffset> entry : checkPointState.getQueueIdAndOffset().entrySet()) {
-                String queueId = entry.getKey();
-                MessageOffset offset = entry.getValue();
-                MessageOffset existOffset = queueId2Offsets.get(queueId);
-                if (existOffset == null) {
-                    queueId2Offsets.put(queueId, offset);
-                } else {
-                    boolean isGreateThan = existOffset.greateThan(offset.getOffsetStr());
-                    if (isGreateThan) {
-                        queueId2Offsets.put(queueId, offset);
-                    } else {
-                        queueId2Offsets.put(queueId, existOffset);
-                    }
-                }
-            }
-        }
-        return sourceState;
     }
 
-    /**
-     * 先查询现在数据源的分片,如果已经不处理的分片,不做保存 否则把结果保存到db中
-     *
-     * @param sourceStateMap
-     */
-    protected void saveCheckPoint(Map<String, SourceState> sourceStateMap) {
-
+    public void addCheckPointMessage(CheckPointMessage message){
+        this.iCheckPointStorage.addCheckPointMessage(message);
     }
 
-    /**
-     * 如果多次的checkpoint在一起,先合并再保存
-     *
-     * @param sourceState
-     * @param existSourceState
-     * @return
-     */
-    protected SourceState merge(SourceState sourceState, SourceState existSourceState) {
-        Iterator<Entry<String, MessageOffset>> it = sourceState.getQueueId2Offsets().entrySet()
-            .iterator();
-        while (it.hasNext()) {
-            Entry<String, MessageOffset> entry = it.next();
-            String queueId = entry.getKey();
-            MessageOffset offset = entry.getValue();
-            MessageOffset existOffset = existSourceState.getQueueId2Offsets().get(queueId);
-            if (existOffset == null) {
-                existSourceState.getQueueId2Offsets().put(queueId, offset);
-            } else {
-                boolean isGreaterThan = offset.greateThan(existOffset.getOffsetStr());
-                if (isGreaterThan) {
-                    existSourceState.getQueueId2Offsets().put(queueId, offset);
-                }
-            }
+    public CheckPoint recover(ISource iSource, ISplit iSplit){
+        String isRecover =  ComponentCreator.getProperties().getProperty(ConfigureFileKey.IS_RECOVER_MODE);
+        if(isRecover != null && Boolean.valueOf(isRecover)){
+            String queueId = iSplit.getQueueId();
+            return iCheckPointStorage.recover(iSource, queueId);
         }
-        return existSourceState;
-    }
-
-    public void addCheckPointMessage(CheckPointMessage message) {
-        this.messageCache.addCache(message);
+        return null;
     }
 
 
-
     public void updateLastUpdate(String queueId) {
         addSplit(queueId);
     }
@@ -182,54 +102,29 @@ public class CheckPointManager {
         return this.currentSplitAndLastUpdateTime.keySet();
     }
 
-    public static class SourceState {
-        protected String sourceName;
-        protected Map<String, MessageOffset> queueId2Offsets = new HashMap<>();
-
-        public String getSourceName() {
-            return sourceName;
-        }
+    public void flush(){
+        iCheckPointStorage.flush();
+    }
 
-        public void setSourceName(String sourceName) {
-            this.sourceName = sourceName;
-        }
 
-        public Map<String, MessageOffset> getQueueId2Offsets() {
-            return queueId2Offsets;
-        }
 
-        public void setQueueId2Offsets(
-            Map<String, MessageOffset> queueId2Offsets) {
-            this.queueId2Offsets = queueId2Offsets;
-        }
-    }
 
     /**
      * 根据source进行划分,主要是针对双流join的场景
-     *
      * @param source
      * @return
      */
-    public static String createSourceName(ISource source, String piplineName) {
-        String namespace = null;
-        String name = null;
-        if (source != null) {
-            namespace = source.getNameSpace();
-            name = source.getConfigureName();
-        }
+    public static String createSourceName(ISource source, String pipelineName){
 
-        if (StringUtil.isEmpty(namespace)) {
-            namespace = "default_namespace";
-        }
-        if (StringUtil.isEmpty(name)) {
-            name = "default_name";
+        if(StringUtil.isNotEmpty(pipelineName)){
+            return MapKeyUtil.createKey(source.createCheckPointName(), pipelineName);
         }
-        if (StringUtil.isEmpty(piplineName)) {
-            piplineName = "default_piplineName";
-        }
-        return MapKeyUtil.createKey(namespace, piplineName, name);
+        return source.createCheckPointName();
     }
 
+
+
+
     public Map<String, Long> getCurrentSplitAndLastUpdateTime() {
         return currentSplitAndLastUpdateTime;
     }
@@ -260,4 +155,43 @@ public class CheckPointManager {
         }
         return true;
     }
+
+    public static final String createCheckPointKey(String key, String queueId){
+        return key + "^^^" + queueId;
+    }
+
+    public static final String[] parseCheckPointKey(String checkPointKey){
+        return checkPointKey.split("\\^\\^\\^");
+    }
+
+    public static final String getNameSpaceFromCheckPointKey(String checkPointKey){
+        return parseCheckPointKey(checkPointKey)[0].split("\\;")[0];
+    }
+
+    public static final String getGroupNameFromCheckPointKey(String checkPointKey){
+        return parseCheckPointKey(checkPointKey)[0].split("\\;")[1];
+    }
+
+    public static final String getNameFromCheckPointKey(String checkPointKey){
+        return parseCheckPointKey(checkPointKey)[0].split("\\;")[2];
+    }
+
+    public static final String getTopicFromCheckPointKey(String checkPointKey){
+        return parseCheckPointKey(checkPointKey)[0].split("\\;")[3];
+    }
+
+    public static final String getQueueIdFromCheckPointKey(String checkPointKey){
+        return parseCheckPointKey(checkPointKey)[1];
+    }
+
+    public static void main(String[] args){
+        SourceSnapShot snapShot = new SourceSnapShot();
+        snapShot.setId(1L);
+        snapShot.setGmtCreate(new Date());
+        snapShot.setGmtModified(new Date());
+        snapShot.setKey("key");
+        snapShot.setValue("value");
+        System.out.println(ReflectUtil.serializeObject(snapShot));
+
+    }
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointMessage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointMessage.java
index 96c510c..ee91d9c 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointMessage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointMessage.java
@@ -27,6 +27,8 @@ public class CheckPointMessage implements ISystemMessage {
     protected IStreamOperator streamOperator;//当前的pipline
     protected List<CheckPointState> checkPointStates = new ArrayList<>();
     protected boolean isValidate = true;
+    protected String pipelineName;
+
 
     public ISource getSource() {
         return source;
@@ -68,4 +70,12 @@ public class CheckPointMessage implements ISystemMessage {
     public boolean isValidate() {
         return isValidate;
     }
+
+    public String getPipelineName() {
+        return pipelineName;
+    }
+
+    public void setPipelineName(String pipelineName) {
+        this.pipelineName = pipelineName;
+    }
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointStorageFactory.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointStorageFactory.java
new file mode 100644
index 0000000..747b5be
--- /dev/null
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointStorageFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.common.checkpoint;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+/**
+ * @description
+ */
+public class CheckPointStorageFactory {
+
+    private static final Log logger = LogFactory.getLog(CheckPointStorageFactory.class);
+
+    public static final String DEFAULT_CHECKPOINT_TYPE_NAME = "DB";
+
+    private static CheckPointStorageFactory instance;
+
+    private ServiceLoader<ICheckPointStorage> loader;
+
+    private CheckPointStorageFactory() {
+        URLClassLoader classLoader = (URLClassLoader)Thread.currentThread().getContextClassLoader();
+        URL[] urls = classLoader.getURLs();
+        for(URL u : urls){
+            String s = u.toString();
+            if(s.contains("rocketmq-streams")){
+                logger.info(String.format("list class : %s", s));
+            }
+        }
+        loader = ServiceLoader.load(ICheckPointStorage.class);
+    }
+
+    public static CheckPointStorageFactory getInstance() {
+        if (null == instance) {
+            synchronized (CheckPointStorageFactory.class) {
+                if (null == instance) {
+                    instance = new CheckPointStorageFactory();
+                }
+            }
+        }
+        return instance;
+    }
+
+    public ICheckPointStorage getStorage(String name){
+
+        Iterator<ICheckPointStorage> it = loader.iterator();
+        ICheckPointStorage storage = null;
+
+        ICheckPointStorage defaultStorage = null;
+        while(it.hasNext()){
+            ICheckPointStorage local = it.next();
+            if(local.getStorageName().equalsIgnoreCase(name)){
+                return local;
+            }
+            if(local.getStorageName().equalsIgnoreCase(DEFAULT_CHECKPOINT_TYPE_NAME)){
+                defaultStorage = local;
+            }
+        }
+
+        if(storage == null){
+            logger.error(String.format("checkpoint storage name config error, name is %s. use default checkpoint type db.", name));
+            return defaultStorage;
+        }
+        return null;
+    }
+
+}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/ICheckPointStorage.java
similarity index 63%
copy from rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java
copy to rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/ICheckPointStorage.java
index 5319f41..bf60fff 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/ICheckPointStorage.java
@@ -14,12 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.streams.common.functions;
+package org.apache.rocketmq.streams.common.checkpoint;
+
+
+import org.apache.rocketmq.streams.common.channel.source.ISource;
 
-import java.io.Serializable;
 import java.util.List;
 
-public interface FlatMapFunction <T, O> extends Function{
+/**
+ * @description 负责checkpoint的保存、恢复
+ */
+public interface ICheckPointStorage {
+
+    String TYPE = "checkpoint_storage";
+
+    String getStorageName();
+
+    <T> void save(List<T> checkPointState);
+
+    <T> T recover(ISource iSource, String queueID);
+
+    void flush();
+
+    void addCheckPointMessage(CheckPointMessage message);
 
-    List<T> flatMap(O message) throws Exception;
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceSnapShot.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceSnapShot.java
new file mode 100644
index 0000000..43293d1
--- /dev/null
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceSnapShot.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.common.checkpoint;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.rocketmq.streams.common.metadata.MetaData;
+import org.apache.rocketmq.streams.common.model.Entity;
+
+import java.io.Serializable;
+
+/**
+ * @create 2021-08-06 16:21:30
+ * @description
+ */
+public class SourceSnapShot extends Entity implements Serializable {
+
+    private static final long serialVersionUID = 4449170945607357658L;
+
+    public final static MetaData snapshotTable = new MetaData();
+
+    static{
+        snapshotTable.setTableName("checkpoint_snapshot");
+        snapshotTable.addMetaDataField("id", "long", false);
+        snapshotTable.addMetaDataField("gmt_create", "DATE", false);
+        snapshotTable.addMetaDataField("gmt_modified", "DATE", false);
+        snapshotTable.addMetaDataField("key", "string", false);
+        snapshotTable.addMetaDataField("value", "string", false);
+    }
+
+    String key;
+
+    String value;
+
+    public String getKey() {
+        return key;
+    }
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    public void setValue(String value) {
+        this.value = value;
+    }
+
+
+    @Override
+    public String toString() {
+        return "CheckPointSnapShot{" +
+            "key='" + key + '\'' +
+            ", value='" + value + '\'' +
+            '}';
+    }
+
+    public JSONObject toJson(){
+        JSONObject object = new JSONObject();
+        object.put("key", key);
+        object.put("value", value);
+        return object;
+    }
+}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceState.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceState.java
new file mode 100644
index 0000000..a239fdc
--- /dev/null
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceState.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.common.checkpoint;
+
+import org.apache.rocketmq.streams.common.context.MessageOffset;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @create 2021-08-11 15:51:50
+ * @description
+ */
+public class SourceState {
+
+    protected String sourceName;
+    protected Map<String, MessageOffset> queueId2Offsets = new HashMap<>();
+
+    public String getSourceName() {
+        return sourceName;
+    }
+
+    public void setSourceName(String sourceName) {
+        this.sourceName = sourceName;
+    }
+
+    public Map<String, MessageOffset> getQueueId2Offsets() {
+        return queueId2Offsets;
+    }
+
+    public void setQueueId2Offsets(
+        Map<String, MessageOffset> queueId2Offsets) {
+        this.queueId2Offsets = queueId2Offsets;
+    }
+
+    @Override
+    public String toString() {
+        return "SourceState{" +
+            "sourceName='" + sourceName + '\'' +
+            ", queueId2Offsets=" + queueId2Offsets +
+            '}';
+    }
+}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
index cbdb3e4..f060a7a 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
@@ -140,4 +140,10 @@ public interface ConfigureFileKey {
 
     String LEASE_STORAGE_NAME = "DB";//通过这个配置,可以修改lease 的底层存储
 
+    String CHECKPOINT_STORAGE_NAME = "checkPointStorageName";
+
+    String IS_RECOVER_MODE = "isRecover";
+
+    String IS_ATOMIC_DB_SINK = "isAtomicDbSink";
+
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java
index 5319f41..87802b9 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.streams.common.functions;
 import java.io.Serializable;
 import java.util.List;
 
-public interface FlatMapFunction <T, O> extends Function{
+public interface FlatMapFunction <OUT, IN> extends Function, Serializable {
 
-    List<T> flatMap(O message) throws Exception;
+    List<OUT> flatMap(IN message) throws Exception;
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/cachefilter/CacheFilterMeta.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/cachefilter/CacheFilterMeta.java
index ab4612b..bacb7b8 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/cachefilter/CacheFilterMeta.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/cachefilter/CacheFilterMeta.java
@@ -1,6 +1,20 @@
 package org.apache.rocketmq.streams.common.optimization.cachefilter;
-
-import java.util.BitSet;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 import java.util.Map;
 import org.apache.rocketmq.streams.common.cache.compress.BitSetCache;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
diff --git a/rocketmq-streams-configurable/pom.xml b/rocketmq-streams-configurable/pom.xml
index f160060..baacd27 100755
--- a/rocketmq-streams-configurable/pom.xml
+++ b/rocketmq-streams-configurable/pom.xml
@@ -1,4 +1,5 @@
 <?xml version="1.0" encoding="utf-8"?>
+
 <!--
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
diff --git a/rocketmq-streams-state/pom.xml b/rocketmq-streams-state/pom.xml
old mode 100755
new mode 100644