Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/09/18 11:39:17 UTC
[rocketmq-streams] branch async-ck updated: add checkpoint storage (#69)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch async-ck
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/async-ck by this push:
new 2766990 add checkpoint storage (#69)
2766990 is described below
commit 276699067024103b6f35da4699c44d05848ad0f4
Author: cyril <cw...@gmail.com>
AuthorDate: Sat Sep 18 19:39:13 2021 +0800
add checkpoint storage (#69)
* add channel-db module
* add channel-configurable
* add chinese
* fixed DataStream flatMap
* U
* stash & pre merge
* add license for chackpoint/pom.xml
* add license for checkpoint/pom.xml
* fix license
Co-authored-by: vv <ze...@alibaba-inc.com>
---
README-chinese.md | 106 +++++++++
pom.xml | 1 +
.../pom.xml | 44 +++-
.../streams/checkpoint/db/DBCheckPointStorage.java | 65 ++++++
.../streams/common/channel/AbstractChannel.java | 10 +
.../common/channel/impl/memory/MemoryChannel.java | 5 +
.../streams/common/channel/sink/AbstractSink.java | 2 +-
.../common/channel/source/AbstractSource.java | 53 ++++-
.../streams/common/channel/source/ISource.java | 18 ++
.../checkpoint/AbstractCheckPointStorage.java | 184 +++++++++++++++
.../streams/common/checkpoint/CheckPoint.java | 98 ++++++--
.../common/checkpoint/CheckPointManager.java | 254 ++++++++-------------
.../common/checkpoint/CheckPointMessage.java | 10 +
.../checkpoint/CheckPointStorageFactory.java | 87 +++++++
.../ICheckPointStorage.java} | 24 +-
.../streams/common/checkpoint/SourceSnapShot.java | 79 +++++++
.../streams/common/checkpoint/SourceState.java | 57 +++++
.../streams/common/configure/ConfigureFileKey.java | 6 +
.../streams/common/functions/FlatMapFunction.java | 4 +-
.../optimization/cachefilter/CacheFilterMeta.java | 18 +-
rocketmq-streams-configurable/pom.xml | 1 +
rocketmq-streams-state/pom.xml | 0
22 files changed, 923 insertions(+), 203 deletions(-)
diff --git a/README-chinese.md b/README-chinese.md
new file mode 100644
index 0000000..5d8215f
--- /dev/null
+++ b/README-chinese.md
@@ -0,0 +1,106 @@
+# RocketMQ Streams
+## Features
+
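+* Lightweight deployment: can be deployed standalone or as a cluster
+* Multiple types of data input and output: the source supports rocketmq; the sink supports db, rocketmq, and others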
+
+## DataStream Example
+
+```java
+import org.apache.rocketmq.streams.client.transform.DataStream;
+
+DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
+
+ source
+ .fromFile("~/admin/data/text.txt",false)
+ .map(message->message)
+ .toPrint(1)
+ .start();
+```
+
+## Maven Repository
+
+```xml
+
+<dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-clients</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+</dependency>
+```
+
+# Core API
+
+rocketmq-stream provides a set of high-level APIs that make it easy to write stream-processing programs for your own business needs.
+
+## StreamBuilder
+
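+StreamBuilder builds the source of a stream task. It contains two methods, ```dataStream()``` and ```tableStream()```, which return a DataStreamSource and a TableStreamSource respectively.
+
++ [dataStream(nameSpaceName,pipelineName)]() returns a DataStreamSource instance, used to implement stream-processing tasks with segment-style programming.
++ [tableStream(nameSpaceName,pipelineName)]() returns a TableStreamSource instance, used to implement stream-processing tasks with script-based programming.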
+
+## DataStream API
+
+### Source
+
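+DataStreamSource is the entry-point class for segment-style programming. It connects to various data sources and reads data from the major message queues.
+
++ ```fromFile``` reads data from a file; the method takes two parameters
+ + ```filePath``` the file path; required
+ + ```isJsonData``` whether the data is JSON; optional, defaults to ```true```
+
+
++ ```fromRocketmq``` reads data from rocketmq; the method takes four parameters
+ + ```topic``` the topic name of the rocketmq message queue; required
+ + ```groupName``` the name of the consumer group; required
+ + ```isJson``` whether the data is in JSON format; optional
+ + ```tags``` the rocketmq tags to consume, used to filter messages; optional
+
++ ```from``` a custom data source; implement the ISource interface to provide your own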
+
+### transform
+
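+A transform modifies the data from the input source during stream processing so that the next step can run on it. The DataStream API includes several transform classes, such as ```DataStream```, ```JoinStream```, ```SplitStream```, and ```WindowStream```.
+
+#### DataStream
+
+DataStream implements a set of common stream-processing operators
+
++ ```map``` returns a new DataStream by passing each record of the source through the function func
++ ```flatmap``` similar to map, but each input item maps to zero or more output items
++ ```filter``` returns a new DataStream containing only the source records for which func returns true
++ ```forEach``` executes the function func once for each record and returns a new DataStream
++ ```selectFields``` returns the values of the selected fields for each record, as a new DataStream
++ ```operate``` executes a custom function once for each record and returns a new DataStream
++ ```script``` runs a script against the fields of each record, returns the new fields, and produces a new DataStream
++ ```toPrint``` prints the results to the console and produces a new DataStreamAction instance
++ ```toFile``` saves the results to a file and produces a new DataStreamAction instance
++ ```toDB``` saves the results to a database
++ ```toRocketmq``` outputs the results to rocketmq
++ ```to``` outputs the results to the specified storage through a custom ISink implementation
++ ```window``` performs statistical analysis within a window, usually together with ```groupBy```; ```window()``` defines the window size, and ```groupBy()``` defines the grouping key for the statistics (more than one key can be specified)
+ + ```count``` counts records within the window
+ + ```min``` returns the minimum of the values within the window
+ + ```max``` returns the maximum of the values within the window
+ + ```avg``` returns the average of the values within the window
+ + ```sum``` returns the sum of the values within the window
+ + ```reduce``` performs a custom aggregation within the window
++ ```join``` joins two streams on a condition, merging them into one larger stream for further computation
++ ```union``` merges two streams
++ ```split``` splits one data stream by tag into separate streams for downstream analysis
++ ```with``` specifies the strategies used during computation, including the checkpoint storage strategy and the state storage strategy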
+
+# Strategy
+
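+The strategy mechanism controls the underlying behavior of the engine at run time, such as how checkpoints and state are stored; control over windows, two-stream joins, and more will be added later. All strategies are passed in through the ```with``` operator, and several strategy types can be passed at once.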
+
+```java
+//Specify the checkpoint storage strategy
+source
+ .fromRocketmq("TSG_META_INFO","")
+ .map(message->message+"--")
+ .toPrint(1)
+ .with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L))
+ .start();
+```
diff --git a/pom.xml b/pom.xml
index 7bbe1f4..6bdd9ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@
<module>rocketmq-streams-channel-http</module>
<module>rocketmq-streams-state</module>
<module>rocketmq-streams-examples</module>
+ <module>rocketmq-streams-checkpoint</module>
</modules>
diff --git a/rocketmq-streams-configurable/pom.xml b/rocketmq-streams-checkpoint/pom.xml
old mode 100755
new mode 100644
similarity index 54%
copy from rocketmq-streams-configurable/pom.xml
copy to rocketmq-streams-checkpoint/pom.xml
index f160060..40a28c0
--- a/rocketmq-streams-configurable/pom.xml
+++ b/rocketmq-streams-checkpoint/pom.xml
@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="utf-8"?>
+<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -18,19 +18,49 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
<parent>
- <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
- <artifactId>rocketmq-streams-configurable</artifactId>
- <name>ROCKETMQ STREAMS :: configurable</name>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>rocketmq-streams-checkpoint</artifactId>
+ <name>ROCKETMQ STREAMS :: checkpoint</name>
<packaging>jar</packaging>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-streams-serviceloader</artifactId>
+ <artifactId>rocketmq-streams-commons</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-db-operator</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <optional>true</optional>
</dependency>
</dependencies>
-</project>
+
+</project>
\ No newline at end of file
diff --git a/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java b/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java
new file mode 100644
index 0000000..df1d046
--- /dev/null
+++ b/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.checkpoint.db;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.source.ISource;
+import org.apache.rocketmq.streams.common.checkpoint.AbstractCheckPointStorage;
+import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
+import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
+import org.apache.rocketmq.streams.common.checkpoint.SourceSnapShot;
+import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
+
+import java.util.List;
+
+/**
+ * @description
+ */
+public class DBCheckPointStorage extends AbstractCheckPointStorage {
+
+ static final Log logger = LogFactory.getLog(DBCheckPointStorage.class);
+
+ static final String STORAGE_NAME = "DB";
+
+ public DBCheckPointStorage(){
+
+ }
+
+ @Override
+ public String getStorageName() {
+ return STORAGE_NAME;
+ }
+
+ @Override
+ public <T> void save(List<T> checkPointState) {
+ logger.info(String.format("save checkpoint size %d", checkPointState.size()));
+ ORMUtil.batchReplaceInto(checkPointState);
+ }
+
+ @Override
+ //todo
+ public CheckPoint recover(ISource iSource, String queueId) {
+ String sourceName = CheckPointManager.createSourceName(iSource, null);
+ String key = CheckPointManager.createCheckPointKey(sourceName, queueId);
+ String sql = "select * from source_snap_shot where `key` = " + "'" + key + "';";
+ SourceSnapShot snapShot = ORMUtil.queryForObject(sql, null, SourceSnapShot.class);
+
+ logger.info(String.format("checkpoint recover key is %s, sql is %s, recover sourceSnapShot : %s", key, sql, snapShot == null ? "null snapShot" : snapShot.toString()));
+ return new CheckPoint().fromSnapShot(snapShot);
+ }
+}
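Note on `recover` above: the lookup key is concatenated directly into the SQL text. A minimal sketch of the same lookup with a bound parameter, using plain JDBC rather than `ORMUtil`, might look like the following. The class `SnapshotQuerySketch` and its method are hypothetical, and the column names `key` and `value` are assumptions inferred from the `SourceSnapShot` getters; only the table name `source_snap_shot` comes from the commit.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical helper for illustration; not part of rocketmq-streams.
class SnapshotQuerySketch {

    // The key is bound as a parameter instead of being spliced into the SQL string.
    // Column names `key` and `value` are assumptions.
    static final String RECOVER_SQL =
        "select `key`, `value` from source_snap_shot where `key` = ?";

    // Returns the stored snapshot value for the key, or null when no row matches.
    static String loadValue(Connection connection, String key) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(RECOVER_SQL)) {
            statement.setString(1, key);
            try (ResultSet rows = statement.executeQuery()) {
                return rows.next() ? rows.getString("value") : null;
            }
        }
    }
}
```

Binding the key keeps quoting out of the query text, so a checkpoint key that happens to contain a quote character cannot change the statement.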
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java
index 74ff5f4..7765669 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java
@@ -227,4 +227,14 @@ public abstract class AbstractChannel extends BasedConfigurable implements IChan
create();
((AbstractSource)source).setJsonData(isJsonData);
}
+
+ @Override
+ public String getTopic(){
+ return source.getTopic();
+ }
+
+ @Override
+ public void setTopic(String topic){
+ source.setTopic(topic);
+ }
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java
index 06dff1c..f3d06ee 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java
@@ -74,4 +74,9 @@ public class MemoryChannel extends AbstractChannel {
}
};
}
+
+ @Override
+ public String createCheckPointName() {
+ return "memory-source";
+ }
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
index 82218c2..aa7c1bc 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
@@ -32,8 +32,8 @@ import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MultiSplitMessa
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
-import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager.SourceState;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
+import org.apache.rocketmq.streams.common.checkpoint.SourceState;
import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
import org.apache.rocketmq.streams.common.configurable.IConfigurableIdentification;
import org.apache.rocketmq.streams.common.context.IMessage;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
index f5de98b..bf45e7e 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
@@ -42,6 +42,7 @@ import org.apache.rocketmq.streams.common.context.MessageHeader;
import org.apache.rocketmq.streams.common.context.UserDefinedMessage;
import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
/**
@@ -75,6 +76,7 @@ public abstract class AbstractSource extends BasedConfigurable implements ISourc
protected List<String> logFingerprintFields;//log fingerprint to filter msg quickly
+
/**
* The operator that the data source delivers messages to; it receives the source's data and processes it
*/
@@ -223,18 +225,22 @@ public abstract class AbstractSource extends BasedConfigurable implements ISourc
return createJson(message);
}
-
+ /**
+ * Hand the message to the receiver, which runs the downstream logic
+ *
+ * @param channelMessage
+ * @return
+ */
public AbstractContext executeMessage(Message channelMessage) {
AbstractContext context = new Context(channelMessage);
- if (!channelMessage.getHeader().isSystemMessage()) {
- messageQueueChangedCheck(channelMessage.getHeader());
- }
-
if (isSplitInRemoving(channelMessage)) {
return context;
}
+ if (!channelMessage.getHeader().isSystemMessage()) {
+ messageQueueChangedCheck(channelMessage.getHeader());
+ }
- boolean needFlush = !channelMessage.getHeader().isSystemMessage() && channelMessage.getHeader().isNeedFlush();
+ boolean needFlush = channelMessage.getHeader().isSystemMessage() == false && channelMessage.getHeader().isNeedFlush();
if (receiver != null) {
receiver.doMessage(channelMessage, context);
@@ -277,6 +283,9 @@ public abstract class AbstractSource extends BasedConfigurable implements ISourc
* @param header
*/
protected void messageQueueChangedCheck(MessageHeader header) {
+ if (supportNewSplitFind() && supportRemoveSplitFind()) {
+ return;
+ }
Set<String> queueIds = new HashSet<>();
String msgQueueId = header.getQueueId();
if (StringUtil.isNotEmpty(msgQueueId)) {
@@ -287,7 +296,7 @@ public abstract class AbstractSource extends BasedConfigurable implements ISourc
queueIds.addAll(checkpointQueueIds);
}
Set<String> newQueueIds = new HashSet<>();
-
+ Set<String> removeQueueIds = new HashSet<>();
for (String queueId : queueIds) {
if (isNotDataSplit(queueId)) {
continue;
@@ -536,4 +545,34 @@ public abstract class AbstractSource extends BasedConfigurable implements ISourc
return isBatchMessage;
}
+ @Override
+ public String createCheckPointName(){
+
+ ISource source = this;
+
+ String namespace = source.getNameSpace();
+ String name = source.getConfigureName();
+ String groupName = source.getGroupName();
+
+
+ if(StringUtil.isEmpty(namespace)){
+ namespace = "default_namespace";
+ }
+
+ if(StringUtil.isEmpty(name)){
+ name = "default_name";
+ }
+
+ if(StringUtil.isEmpty(groupName)){
+ groupName = "default_groupName";
+ }
+ String topic = source.getTopic();
+ if(topic == null || topic.trim().length() == 0){
+ topic = "default_topic";
+ }
+ return MapKeyUtil.createKey(namespace, groupName, topic, name);
+
+ }
+
+
}
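Note on `createCheckPointName` above: the method substitutes a default for each empty component and then joins namespace, group name, topic, and configure name into one key. A minimal standalone sketch of that idea follows. `CheckPointNameSketch` is a hypothetical stand-in, the `;` separator is an assumption (the body of `MapKeyUtil.createKey` is not shown in this diff), and unlike the commit it applies the same blank check to all four fields.

```java
// Hypothetical stand-in for AbstractSource.createCheckPointName(); for illustration only.
class CheckPointNameSketch {

    // Assumed separator; MapKeyUtil.createKey is not shown in this diff.
    static final String SEPARATOR = ";";

    // Falls back to the default when the value is null or blank.
    static String orDefault(String value, String fallback) {
        return (value == null || value.trim().isEmpty()) ? fallback : value;
    }

    // Joins the four components in the same order as the commit: namespace, groupName, topic, name.
    static String createCheckPointName(String namespace, String groupName, String topic, String name) {
        return String.join(SEPARATOR,
            orDefault(namespace, "default_namespace"),
            orDefault(groupName, "default_groupName"),
            orDefault(topic, "default_topic"),
            orDefault(name, "default_name"));
    }

    public static void main(String[] args) {
        // Missing group name and empty configure name are replaced by their defaults.
        System.out.println(createCheckPointName("ns", null, "orders", ""));
    }
}
```

Because every component always has a value, two sources that differ only in an unset field still map to distinct, parseable keys as long as the set fields differ.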
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/ISource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/ISource.java
index 4b9f55f..d3f6802 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/ISource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/ISource.java
@@ -74,4 +74,22 @@ public interface ISource<T extends ISource> extends IConfigurable, IStageBuilder
*/
long getCheckpointTime();
+ /**
+ *
+ * @param topic
+ */
+ void setTopic(String topic);
+
+ /**
+ *
+ * @return
+ */
+ String getTopic();
+
+ /**
+ * Create the checkpoint name
+ * @return checkpoint key name
+ */
+ String createCheckPointName();
+
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/AbstractCheckPointStorage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/AbstractCheckPointStorage.java
new file mode 100644
index 0000000..fad0ae1
--- /dev/null
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/AbstractCheckPointStorage.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.common.checkpoint;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
+import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
+import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
+import org.apache.rocketmq.streams.common.context.MessageOffset;
+
+import java.util.*;
+
+public abstract class AbstractCheckPointStorage implements ICheckPointStorage{
+
+ static final Log logger = LogFactory.getLog(AbstractCheckPointStorage.class);
+
+ protected transient IMessageCache<CheckPointMessage> messageCache;
+
+ public AbstractCheckPointStorage(){
+ messageCache = new MessageCache<>(new IMessageFlushCallBack<CheckPointMessage>() {
+ @Override
+ public boolean flushMessage(List<CheckPointMessage> messages) {
+ // Merge the most recent checkpoints so that they are stored only once
+ // The key is the sourceName; the map inside the value holds queueId -> offset
+ Map<String, SourceState> sourceStateMap = mergeSourceState(messages);
+
+ logger.info(String.format("flushMessage raw size %d, merge size %d", messages.size(), sourceStateMap.size()));
+ logger.info("flushMessage : " + messages.get(0).getCheckPointStates().get(0).getQueueIdAndOffset().toString());
+
+ saveCheckPoint(sourceStateMap);
+ return true;
+ }
+ });
+ ((MessageCache)messageCache).setAutoFlushSize(10);
+ ((MessageCache)messageCache).setAutoFlushTimeGap(500);
+ messageCache.openAutoFlush();
+ }
+
+ public void flush(){
+ messageCache.flush();
+ }
+
+
+ /**
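+ * Offsets from several rounds may arrive together; merge them.
+ * Merging involves two rules: 1. within a single CheckPointMessage, the smallest offset is chosen as this round's offset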
+ * @param messages
+ */
+ protected Map<String, SourceState> mergeSourceState(List<CheckPointMessage> messages) {
+ Map<String,SourceState> sourceStateMap = new HashMap<>();
+ for(CheckPointMessage checkPointMessage:messages){
+ SourceState sourceState = createSourceState(checkPointMessage);
+ if(sourceState == null){
+ continue;
+ }
+ String sourceName = sourceState.getSourceName();
+ SourceState existSourceState = sourceStateMap.get(sourceName);
+ SourceState lastSourceState = sourceState;
+ if(existSourceState != null){
+ lastSourceState = merge(sourceState,existSourceState);
+ }
+ sourceStateMap.put(sourceName,lastSourceState);
+ }
+ return sourceStateMap;
+ }
+
+ /**
+ * If several checkpoints arrive together, merge them before saving
+ * @param sourceState
+ * @param existSourceState
+ * @return
+ */
+ protected SourceState merge(SourceState sourceState, SourceState existSourceState) {
+ Iterator<Map.Entry<String, MessageOffset>> it = sourceState.getQueueId2Offsets().entrySet()
+ .iterator();
+ while (it.hasNext()){
+ Map.Entry<String, MessageOffset> entry=it.next();
+ String queueId = entry.getKey();
+ MessageOffset offset = entry.getValue();
+ MessageOffset existOffset = existSourceState.getQueueId2Offsets().get(queueId);
+ if(existOffset == null){
+ existSourceState.getQueueId2Offsets().put(queueId,offset);
+ }else {
+ boolean isGreateThan = offset.greateThan(existOffset.getOffsetStr());
+ if(isGreateThan){
+ existSourceState.getQueueId2Offsets().put(queueId,offset);
+ }
+ }
+ }
+ return existSourceState;
+ }
+
+ /**
+ * Within one pipeline, find the smallest offset and commit it for saving
+ * @param checkPointMessage
+ * @return
+ */
+ protected SourceState createSourceState(CheckPointMessage checkPointMessage) {
+ SourceState sourceState = new SourceState();
+ String pipelineName = checkPointMessage.getPipelineName();
+
+ Map<String, MessageOffset> queueId2Offsets = new HashMap<>();
+ sourceState.setSourceName(CheckPointManager.createSourceName(checkPointMessage.getSource(),pipelineName));
+ sourceState.setQueueId2Offsets(queueId2Offsets);
+
+ for(CheckPointState checkPointState:checkPointMessage.getCheckPointStates()){
+
+ if(checkPointState.isReplyAnyOny()){
+ continue;
+ }
+ if(checkPointState.isReplyRefuse()){
+ return null;
+ }
+ Iterator<Map.Entry<String, MessageOffset>> it = checkPointState.getQueueIdAndOffset().entrySet()
+ .iterator();
+ while (it.hasNext()){
+ Map.Entry<String, MessageOffset> entry=it.next();
+ String queueId=entry.getKey();
+ MessageOffset offset = entry.getValue();
+ MessageOffset existOffset = queueId2Offsets.get(queueId);
+ if(existOffset==null){
+ queueId2Offsets.put(queueId,offset);
+ }else {
+ boolean isGreateThan=existOffset.greateThan(offset.getOffsetStr());
+ if(isGreateThan){
+ queueId2Offsets.put(queueId,offset);
+ }else {
+ queueId2Offsets.put(queueId,existOffset);
+ }
+ }
+ }
+ }
+ return sourceState;
+ }
+
+ /**
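+ * First look up the splits the data source currently owns; splits that are no longer processed are not saved.
+ * Otherwise the result is saved to the db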
+ * @param sourceStateMap
+ */
+ protected void saveCheckPoint(Map<String, SourceState> sourceStateMap) {
+
+ List<SourceSnapShot> checkPoints = new ArrayList<>();
+
+ for(SourceState sourceState:sourceStateMap.values()){
+ for(Map.Entry<String, MessageOffset> entry : sourceState.getQueueId2Offsets().entrySet()){
+ CheckPoint checkPoint = new CheckPoint();
+ checkPoint.setSourceName(sourceState.getSourceName());
+ checkPoint.setQueueId(entry.getKey());
+ checkPoint.setData(entry.getValue().getMainOffset());
+ checkPoint.setGmtCreate(new Date());
+ checkPoint.setGmtModified(new Date());
+ SourceSnapShot object = checkPoint.toSnapShot();
+ checkPoints.add(object);
+
+ }
+ }
+ save(checkPoints);
+ }
+
+ public void addCheckPointMessage(CheckPointMessage message){
+ List<CheckPointState> states = message.getCheckPointStates();
+ for(CheckPointState state : states){
+ logger.debug(String.format("addCheckPointMessage states %s", state.getQueueIdAndOffset().toString()));
+ }
+ messageCache.addCache(message);
+ }
+
+}
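Note on the merge logic above: as the code reads, `createSourceState` keeps the smallest offset per queue within one `CheckPointMessage`, and `merge` keeps the largest offset per queue across messages. A minimal sketch of those two rules follows. `OffsetMergeSketch` is hypothetical, and plain `long` offsets are an assumption made for brevity; the commit compares `MessageOffset` objects through `greateThan`, and it also skips a message entirely when a state replies with a refusal, which this sketch does not model.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the two merge rules; not part of rocketmq-streams.
class OffsetMergeSketch {

    // Within one checkpoint message: keep the smallest offset reported for each queue.
    static Map<String, Long> minPerQueue(List<Map<String, Long>> states) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> state : states) {
            for (Map.Entry<String, Long> entry : state.entrySet()) {
                result.merge(entry.getKey(), entry.getValue(), Long::min);
            }
        }
        return result;
    }

    // Across checkpoint messages: keep the largest offset seen for each queue.
    static Map<String, Long> maxPerQueue(List<Map<String, Long>> perMessage) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> offsets : perMessage) {
            for (Map.Entry<String, Long> entry : offsets.entrySet()) {
                result.merge(entry.getKey(), entry.getValue(), Long::max);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> stateA = new HashMap<>();
        stateA.put("q0", 10L);
        stateA.put("q1", 7L);
        Map<String, Long> stateB = new HashMap<>();
        stateB.put("q0", 8L);
        Map<String, Long> stateC = new HashMap<>();
        stateC.put("q0", 12L);

        // First message carries two states, second message carries one.
        Map<String, Long> first = minPerQueue(Arrays.asList(stateA, stateB));
        Map<String, Long> second = minPerQueue(Collections.singletonList(stateC));
        System.out.println(maxPerQueue(Arrays.asList(first, second)));
    }
}
```

Taking the minimum inside a message is the conservative choice: an offset is only committed once every reporting stage has passed it. Taking the maximum across messages then collapses a batch of flushes into one write per queue.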
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPoint.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPoint.java
index 4eb7211..8bcedbd 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPoint.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPoint.java
@@ -16,28 +16,24 @@
*/
package org.apache.rocketmq.streams.common.checkpoint;
+import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.model.Entity;
/**
* model for checkpoint,need save in store
*/
-public class CheckPoint extends Entity {
+public class CheckPoint<T> extends Entity {
+
protected String sourceNamespace;
+ protected String pipelineName;
protected String sourceName;
protected String queueId;
- protected String offset;
+ protected String topic;
- public static String loadOffset(ISource source, String splitId) {
- return null;
- //Set<String> splits=new HashSet<>();
- //splits.add(splitId);
- //Map<String,String> queueId2Offset=loadOffset(source,splits);
- //if(queueId2Offset==null||queueId2Offset.containsKey(splitId)==false){
- // return null;
- //}
- //return queueId2Offset.get(splitId);
- }
+ protected T data;
+
+ protected JSONObject content;
public String getQueueId() {
return queueId;
@@ -47,13 +43,6 @@ public class CheckPoint extends Entity {
this.queueId = queueId;
}
- public String getOffset() {
- return offset;
- }
-
- public void setOffset(String offset) {
- this.offset = offset;
- }
public String getSourceNamespace() {
return sourceNamespace;
@@ -70,4 +59,73 @@ public class CheckPoint extends Entity {
public void setSourceName(String sourceName) {
this.sourceName = sourceName;
}
-}
+
+ public String getPipelineName() {
+ return pipelineName;
+ }
+
+ public void setPipelineName(String pipelineName) {
+ this.pipelineName = pipelineName;
+ }
+
+ public JSONObject getContent() {
+ return content;
+ }
+
+ public void setContent(JSONObject content) {
+ this.content = content;
+ }
+
+ public T getData() {
+ return data;
+ }
+
+ public void setData(T data) {
+ this.data = data;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public SourceSnapShot toSnapShot(){
+ SourceSnapShot snapShot = new SourceSnapShot();
+ snapShot.setGmtCreate(gmtCreate);
+ snapShot.setGmtModified(gmtModified);
+ snapShot.setKey(CheckPointManager.createCheckPointKey(sourceName, queueId));
+ if(content == null){
+ content = new JSONObject();
+ }
+ content.put("offset", data);
+
+ snapShot.setValue(content.toString());
+ return snapShot;
+
+ }
+
+ public CheckPoint fromSnapShot(SourceSnapShot sourceSnapShot){
+
+ if(sourceSnapShot == null){
+ return null;
+ }
+
+ String key = sourceSnapShot.getKey();
+ String value = sourceSnapShot.getValue();
+ CheckPoint<String> checkPoint = new CheckPoint<>();
+ String[] tmp1 = CheckPointManager.parseCheckPointKey(key);
+ String[] tmp2 = tmp1[0].split(";");
+ checkPoint.setSourceNamespace(tmp2[0]);
+ checkPoint.setPipelineName(tmp2[1]);
+ checkPoint.setSourceName(tmp2[2]);
+ checkPoint.setQueueId(tmp1[1]);
+ checkPoint.setData(value);
+ checkPoint.setGmtCreate(sourceSnapShot.getGmtCreate());
+ checkPoint.setGmtModified(sourceSnapShot.getGmtModified());
+ return checkPoint;
+
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointManager.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointManager.java
index e29f7d7..f57bcc2 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointManager.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointManager.java
@@ -16,164 +16,84 @@
*/
package org.apache.rocketmq.streams.common.checkpoint;
-import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
-import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
-import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.configurable.IConfigurableIdentification;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
import org.apache.rocketmq.streams.common.context.MessageOffset;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.ReflectUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Set;
-public class CheckPointManager {
- protected IMessageCache<CheckPointMessage> messageCache;
+public class CheckPointManager extends BasedConfigurable {
+
protected transient Map<String, Long> currentSplitAndLastUpdateTime = new HashMap<>();//the splits handled by this instance
protected transient Map<String, Long> removingSplits = new HashMap<>();//the splits currently being removed
- public CheckPointManager() {
- messageCache = new MessageCache<>(new IMessageFlushCallBack<CheckPointMessage>() {
- @Override
- public boolean flushMessage(List<CheckPointMessage> messages) {
- //Merge the most recent checkpoints so that they are stored only once
- Map<String, SourceState> sourceStateMap = mergeSourceState(messages);
- saveCheckPoint(sourceStateMap);
- return true;
- }
- });
- messageCache.openAutoFlush();
- }
+ protected transient ICheckPointStorage iCheckPointStorage;
- public void flush() {
- messageCache.flush();
+ public CheckPointManager(){
+ String name = ComponentCreator.getProperties().getProperty(ConfigureFileKey.CHECKPOINT_STORAGE_NAME);
+ iCheckPointStorage = CheckPointStorageFactory.getInstance().getStorage(name);
}
- public synchronized void addSplit(String splitId) {
- this.currentSplitAndLastUpdateTime.put(splitId, System.currentTimeMillis());
- }
- public synchronized void removeSplit(String splitId) {
+ public synchronized void addSplit(String splitId){
+ this.currentSplitAndLastUpdateTime.put(splitId,System.currentTimeMillis());
+ }
+ public synchronized void removeSplit(String splitId){
this.currentSplitAndLastUpdateTime.remove(splitId);
}
- public boolean contains(String splitId) {
+ public boolean contains(String splitId){
return this.currentSplitAndLastUpdateTime.containsKey(splitId);
}
- /**
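- * Offsets from several rounds may arrive together; merge them. Merging involves two rules: 1. within a single CheckPointMessage, the smallest offset is chosen as this round's offset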
- *
- * @param messages
- */
- protected Map<String, SourceState> mergeSourceState(List<CheckPointMessage> messages) {
- Map<String, SourceState> sourceStateMap = new HashMap<>();
- for (CheckPointMessage checkPointMessage : messages) {
- SourceState sourceState = createSourceState(checkPointMessage);
- if (sourceState == null) {
- continue;
- }
- String sourceName = sourceState.getSourceName();
- SourceState existSourceState = sourceStateMap.get(sourceName);
- if (existSourceState != null) {
- SourceState lastSourceState = merge(sourceState, existSourceState);
- sourceStateMap.put(sourceName, lastSourceState);
+ private final List<CheckPoint> fromSourceState(Map<String, SourceState> sourceStateMap){
+
+ List<CheckPoint> checkPoints = new ArrayList<>();
+
+ for(Entry<String, SourceState> entry : sourceStateMap.entrySet()){
+ String key = entry.getKey();
+ SourceState value = entry.getValue();
+ String[] ss = key.split("\\;");
+ assert ss.length == 3 : "key length must be three. format is namespace;pipelineName;sourceName, but was: " + key;
+ for(Entry<String, MessageOffset> tmpEntry : value.getQueueId2Offsets().entrySet()){
+ String queueId = tmpEntry.getKey();
+ String offset = tmpEntry.getValue().getMainOffset();
+ CheckPoint checkPoint = new CheckPoint();
+ checkPoint.setSourceNamespace(ss[0]);
+ checkPoint.setPipelineName(ss[1]);
+ checkPoint.setSourceName(ss[2]);
+ checkPoint.setQueueId(queueId);
+ checkPoint.setData(offset);
+ checkPoints.add(checkPoint);
}
}
- return sourceStateMap;
- }
- /**
- * Within one pipeline, find the smallest offset and commit it for saving
- *
- * @param checkPointMessage
- * @return
- */
- protected SourceState createSourceState(CheckPointMessage checkPointMessage) {
- SourceState sourceState = new SourceState();
- String pipelineName = null;
- if (checkPointMessage.getStreamOperator() instanceof IConfigurableIdentification) {
- IConfigurableIdentification configurable = (IConfigurableIdentification)checkPointMessage.getCheckPointStates();
- pipelineName = configurable.getConfigureName();
- }
- Map<String, MessageOffset> queueId2Offsets = new HashMap<>();
- sourceState.setSourceName(createSourceName(checkPointMessage.getSource(), pipelineName));
- sourceState.setQueueId2Offsets(queueId2Offsets);
+ return checkPoints;
- for (CheckPointState checkPointState : checkPointMessage.getCheckPointStates()) {
- if (checkPointState.isReplyAnyOny()) {
- continue;
- }
- if (checkPointState.isReplyRefuse()) {
- return null;
- }
- for (Entry<String, MessageOffset> entry : checkPointState.getQueueIdAndOffset().entrySet()) {
- String queueId = entry.getKey();
- MessageOffset offset = entry.getValue();
- MessageOffset existOffset = queueId2Offsets.get(queueId);
- if (existOffset == null) {
- queueId2Offsets.put(queueId, offset);
- } else {
- boolean isGreateThan = existOffset.greateThan(offset.getOffsetStr());
- if (isGreateThan) {
- queueId2Offsets.put(queueId, offset);
- } else {
- queueId2Offsets.put(queueId, existOffset);
- }
- }
- }
- }
- return sourceState;
}
- /**
- * First query the source's current splits; splits that are no longer handled are not saved, otherwise the result is saved to the DB
- *
- * @param sourceStateMap
- */
- protected void saveCheckPoint(Map<String, SourceState> sourceStateMap) {
-
+ public void addCheckPointMessage(CheckPointMessage message){
+ this.iCheckPointStorage.addCheckPointMessage(message);
}
- /**
- * If several checkpoints arrive together, merge them before saving
- *
- * @param sourceState
- * @param existSourceState
- * @return
- */
- protected SourceState merge(SourceState sourceState, SourceState existSourceState) {
- Iterator<Entry<String, MessageOffset>> it = sourceState.getQueueId2Offsets().entrySet()
- .iterator();
- while (it.hasNext()) {
- Entry<String, MessageOffset> entry = it.next();
- String queueId = entry.getKey();
- MessageOffset offset = entry.getValue();
- MessageOffset existOffset = existSourceState.getQueueId2Offsets().get(queueId);
- if (existOffset == null) {
- existSourceState.getQueueId2Offsets().put(queueId, offset);
- } else {
- boolean isGreaterThan = offset.greateThan(existOffset.getOffsetStr());
- if (isGreaterThan) {
- existSourceState.getQueueId2Offsets().put(queueId, offset);
- }
- }
+ public CheckPoint recover(ISource iSource, ISplit iSplit){
+ String isRecover = ComponentCreator.getProperties().getProperty(ConfigureFileKey.IS_RECOVER_MODE);
+ if(isRecover != null && Boolean.valueOf(isRecover)){
+ String queueId = iSplit.getQueueId();
+ return iCheckPointStorage.recover(iSource, queueId);
}
- return existSourceState;
- }
-
- public void addCheckPointMessage(CheckPointMessage message) {
- this.messageCache.addCache(message);
+ return null;
}
-
public void updateLastUpdate(String queueId) {
addSplit(queueId);
}
@@ -182,54 +102,29 @@ public class CheckPointManager {
return this.currentSplitAndLastUpdateTime.keySet();
}
- public static class SourceState {
- protected String sourceName;
- protected Map<String, MessageOffset> queueId2Offsets = new HashMap<>();
-
- public String getSourceName() {
- return sourceName;
- }
+ public void flush(){
+ iCheckPointStorage.flush();
+ }
- public void setSourceName(String sourceName) {
- this.sourceName = sourceName;
- }
- public Map<String, MessageOffset> getQueueId2Offsets() {
- return queueId2Offsets;
- }
- public void setQueueId2Offsets(
- Map<String, MessageOffset> queueId2Offsets) {
- this.queueId2Offsets = queueId2Offsets;
- }
- }
/**
* Partition by source, mainly for the dual-stream join scenario
- *
* @param source
* @return
*/
- public static String createSourceName(ISource source, String piplineName) {
- String namespace = null;
- String name = null;
- if (source != null) {
- namespace = source.getNameSpace();
- name = source.getConfigureName();
- }
+ public static String createSourceName(ISource source, String pipelineName){
- if (StringUtil.isEmpty(namespace)) {
- namespace = "default_namespace";
- }
- if (StringUtil.isEmpty(name)) {
- name = "default_name";
+ if(StringUtil.isNotEmpty(pipelineName)){
+ return MapKeyUtil.createKey(source.createCheckPointName(), pipelineName);
}
- if (StringUtil.isEmpty(piplineName)) {
- piplineName = "default_piplineName";
- }
- return MapKeyUtil.createKey(namespace, piplineName, name);
+ return source.createCheckPointName();
}
+
+
+
public Map<String, Long> getCurrentSplitAndLastUpdateTime() {
return currentSplitAndLastUpdateTime;
}
@@ -260,4 +155,43 @@ public class CheckPointManager {
}
return true;
}
+
+ public static final String createCheckPointKey(String key, String queueId){
+ return key + "^^^" + queueId;
+ }
+
+ public static final String[] parseCheckPointKey(String checkPointKey){
+ return checkPointKey.split("\\^\\^\\^");
+ }
+
+ public static final String getNameSpaceFromCheckPointKey(String checkPointKey){
+ return parseCheckPointKey(checkPointKey)[0].split("\\;")[0];
+ }
+
+ public static final String getGroupNameFromCheckPointKey(String checkPointKey){
+ return parseCheckPointKey(checkPointKey)[0].split("\\;")[1];
+ }
+
+ public static final String getNameFromCheckPointKey(String checkPointKey){
+ return parseCheckPointKey(checkPointKey)[0].split("\\;")[2];
+ }
+
+ public static final String getTopicFromCheckPointKey(String checkPointKey){
+ return parseCheckPointKey(checkPointKey)[0].split("\\;")[3];
+ }
+
+ public static final String getQueueIdFromCheckPointKey(String checkPointKey){
+ return parseCheckPointKey(checkPointKey)[1];
+ }
+
+ public static void main(String[] args){
+ SourceSnapShot snapShot = new SourceSnapShot();
+ snapShot.setId(1L);
+ snapShot.setGmtCreate(new Date());
+ snapShot.setGmtModified(new Date());
+ snapShot.setKey("key");
+ snapShot.setValue("value");
+ System.out.println(ReflectUtil.serializeObject(snapShot));
+
+ }
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointMessage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointMessage.java
index 96c510c..ee91d9c 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointMessage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointMessage.java
@@ -27,6 +27,8 @@ public class CheckPointMessage implements ISystemMessage {
protected IStreamOperator streamOperator;// the current pipeline
protected List<CheckPointState> checkPointStates = new ArrayList<>();
protected boolean isValidate = true;
+ protected String pipelineName;
+
public ISource getSource() {
return source;
@@ -68,4 +70,12 @@ public class CheckPointMessage implements ISystemMessage {
public boolean isValidate() {
return isValidate;
}
+
+ public String getPipelineName() {
+ return pipelineName;
+ }
+
+ public void setPipelineName(String pipelineName) {
+ this.pipelineName = pipelineName;
+ }
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointStorageFactory.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointStorageFactory.java
new file mode 100644
index 0000000..747b5be
--- /dev/null
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointStorageFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.common.checkpoint;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+/**
+ * @description
+ */
+public class CheckPointStorageFactory {
+
+ private static final Log logger = LogFactory.getLog(CheckPointStorageFactory.class);
+
+ public static final String DEFAULT_CHECKPOINT_TYPE_NAME = "DB";
+
+ private static CheckPointStorageFactory instance;
+
+ private ServiceLoader<ICheckPointStorage> loader;
+
+ private CheckPointStorageFactory() {
+ URLClassLoader classLoader = (URLClassLoader)Thread.currentThread().getContextClassLoader();
+ URL[] urls = classLoader.getURLs();
+ for(URL u : urls){
+ String s = u.toString();
+ if(s.contains("rocketmq-streams")){
+ logger.info(String.format("list class : %s", s));
+ }
+ }
+ loader = ServiceLoader.load(ICheckPointStorage.class);
+ }
+
+ public static CheckPointStorageFactory getInstance() {
+ if (null == instance) {
+ synchronized (CheckPointStorageFactory.class) {
+ if (null == instance) {
+ instance = new CheckPointStorageFactory();
+ }
+ }
+ }
+ return instance;
+ }
+
+ public ICheckPointStorage getStorage(String name){
+
+ Iterator<ICheckPointStorage> it = loader.iterator();
+ ICheckPointStorage storage = null;
+
+ ICheckPointStorage defaultStorage = null;
+ while(it.hasNext()){
+ ICheckPointStorage local = it.next();
+ if(local.getStorageName().equalsIgnoreCase(name)){
+ return local;
+ }
+ if(local.getStorageName().equalsIgnoreCase(DEFAULT_CHECKPOINT_TYPE_NAME)){
+ defaultStorage = local;
+ }
+ }
+
+ if(storage == null){
+ logger.error(String.format("checkpoint storage name config error, name is %s. use default checkpoint type db.", name));
+ return defaultStorage;
+ }
+ return null;
+ }
+
+}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/ICheckPointStorage.java
similarity index 63%
copy from rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java
copy to rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/ICheckPointStorage.java
index 5319f41..bf60fff 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/ICheckPointStorage.java
@@ -14,12 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.streams.common.functions;
+package org.apache.rocketmq.streams.common.checkpoint;
+
+
+import org.apache.rocketmq.streams.common.channel.source.ISource;
-import java.io.Serializable;
import java.util.List;
-public interface FlatMapFunction <T, O> extends Function{
+/**
+ * @description Responsible for saving and recovering checkpoints
+ */
+public interface ICheckPointStorage {
+
+ String TYPE = "checkpoint_storage";
+
+ String getStorageName();
+
+ <T> void save(List<T> checkPointState);
+
+ <T> T recover(ISource iSource, String queueID);
+
+ void flush();
+
+ void addCheckPointMessage(CheckPointMessage message);
- List<T> flatMap(O message) throws Exception;
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceSnapShot.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceSnapShot.java
new file mode 100644
index 0000000..43293d1
--- /dev/null
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceSnapShot.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.common.checkpoint;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.rocketmq.streams.common.metadata.MetaData;
+import org.apache.rocketmq.streams.common.model.Entity;
+
+import java.io.Serializable;
+
+/**
+ * @create 2021-08-06 16:21:30
+ * @description
+ */
+public class SourceSnapShot extends Entity implements Serializable {
+
+ private static final long serialVersionUID = 4449170945607357658L;
+
+ public final static MetaData snapshotTable = new MetaData();
+
+ static{
+ snapshotTable.setTableName("checkpoint_snapshot");
+ snapshotTable.addMetaDataField("id", "long", false);
+ snapshotTable.addMetaDataField("gmt_create", "DATE", false);
+ snapshotTable.addMetaDataField("gmt_modified", "DATE", false);
+ snapshotTable.addMetaDataField("key", "string", false);
+ snapshotTable.addMetaDataField("value", "string", false);
+ }
+
+ String key;
+
+ String value;
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+
+ @Override
+ public String toString() {
+ return "SourceSnapShot{" +
+ "key='" + key + '\'' +
+ ", value='" + value + '\'' +
+ '}';
+ }
+
+ public JSONObject toJson(){
+ JSONObject object = new JSONObject();
+ object.put("key", key);
+ object.put("value", value);
+ return object;
+ }
+}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceState.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceState.java
new file mode 100644
index 0000000..a239fdc
--- /dev/null
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceState.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.common.checkpoint;
+
+import org.apache.rocketmq.streams.common.context.MessageOffset;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @create 2021-08-11 15:51:50
+ * @description
+ */
+public class SourceState {
+
+ protected String sourceName;
+ protected Map<String, MessageOffset> queueId2Offsets = new HashMap<>();
+
+ public String getSourceName() {
+ return sourceName;
+ }
+
+ public void setSourceName(String sourceName) {
+ this.sourceName = sourceName;
+ }
+
+ public Map<String, MessageOffset> getQueueId2Offsets() {
+ return queueId2Offsets;
+ }
+
+ public void setQueueId2Offsets(
+ Map<String, MessageOffset> queueId2Offsets) {
+ this.queueId2Offsets = queueId2Offsets;
+ }
+
+ @Override
+ public String toString() {
+ return "SourceState{" +
+ "sourceName='" + sourceName + '\'' +
+ ", queueId2Offsets=" + queueId2Offsets +
+ '}';
+ }
+}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
index cbdb3e4..f060a7a 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
@@ -140,4 +140,10 @@ public interface ConfigureFileKey {
String LEASE_STORAGE_NAME = "DB";// this setting changes the underlying storage used for leases
+ String CHECKPOINT_STORAGE_NAME = "checkPointStorageName";
+
+ String IS_RECOVER_MODE = "isRecover";
+
+ String IS_ATOMIC_DB_SINK = "isAtomicDbSink";
+
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java
index 5319f41..87802b9 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.streams.common.functions;
import java.io.Serializable;
import java.util.List;
-public interface FlatMapFunction <T, O> extends Function{
+public interface FlatMapFunction <OUT, IN> extends Function, Serializable {
- List<T> flatMap(O message) throws Exception;
+ List<OUT> flatMap(IN message) throws Exception;
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/cachefilter/CacheFilterMeta.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/cachefilter/CacheFilterMeta.java
index ab4612b..bacb7b8 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/cachefilter/CacheFilterMeta.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/cachefilter/CacheFilterMeta.java
@@ -1,6 +1,20 @@
package org.apache.rocketmq.streams.common.optimization.cachefilter;
-
-import java.util.BitSet;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
import java.util.Map;
import org.apache.rocketmq.streams.common.cache.compress.BitSetCache;
import org.apache.rocketmq.streams.common.context.AbstractContext;
diff --git a/rocketmq-streams-configurable/pom.xml b/rocketmq-streams-configurable/pom.xml
index f160060..baacd27 100755
--- a/rocketmq-streams-configurable/pom.xml
+++ b/rocketmq-streams-configurable/pom.xml
@@ -1,4 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
+
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
diff --git a/rocketmq-streams-state/pom.xml b/rocketmq-streams-state/pom.xml
old mode 100755
new mode 100644